From 8e5bcf2758ebfc9753af701ba189b0e5aca79eab Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Mon, 1 Jun 2026 21:37:40 +0000 Subject: [PATCH 1/3] mcp_transcoder: add sse server response support. Signed-off-by: Yilin Guo --- .../v3/mcp_json_rest_bridge.proto | 4 + .../filters/http/mcp_json_rest_bridge/BUILD | 11 +++ .../mcp_json_rest_bridge_filter.cc | 62 ++++++++++-- .../mcp_json_rest_bridge_filter.h | 6 ++ .../sse_response_extractor.cc | 53 +++++++++++ .../sse_response_extractor.h | 28 ++++++ .../filters/http/mcp_json_rest_bridge/BUILD | 9 ++ .../mcp_json_rest_bridge_filter_test.cc | 23 +++++ .../mcp_json_rest_bridge_integration_test.cc | 85 +++++++++++++++++ .../sse_response_extractor_test.cc | 94 +++++++++++++++++++ 10 files changed, 369 insertions(+), 6 deletions(-) create mode 100644 source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc create mode 100644 source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h create mode 100644 test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc diff --git a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto index 5ef4483c4f2d6..f5ce58c45254a 100644 --- a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto +++ b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto @@ -233,6 +233,10 @@ message ToolConfig { // When enabled, the response body is streamed directly to the client without buffering. Each // chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix. // + // For Server-Sent Events (SSE) responses (i.e. ``text/event-stream`` content type), Envoy parses + // the stream and wraps each extracted complete event payload into a distinct text content item + // within the JSON-RPC ``content`` array on the fly. + // // Streaming flow: // // .. code-block:: text diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD index b7df6e51323bc..d0d48c670599e 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -15,6 +15,7 @@ envoy_cc_library( hdrs = ["mcp_json_rest_bridge_filter.h"], deps = [ ":http_request_builder_lib", + ":sse_response_extractor_lib", "//envoy/http:filter_interface", "//envoy/server:filter_config_interface", "//source/common/common:logger_lib", @@ -51,3 +52,13 @@ envoy_cc_library( "@envoy_api//envoy/extensions/filters/http/mcp_json_rest_bridge/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "sse_response_extractor_lib", + srcs = ["sse_response_extractor.cc"], + hdrs = ["sse_response_extractor.h"], + deps = [ + "//source/common/http/sse:sse_parser_lib", + "@abseil-cpp//absl/strings", + ], +) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 3f1234f3cd579..df1d45477159b 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -12,6 +12,7 @@ #include "source/common/protobuf/utility.h" #include "source/extensions/filters/common/mcp/constants.h" #include "source/extensions/filters/http/mcp_json_rest_bridge/http_request_builder.h" +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" #include "absl/base/no_destructor.h" #include "absl/container/flat_hash_set.h" @@ -35,6 +36,13 @@ namespace McpConstants = Envoy::Extensions::Filters::Common::Mcp::McpConstants; constexpr uint32_t DEFAULT_MAX_REQUEST_BODY_SIZE = 1024 * 64; // 64KB constexpr uint32_t DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024 * 1024; // 1MB +// Check if content type is text/event-stream, ignoring parameters like charset. +// HTTP Content-Type is case-insensitive. +bool isSseContentType(absl::string_view content_type) { + absl::string_view normalized = StringUtil::trim(StringUtil::cropRight(content_type, ";")); + return absl::EqualsIgnoreCase(normalized, Http::Headers::get().ContentTypeValues.TextEventStream); +} + bool isMcpProtocolVersionSupported(absl::string_view protocol_version) { static const absl::NoDestructor> supported_mcp_versions({ McpConstants::LATEST_SUPPORTED_MCP_VERSION, @@ -276,6 +284,7 @@ McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers // (final size is unknown), and let the headers flow through immediately so // the client can start receiving data without waiting for the full body. if (mcp_operation_ == McpOperation::ToolsCall && text_content_streaming_enabled_) { + is_sse_response_ = isSseContentType(response_headers.getContentTypeValue()); buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= static_cast(Http::Code::BadRequest)); response_headers.removeContentLength(); @@ -309,9 +318,26 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat // is false on the last data frame). It is a no-op here; the suffix will be appended in // encodeTrailers. absl::string_view chunk(static_cast(data.linearize(len)), len); - // TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if - // it needs buffering. - std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); + + std::string output_to_add; + if (is_sse_response_) { + std::vector event_payloads = + sse_response_extractor_.processChunk(chunk, end_stream); + for (const auto& event_payload : event_payloads) { + json content_item; + content_item[McpConstants::TYPE_FIELD] = McpConstants::TEXT_FIELD; + content_item[McpConstants::TEXT_FIELD] = event_payload; + std::string serialized_item = content_item.dump(); + if (!is_first_sse_event_) { + absl::StrAppend(&output_to_add, ","); + } else { + is_first_sse_event_ = false; + } + absl::StrAppend(&output_to_add, serialized_item); + } + } else { + output_to_add = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); + } data.drain(len); // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted @@ -322,14 +348,16 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat if (is_first_streaming_chunk_) { ENVOY_STREAM_LOG(debug, "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, escaped_chunk.size()); + *encoder_callbacks_, len, output_to_add.size()); data.add(streaming_json_prefix_); is_first_streaming_chunk_ = false; } else { ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, escaped_chunk.size()); + *encoder_callbacks_, len, output_to_add.size()); } - data.add(escaped_chunk); + + data.add(output_to_add); + // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. // This is body + trailer case where encodeTrailer call represents the end of response body. // In that case, we should add the streaming_json_suffix at encodeTrailer call. @@ -382,6 +410,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons } void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { + if (is_sse_response_) { + json ref = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, *session_id_}, + {McpConstants::RESULT_FIELD, + { + {McpConstants::CONTENT_FIELD, json::array()}, + {McpConstants::IS_ERROR_FIELD, is_error}, + }}, + }; + std::string ref_json = ref.dump(); + std::string marker = absl::StrCat("\"", McpConstants::CONTENT_FIELD, "\":[]"); + size_t pos = ref_json.find(marker); + if (pos == std::string::npos) { + IS_ENVOY_BUG("JSON-RPC streaming marker not found in serialized envelope"); + return; + } + streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); + streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); + return; + } + // Build a reference JSON-RPC envelope with an empty text placeholder. json ref = { {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index ff7aee1420293..ced4ba4e728c0 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -12,6 +12,7 @@ #include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/extensions/filters/http/common/pass_through_filter.h" +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" #include "absl/container/flat_hash_map.h" #include "absl/status/statusor.h" @@ -143,6 +144,11 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, std::string streaming_json_suffix_; bool is_first_streaming_chunk_ = true; + // Whether the response is SSE. + bool is_sse_response_ = false; + bool is_first_sse_event_ = true; + SseResponseExtractor sse_response_extractor_; + McpJsonRestBridgeFilterConfigSharedPtr config_; }; diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc new file mode 100644 index 0000000000000..96d891560c6a0 --- /dev/null +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc @@ -0,0 +1,53 @@ +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" + +#include +#include +#include + +#include "source/common/http/sse/sse_parser.h" + +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { + +std::vector SseResponseExtractor::processChunk(absl::string_view chunk, + bool end_stream) { + std::vector event_payloads; + buffer_.append(chunk.data(), chunk.size()); + + absl::string_view buffer_view = buffer_; + const uint64_t length = buffer_.size(); + + while (!buffer_view.empty()) { + // Safely handles chunk boundaries and all line-ending formats + auto result = Http::Sse::SseParser::findEventEnd(buffer_view, end_stream); + + // npos means the event hasn't reached a double blank line yet + if (result.event_start == absl::string_view::npos) { + break; + } + + absl::string_view event_str = + buffer_view.substr(result.event_start, result.event_end - result.event_start); + + Http::Sse::SseParser::ParsedEvent event = Http::Sse::SseParser::parseEvent(event_str); + + if (event.data.has_value()) { + event_payloads.push_back(*std::move(event.data)); + } + + buffer_view = buffer_view.substr(result.next_start); + } + + buffer_.erase(0, length - buffer_view.size()); + + return event_payloads; +} + +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h new file mode 100644 index 0000000000000..44ab57fe0e5a0 --- /dev/null +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { + +class SseResponseExtractor { +public: + SseResponseExtractor() = default; + + // Processes an incoming chunk of an SSE stream and returns any completed event payloads. + // Set end_stream to true if this is the final chunk of the response. + std::vector processChunk(absl::string_view chunk, bool end_stream = false); + +private: + std::string buffer_; +}; + +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD index c24da6f9e270c..88b00552b7a64 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -53,3 +53,12 @@ envoy_cc_test( "@nlohmann_json//:json", ], ) + +envoy_cc_test( + name = "sse_response_extractor_test", + srcs = ["sse_response_extractor_test.cc"], + deps = [ + "//source/extensions/filters/http/mcp_json_rest_bridge:sse_response_extractor_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc index c01d42a8350df..1a747b5a22529 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc @@ -1435,6 +1435,29 @@ TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) { R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json")); } +TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseStreaming) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-type", "text/event-stream"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + + Buffer::OwnedImpl chunk1("data: {\"a\": 1}\n\n"); + EXPECT_EQ(filter_->encodeData(chunk1, /*end_stream=*/false), Http::FilterDataStatus::Continue); + EXPECT_THAT(chunk1.toString(), testing::StartsWith("{\"id\":123,")); + + Buffer::OwnedImpl chunk2("data: {\"b\": 2}\n\n"); + EXPECT_EQ(filter_->encodeData(chunk2, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_THAT(chunk2.toString(), testing::EndsWith("}}")); + + const std::string full = chunk1.toString() + chunk2.toString(); + EXPECT_EQ( + nlohmann::json::parse(full), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"{\"a\": 1}","type":"text"},{"text":"{\"b\": 2}","type":"text"}],"isError":false}})json")); +} + class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override { diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc index 788c52e1b32b5..81b09bfb78af5 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc @@ -630,6 +630,91 @@ TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) { EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); } +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallSseStreamingTranscoding) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 321, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers().getMethodValue(), StrEq("POST")); + EXPECT_THAT(upstream_request_->headers().getPathValue(), StrEq("/v1/projects/foo/keys")); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(200); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.TextEventStream); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl chunk1; + chunk1.add("data: {\"a\": 1}\n\n"); + upstream_request_->encodeData(chunk1, false); + + Buffer::OwnedImpl chunk2; + chunk2.add("data: {\"b\": 2}\n\n"); + upstream_request_->encodeData(chunk2, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("200")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 321, + "result": { + "content": [ + { + "type": "text", + "text": "{\"a\": 1}" + }, + { + "type": "text", + "text": "{\"b\": 2}" + } + ], + "isError": false + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) { const std::string config = R"EOF( name: envoy.filters.http.mcp_json_rest_bridge diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc new file mode 100644 index 0000000000000..ee84e5c3fbe76 --- /dev/null +++ b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc @@ -0,0 +1,94 @@ +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { +namespace { + +using ::testing::ElementsAre; +using ::testing::IsEmpty; + +class SseResponseExtractorTest : public testing::Test { +protected: + SseResponseExtractor extractor_; +}; + +TEST_F(SseResponseExtractorTest, ProcessSingleCompleteEvent) { + auto payloads = extractor_.processChunk("data: hello world\n\n"); + EXPECT_THAT(payloads, ElementsAre("hello world")); +} + +TEST_F(SseResponseExtractorTest, ProcessMultipleEvents) { + auto payloads = extractor_.processChunk("data: first event\n\ndata: second event\n\n"); + EXPECT_THAT(payloads, ElementsAre("first event", "second event")); +} + +TEST_F(SseResponseExtractorTest, ProcessIncompleteEvent) { + // First chunk has incomplete event, should return nothing. + auto payloads1 = extractor_.processChunk("data: first"); + EXPECT_THAT(payloads1, IsEmpty()); + + // Second chunk completes the event. + auto payloads2 = extractor_.processChunk(" event\n\n"); + EXPECT_THAT(payloads2, ElementsAre("first event")); +} + +TEST_F(SseResponseExtractorTest, ProcessMultipleChunks) { + EXPECT_THAT(extractor_.processChunk("da"), IsEmpty()); + EXPECT_THAT(extractor_.processChunk("ta: hello"), IsEmpty()); + EXPECT_THAT(extractor_.processChunk(" world\n"), IsEmpty()); + EXPECT_THAT(extractor_.processChunk("\n"), ElementsAre("hello world")); +} + +TEST_F(SseResponseExtractorTest, ProcessCommentsOnly) { + auto payloads = extractor_.processChunk(": this is a comment\n\n"); + EXPECT_THAT(payloads, IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessNoDataEvent) { + auto payloads = extractor_.processChunk("event: ping\nid: 123\n\n"); + EXPECT_THAT(payloads, IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessCRLFLineEndings) { + auto payloads = extractor_.processChunk("data: crlf test\r\n\r\n"); + EXPECT_THAT(payloads, ElementsAre("crlf test")); +} + +TEST_F(SseResponseExtractorTest, ProcessEndStreamWithIncompleteEvent) { + // Without end_stream, incomplete event is buffered. + EXPECT_THAT(extractor_.processChunk("data: last event"), IsEmpty()); + + EXPECT_THAT(extractor_.processChunk("", /*end_stream=*/true), IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessMultilineData) { + auto payloads = extractor_.processChunk("data: line one\ndata: line two\n\n"); + EXPECT_THAT(payloads, ElementsAre("line one\nline two")); +} + +TEST_F(SseResponseExtractorTest, ProcessJSONPayload) { + auto payloads = extractor_.processChunk("data: {\"foo\": \"bar\"}\n\n"); + EXPECT_THAT(payloads, ElementsAre("{\"foo\": \"bar\"}")); +} + +TEST_F(SseResponseExtractorTest, ProcessEmptyChunk) { + auto payloads = extractor_.processChunk(""); + EXPECT_THAT(payloads, IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessMixedEvents) { + auto payloads = + extractor_.processChunk("event: ping\n\ndata: hello\n\n: comment\n\ndata: world\n\n"); + EXPECT_THAT(payloads, ElementsAre("hello", "world")); +} + +} // namespace +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy From d8b5c788a2ea847c588d0509b87863c029552e4c Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Tue, 2 Jun 2026 23:57:57 +0000 Subject: [PATCH 2/3] Address comments. Signed-off-by: Yilin Guo --- .../filters/http/mcp_json_rest_bridge/BUILD | 2 + .../mcp_json_rest_bridge_filter.cc | 142 +++++++++++------- .../mcp_json_rest_bridge_filter.h | 11 +- .../sse_response_extractor.cc | 12 +- .../sse_response_extractor.h | 7 +- .../filters/http/mcp_json_rest_bridge/BUILD | 1 + .../mcp_json_rest_bridge_filter_test.cc | 37 +++++ .../sse_response_extractor_test.cc | 70 +++++---- 8 files changed, 193 insertions(+), 89 deletions(-) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD index d0d48c670599e..1ee868444d395 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -59,6 +59,8 @@ envoy_cc_library( hdrs = ["sse_response_extractor.h"], deps = [ "//source/common/http/sse:sse_parser_lib", + "@abseil-cpp//absl/status", + "@abseil-cpp//absl/status:statusor", "@abseil-cpp//absl/strings", ], ) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index df1d45477159b..db75e08e4eb04 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -313,59 +313,7 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat // Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without // buffering the full response body. if (!streaming_json_prefix_.empty()) { - uint64_t len = data.length(); - // Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream - // is false on the last data frame). It is a no-op here; the suffix will be appended in - // encodeTrailers. - absl::string_view chunk(static_cast(data.linearize(len)), len); - - std::string output_to_add; - if (is_sse_response_) { - std::vector event_payloads = - sse_response_extractor_.processChunk(chunk, end_stream); - for (const auto& event_payload : event_payloads) { - json content_item; - content_item[McpConstants::TYPE_FIELD] = McpConstants::TEXT_FIELD; - content_item[McpConstants::TEXT_FIELD] = event_payload; - std::string serialized_item = content_item.dump(); - if (!is_first_sse_event_) { - absl::StrAppend(&output_to_add, ","); - } else { - is_first_sse_event_ = false; - } - absl::StrAppend(&output_to_add, serialized_item); - } - } else { - output_to_add = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); - } - - data.drain(len); - // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted - // in the streaming fast-path due to the stateless nature of chunk processing (which lacks - // a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits). - // If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is, - // which may cause the client to fail parsing the final JSON. - if (is_first_streaming_chunk_) { - ENVOY_STREAM_LOG(debug, - "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, output_to_add.size()); - data.add(streaming_json_prefix_); - is_first_streaming_chunk_ = false; - } else { - ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, output_to_add.size()); - } - - data.add(output_to_add); - - // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. - // This is body + trailer case where encodeTrailer call represents the end of response body. - // In that case, we should add the streaming_json_suffix at encodeTrailer call. - if (end_stream) { - ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); - data.add(streaming_json_suffix_); - } - return Http::FilterDataStatus::Continue; + return encodeStreamingData(data, end_stream); } const uint32_t max_response_body_size = config_->maxResponseBodySize(); @@ -750,6 +698,94 @@ void McpJsonRestBridgeFilter::setDynamicMetadata(absl::string_view method, *decoder_callbacks_, metadata.DebugString()); } +Http::FilterDataStatus McpJsonRestBridgeFilter::encodeStreamingData(Buffer::Instance& data, + bool end_stream) { + uint64_t len = data.length(); + // Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream + // is false on the last data frame). It is a no-op here; the suffix will be appended in + // encodeTrailers. + absl::string_view chunk(static_cast(data.linearize(len)), len); + + absl::StatusOr payload = prepareStreamingPayload(chunk, end_stream); + if (!payload.ok()) { + ENVOY_STREAM_LOG(error, "Streaming payload preparation error: {}", *encoder_callbacks_, + payload.status()); + json error_json = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, session_id_.has_value() ? *session_id_ : json(nullptr)}, + {McpConstants::ERROR_FIELD, generateErrorJsonResponse(-32000, payload.status().message())}}; + encoder_callbacks_->sendLocalReply( + Http::Code::InternalServerError, error_json.dump(), + [](Http::ResponseHeaderMap& headers) { + headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + }, + Grpc::Status::WellKnownGrpcStatus::Internal, + "mcp_json_rest_bridge_filter_streaming_payload_preparation_error"); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + std::string output_to_add = *std::move(payload); + + data.drain(len); + // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted + // in the streaming fast-path due to the stateless nature of chunk processing (which lacks + // a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits). + // If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is, + // which may cause the client to fail parsing the final JSON. + if (is_first_streaming_chunk_) { + ENVOY_STREAM_LOG(debug, + "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, output_to_add.size()); + data.add(streaming_json_prefix_); + is_first_streaming_chunk_ = false; + } else { + ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, output_to_add.size()); + } + + data.add(output_to_add); + + // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. + // This is body + trailer case where encodeTrailer call represents the end of response body. + // In that case, we should add the streaming_json_suffix at encodeTrailer call. + if (end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); + data.add(streaming_json_suffix_); + } + return Http::FilterDataStatus::Continue; +} + +absl::StatusOr McpJsonRestBridgeFilter::processSseResponse(absl::string_view chunk, + bool end_stream) { + absl::StatusOr> event_payloads = + sse_response_extractor_.processChunk(chunk, end_stream); + if (!event_payloads.ok()) { + return event_payloads.status(); + } + std::string output; + for (const auto& event_payload : *event_payloads) { + std::string escaped_payload = + JsonEscaper::escapeString(event_payload, JsonEscaper::extraSpace(event_payload)); + std::string serialized_item = + absl::StrCat("{\"", McpConstants::TYPE_FIELD, "\":\"", McpConstants::TEXT_FIELD, "\",\"", + McpConstants::TEXT_FIELD, "\":\"", escaped_payload, "\"}"); + if (!is_first_sse_event_) { + absl::StrAppend(&output, ","); + } else { + is_first_sse_event_ = false; + } + absl::StrAppend(&output, serialized_item); + } + return output; +} + +absl::StatusOr +McpJsonRestBridgeFilter::prepareStreamingPayload(absl::string_view chunk, bool end_stream) { + if (is_sse_response_) { + return processSseResponse(chunk, end_stream); + } + return JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); +} + absl::Status McpJsonRestBridgeFilter::validateJsonRpcIdAndMethod(const nlohmann::json& json_rpc) { absl::StatusOr session_id = getSessionId(json_rpc); if (session_id.ok()) { diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index ced4ba4e728c0..5bd1b8f45e5d0 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -74,7 +74,7 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, public Logger::Loggable { public: explicit McpJsonRestBridgeFilter(McpJsonRestBridgeFilterConfigSharedPtr config) - : config_(config) {} + : sse_response_extractor_(config->maxResponseBodySize()), config_(config) {} // Http::StreamDecoderFilter Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, @@ -111,6 +111,15 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, // Builds streaming_json_prefix_ and streaming_json_suffix_ for the tools/call streaming path. void buildStreamingPrefixAndSuffix(bool is_error); + // Encodes incoming data chunks for streaming MCP tool calls. + Http::FilterDataStatus encodeStreamingData(Buffer::Instance& data, bool end_stream); + + // Processes an SSE response chunk and returns the serialized JSON event payloads. + absl::StatusOr processSseResponse(absl::string_view chunk, bool end_stream); + + // Prepares the escaped/formatted payload string for streaming. + absl::StatusOr prepareStreamingPayload(absl::string_view chunk, bool end_stream); + enum class McpOperation { Unspecified = 0, // Received the "/mcp" URL but has not parsed the request body yet. diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc index 96d891560c6a0..dab4bbd7aa8a3 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc @@ -6,6 +6,8 @@ #include "source/common/http/sse/sse_parser.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" #include "absl/strings/string_view.h" namespace Envoy { @@ -13,8 +15,11 @@ namespace Extensions { namespace HttpFilters { namespace McpJsonRestBridge { -std::vector SseResponseExtractor::processChunk(absl::string_view chunk, - bool end_stream) { +absl::StatusOr> SseResponseExtractor::processChunk(absl::string_view chunk, + bool end_stream) { + if (max_response_body_size_ > 0 && (buffer_.size() + chunk.size()) > max_response_body_size_) { + return absl::InvalidArgumentError("Response body limit exceeded"); + } std::vector event_payloads; buffer_.append(chunk.data(), chunk.size()); @@ -23,7 +28,8 @@ std::vector SseResponseExtractor::processChunk(absl::string_view ch while (!buffer_view.empty()) { // Safely handles chunk boundaries and all line-ending formats - auto result = Http::Sse::SseParser::findEventEnd(buffer_view, end_stream); + Http::Sse::SseParser::FindEventEndResult result = + Http::Sse::SseParser::findEventEnd(buffer_view, end_stream); // npos means the event hasn't reached a double blank line yet if (result.event_start == absl::string_view::npos) { diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h index 44ab57fe0e5a0..8c5c788e81ab9 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h @@ -3,6 +3,7 @@ #include #include +#include "absl/status/statusor.h" #include "absl/strings/string_view.h" namespace Envoy { @@ -12,14 +13,16 @@ namespace McpJsonRestBridge { class SseResponseExtractor { public: - SseResponseExtractor() = default; + explicit SseResponseExtractor(uint64_t max_response_body_size = 0) + : max_response_body_size_(max_response_body_size) {} // Processes an incoming chunk of an SSE stream and returns any completed event payloads. // Set end_stream to true if this is the final chunk of the response. - std::vector processChunk(absl::string_view chunk, bool end_stream = false); + absl::StatusOr> processChunk(absl::string_view chunk, bool end_stream); private: std::string buffer_; + const uint64_t max_response_body_size_; }; } // namespace McpJsonRestBridge diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD index 88b00552b7a64..269d60754cdd2 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -59,6 +59,7 @@ envoy_cc_test( srcs = ["sse_response_extractor_test.cc"], deps = [ "//source/extensions/filters/http/mcp_json_rest_bridge:sse_response_extractor_lib", + "//test/test_common:status_utility_lib", "//test/test_common:utility_lib", ], ) diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc index 1a747b5a22529..a50b634c61802 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc @@ -1458,6 +1458,43 @@ TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseStreaming) { R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"{\"a\": 1}","type":"text"},{"text":"{\"b\": 2}","type":"text"}],"isError":false}})json")); } +TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseExceedsMaxResponseBodySize) { + envoy::extensions::filters::http::mcp_json_rest_bridge::v3::McpJsonRestBridge proto_config; + TestUtility::loadFromYaml(R"yaml( + tool_config: + tools: + - name: "get_api_key" + http_rule: + get: "/v1/apiKeys" + text_content_streaming_enabled: true + max_response_body_size: 10 + )yaml", + proto_config); + config_ = std::make_shared(proto_config); + filter_ = std::make_unique(config_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); + + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-type", "text/event-stream"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + EXPECT_CALL( + encoder_callbacks_, + sendLocalReply( + Eq(Http::Code::InternalServerError), + StrEq( + R"json({"error":{"code":-32000,"message":"Response body limit exceeded"},"id":123,"jsonrpc":"2.0"})json"), + _, Eq(Grpc::Status::WellKnownGrpcStatus::Internal), + StrEq("mcp_json_rest_bridge_filter_streaming_payload_preparation_error"))); + + Buffer::OwnedImpl chunk("data: too long payload\n\n"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/false), + Http::FilterDataStatus::StopIterationNoBuffer); +} + class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override { diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc index ee84e5c3fbe76..e1db6d15038d3 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc @@ -1,5 +1,7 @@ #include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" +#include "test/test_common/status_utility.h" + #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -9,6 +11,7 @@ namespace HttpFilters { namespace McpJsonRestBridge { namespace { +using ::Envoy::StatusHelpers::StatusIs; using ::testing::ElementsAre; using ::testing::IsEmpty; @@ -18,73 +21,80 @@ class SseResponseExtractorTest : public testing::Test { }; TEST_F(SseResponseExtractorTest, ProcessSingleCompleteEvent) { - auto payloads = extractor_.processChunk("data: hello world\n\n"); - EXPECT_THAT(payloads, ElementsAre("hello world")); + EXPECT_THAT(*extractor_.processChunk("data: hello world\n\n", /*end_stream=*/false), + ElementsAre("hello world")); } TEST_F(SseResponseExtractorTest, ProcessMultipleEvents) { - auto payloads = extractor_.processChunk("data: first event\n\ndata: second event\n\n"); - EXPECT_THAT(payloads, ElementsAre("first event", "second event")); + EXPECT_THAT( + *extractor_.processChunk("data: first event\n\ndata: second event\n\n", /*end_stream=*/false), + ElementsAre("first event", "second event")); } TEST_F(SseResponseExtractorTest, ProcessIncompleteEvent) { // First chunk has incomplete event, should return nothing. - auto payloads1 = extractor_.processChunk("data: first"); - EXPECT_THAT(payloads1, IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("data: first", /*end_stream=*/false), IsEmpty()); // Second chunk completes the event. - auto payloads2 = extractor_.processChunk(" event\n\n"); - EXPECT_THAT(payloads2, ElementsAre("first event")); + EXPECT_THAT(*extractor_.processChunk(" event\n\n", /*end_stream=*/false), + ElementsAre("first event")); } TEST_F(SseResponseExtractorTest, ProcessMultipleChunks) { - EXPECT_THAT(extractor_.processChunk("da"), IsEmpty()); - EXPECT_THAT(extractor_.processChunk("ta: hello"), IsEmpty()); - EXPECT_THAT(extractor_.processChunk(" world\n"), IsEmpty()); - EXPECT_THAT(extractor_.processChunk("\n"), ElementsAre("hello world")); + EXPECT_THAT(*extractor_.processChunk("da", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("ta: hello", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk(" world\n", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("\n", /*end_stream=*/false), ElementsAre("hello world")); } TEST_F(SseResponseExtractorTest, ProcessCommentsOnly) { - auto payloads = extractor_.processChunk(": this is a comment\n\n"); - EXPECT_THAT(payloads, IsEmpty()); + EXPECT_THAT(*extractor_.processChunk(": this is a comment\n\n", /*end_stream=*/false), IsEmpty()); } TEST_F(SseResponseExtractorTest, ProcessNoDataEvent) { - auto payloads = extractor_.processChunk("event: ping\nid: 123\n\n"); - EXPECT_THAT(payloads, IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("event: ping\nid: 123\n\n", /*end_stream=*/false), + IsEmpty()); } TEST_F(SseResponseExtractorTest, ProcessCRLFLineEndings) { - auto payloads = extractor_.processChunk("data: crlf test\r\n\r\n"); - EXPECT_THAT(payloads, ElementsAre("crlf test")); + EXPECT_THAT(*extractor_.processChunk("data: crlf test\r\n\r\n", /*end_stream=*/false), + ElementsAre("crlf test")); } TEST_F(SseResponseExtractorTest, ProcessEndStreamWithIncompleteEvent) { // Without end_stream, incomplete event is buffered. - EXPECT_THAT(extractor_.processChunk("data: last event"), IsEmpty()); - - EXPECT_THAT(extractor_.processChunk("", /*end_stream=*/true), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("data: last event", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("", /*end_stream=*/true), IsEmpty()); } TEST_F(SseResponseExtractorTest, ProcessMultilineData) { - auto payloads = extractor_.processChunk("data: line one\ndata: line two\n\n"); - EXPECT_THAT(payloads, ElementsAre("line one\nline two")); + EXPECT_THAT(*extractor_.processChunk("data: line one\ndata: line two\n\n", /*end_stream=*/false), + ElementsAre("line one\nline two")); } TEST_F(SseResponseExtractorTest, ProcessJSONPayload) { - auto payloads = extractor_.processChunk("data: {\"foo\": \"bar\"}\n\n"); - EXPECT_THAT(payloads, ElementsAre("{\"foo\": \"bar\"}")); + EXPECT_THAT(*extractor_.processChunk("data: {\"foo\": \"bar\"}\n\n", /*end_stream=*/false), + ElementsAre("{\"foo\": \"bar\"}")); } TEST_F(SseResponseExtractorTest, ProcessEmptyChunk) { - auto payloads = extractor_.processChunk(""); - EXPECT_THAT(payloads, IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("", /*end_stream=*/false), IsEmpty()); } TEST_F(SseResponseExtractorTest, ProcessMixedEvents) { - auto payloads = - extractor_.processChunk("event: ping\n\ndata: hello\n\n: comment\n\ndata: world\n\n"); - EXPECT_THAT(payloads, ElementsAre("hello", "world")); + EXPECT_THAT(*extractor_.processChunk("event: ping\n\ndata: hello\n\n: comment\n\ndata: world\n\n", + /*end_stream=*/false), + ElementsAre("hello", "world")); +} + +TEST(SseResponseExtractorLimitTest, EnforcesLimit) { + SseResponseExtractor limited_extractor(10); + // First chunk is fine (9 bytes) + EXPECT_THAT(*limited_extractor.processChunk("data: foo", /*end_stream=*/false), IsEmpty()); + + // Second chunk exceeds the limit (9 + 2 = 11 > 10) + EXPECT_THAT(limited_extractor.processChunk("\n\n", /*end_stream=*/false), + StatusIs(absl::StatusCode::kInvalidArgument)); } } // namespace From 6a74d710bd05e54ca1c039cb5f59764b703db868 Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Wed, 3 Jun 2026 22:41:43 +0000 Subject: [PATCH 3/3] Address comments. Signed-off-by: Yilin Guo --- .../http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index db75e08e4eb04..7f2be1850472d 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -341,7 +341,7 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat if (!end_stream) { return Http::FilterDataStatus::StopIterationNoBuffer; } - + // TODO(guoyilin42): Add SSE response support for non-streaming path using the buffered body. encodeJsonRpcData(encoder_callbacks_->responseHeaders()); data.add(response_body_str_); response_body_str_.clear();