mcp_transcoder: add sse server response support.#45374
Conversation
Signed-off-by: Yilin Guo <guoyilin@google.com>
|
CC @envoyproxy/api-shepherds: Your approval is needed for changes made to |
|
/gemini review cc @paulhong01 |
There was a problem hiding this comment.
Code Review
This pull request introduces support for Server-Sent Events (SSE) responses in the MCP JSON-RPC REST bridge filter. It adds an SseResponseExtractor to parse incoming text/event-stream chunks on the fly and wrap completed event payloads into distinct text items within the JSON-RPC content array. The review feedback highlights two critical robustness and security concerns: first, nlohmann::json::dump() could throw an exception and crash Envoy if it encounters invalid UTF-8 in the untrusted upstream payload, which should be mitigated by using a safe error-handling replacement strategy; second, the unbounded buffering of incomplete SSE events poses a memory exhaustion (DoS) risk, necessitating a maximum buffer size limit.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| json content_item; | ||
| content_item[McpConstants::TYPE_FIELD] = McpConstants::TEXT_FIELD; | ||
| content_item[McpConstants::TEXT_FIELD] = event_payload; | ||
| std::string serialized_item = content_item.dump(); |
There was a problem hiding this comment.
By default, nlohmann::json::dump() throws an exception if it encounters invalid UTF-8 characters. Since event_payload is parsed from the upstream server's response (which is untrusted external data), receiving malformed UTF-8 could cause an uncaught exception and crash the Envoy process. To prevent this, use json::error_handler_t::replace to safely replace invalid UTF-8 sequences with the replacement character.
| std::string serialized_item = content_item.dump(); | |
| std::string serialized_item = content_item.dump(-1, ' ', false, json::error_handler_t::replace); |
There was a problem hiding this comment.
Waiting reviewer's opinion.
There was a problem hiding this comment.
Can we use JsonEscaper::escapeString?
| std::vector<std::string> SseResponseExtractor::processChunk(absl::string_view chunk, | ||
| bool end_stream) { | ||
| std::vector<std::string> event_payloads; | ||
| buffer_.append(chunk.data(), chunk.size()); |
There was a problem hiding this comment.
The buffer_ member variable accumulates incoming chunks of the SSE stream until a complete event (terminated by a double newline) is found. If a misbehaving or malicious upstream server sends an infinite stream of data without any double newlines, buffer_ will grow indefinitely, potentially leading to memory exhaustion (OOM) and a Denial of Service (DoS). Consider enforcing a maximum limit on the size of buffer_ and closing the connection or returning an error if the limit is exceeded.
There was a problem hiding this comment.
Waiting reviewer's opinion.
There was a problem hiding this comment.
I think it is better to apply the max_response_body_size limitation.
| std::vector<std::string> SseResponseExtractor::processChunk(absl::string_view chunk, | ||
| bool end_stream) { | ||
| std::vector<std::string> event_payloads; | ||
| buffer_.append(chunk.data(), chunk.size()); |
There was a problem hiding this comment.
I think it is better to apply the max_response_body_size limitation.
|
|
||
| while (!buffer_view.empty()) { | ||
| // Safely handles chunk boundaries and all line-ending formats | ||
| auto result = Http::Sse::SseParser::findEventEnd(buffer_view, end_stream); |
There was a problem hiding this comment.
minor nit: Since this type is not very obvious, consider avoid using auto here.
| json content_item; | ||
| content_item[McpConstants::TYPE_FIELD] = McpConstants::TEXT_FIELD; | ||
| content_item[McpConstants::TEXT_FIELD] = event_payload; | ||
| std::string serialized_item = content_item.dump(); |
There was a problem hiding this comment.
Can we use JsonEscaper::escapeString?
| // When enabled, the response body is streamed directly to the client without buffering. Each | ||
| // chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix. | ||
| // | ||
| // For Server-Sent Events (SSE) responses (i.e. ``text/event-stream`` content type), Envoy parses |
There was a problem hiding this comment.
Are we going to support SSE case when text_content_streaming_enabled: False in the follow-up change?
|
|
||
| // Processes an incoming chunk of an SSE stream and returns any completed event payloads. | ||
| // Set end_stream to true if this is the final chunk of the response. | ||
| std::vector<std::string> processChunk(absl::string_view chunk, bool end_stream = false); |
There was a problem hiding this comment.
nit: Default it to false doesn't seem to be needed since the current logic is passing end_stream from encodeData
| std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); | ||
|
|
||
| std::string output_to_add; | ||
| if (is_sse_response_) { |
There was a problem hiding this comment.
nit/optional: consider wrapping this logic into a utility function since encodeData has contained more logic and long.
Commit Message: mcp_transcoder: add sse server response support.
Additional Description:
This PR introduces Server-Sent Events (SSE) server response support to the MCP JSON REST bridge filter. When the upstream server responds with a
text/event-streamcontent type, the filter will now parse the stream and wrap each extracted complete event payload into a distinct text content item within the JSON-RPCcontentarray on the fly.Key Changes:
SseResponseExtractor: Introducedsse_response_extractor.ccand its header to process incoming chunks of an SSE stream and return completed event payloads.McpJsonRestBridgeFilter: Modified the encoding path to check for thetext/event-streamcontent type. It now delegates chunk processing to the new SSE extractor and formats the results into the JSON-RPC streaming envelope.sse_response_extractor_test.ccto verify the handling of single/multiple events, multiline data, incomplete chunked events, and CRLF line endings. Also addedToolsCallSseStreamingTranscodingto integration tests.Risk Level: Low
Testing: Unit and Integration tests.
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A