diff --git a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto index 5ef4483c4f2d6..f5ce58c45254a 100644 --- a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto +++ b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto @@ -233,6 +233,10 @@ message ToolConfig { // When enabled, the response body is streamed directly to the client without buffering. Each // chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix. // + // For Server-Sent Events (SSE) responses (i.e. ``text/event-stream`` content type), Envoy parses + // the stream and wraps each extracted complete event payload into a distinct text content item + // within the JSON-RPC ``content`` array on the fly. + // // Streaming flow: // // .. code-block:: text diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD index b7df6e51323bc..1ee868444d395 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/source/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -15,6 +15,7 @@ envoy_cc_library( hdrs = ["mcp_json_rest_bridge_filter.h"], deps = [ ":http_request_builder_lib", + ":sse_response_extractor_lib", "//envoy/http:filter_interface", "//envoy/server:filter_config_interface", "//source/common/common:logger_lib", @@ -51,3 +52,15 @@ envoy_cc_library( "@envoy_api//envoy/extensions/filters/http/mcp_json_rest_bridge/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "sse_response_extractor_lib", + srcs = ["sse_response_extractor.cc"], + hdrs = ["sse_response_extractor.h"], + deps = [ + "//source/common/http/sse:sse_parser_lib", + "@abseil-cpp//absl/status", + "@abseil-cpp//absl/status:statusor", + "@abseil-cpp//absl/strings", + ], +) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 3f1234f3cd579..7f2be1850472d 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -12,6 +12,7 @@ #include "source/common/protobuf/utility.h" #include "source/extensions/filters/common/mcp/constants.h" #include "source/extensions/filters/http/mcp_json_rest_bridge/http_request_builder.h" +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" #include "absl/base/no_destructor.h" #include "absl/container/flat_hash_set.h" @@ -35,6 +36,13 @@ namespace McpConstants = Envoy::Extensions::Filters::Common::Mcp::McpConstants; constexpr uint32_t DEFAULT_MAX_REQUEST_BODY_SIZE = 1024 * 64; // 64KB constexpr uint32_t DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024 * 1024; // 1MB +// Check if content type is text/event-stream, ignoring parameters like charset. +// HTTP Content-Type is case-insensitive. +bool isSseContentType(absl::string_view content_type) { + absl::string_view normalized = StringUtil::trim(StringUtil::cropRight(content_type, ";")); + return absl::EqualsIgnoreCase(normalized, Http::Headers::get().ContentTypeValues.TextEventStream); +} + bool isMcpProtocolVersionSupported(absl::string_view protocol_version) { static const absl::NoDestructor> supported_mcp_versions({ McpConstants::LATEST_SUPPORTED_MCP_VERSION, @@ -276,6 +284,7 @@ McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers // (final size is unknown), and let the headers flow through immediately so // the client can start receiving data without waiting for the full body. if (mcp_operation_ == McpOperation::ToolsCall && text_content_streaming_enabled_) { + is_sse_response_ = isSseContentType(response_headers.getContentTypeValue()); buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= static_cast(Http::Code::BadRequest)); response_headers.removeContentLength(); @@ -304,40 +313,7 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat // Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without // buffering the full response body. if (!streaming_json_prefix_.empty()) { - uint64_t len = data.length(); - // Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream - // is false on the last data frame). It is a no-op here; the suffix will be appended in - // encodeTrailers. - absl::string_view chunk(static_cast(data.linearize(len)), len); - // TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if - // it needs buffering. - std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); - - data.drain(len); - // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted - // in the streaming fast-path due to the stateless nature of chunk processing (which lacks - // a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits). - // If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is, - // which may cause the client to fail parsing the final JSON. - if (is_first_streaming_chunk_) { - ENVOY_STREAM_LOG(debug, - "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, escaped_chunk.size()); - data.add(streaming_json_prefix_); - is_first_streaming_chunk_ = false; - } else { - ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", - *encoder_callbacks_, len, escaped_chunk.size()); - } - data.add(escaped_chunk); - // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. - // This is body + trailer case where encodeTrailer call represents the end of response body. - // In that case, we should add the streaming_json_suffix at encodeTrailer call. - if (end_stream) { - ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); - data.add(streaming_json_suffix_); - } - return Http::FilterDataStatus::Continue; + return encodeStreamingData(data, end_stream); } const uint32_t max_response_body_size = config_->maxResponseBodySize(); @@ -365,7 +341,7 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat if (!end_stream) { return Http::FilterDataStatus::StopIterationNoBuffer; } - + // TODO(guoyilin42): Add SSE response support for non-streaming path using the buffered body. encodeJsonRpcData(encoder_callbacks_->responseHeaders()); data.add(response_body_str_); response_body_str_.clear(); @@ -382,6 +358,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons } void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { + if (is_sse_response_) { + json ref = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, *session_id_}, + {McpConstants::RESULT_FIELD, + { + {McpConstants::CONTENT_FIELD, json::array()}, + {McpConstants::IS_ERROR_FIELD, is_error}, + }}, + }; + std::string ref_json = ref.dump(); + std::string marker = absl::StrCat("\"", McpConstants::CONTENT_FIELD, "\":[]"); + size_t pos = ref_json.find(marker); + if (pos == std::string::npos) { + IS_ENVOY_BUG("JSON-RPC streaming marker not found in serialized envelope"); + return; + } + streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); + streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); + return; + } + // Build a reference JSON-RPC envelope with an empty text placeholder. json ref = { {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, @@ -700,6 +698,94 @@ void McpJsonRestBridgeFilter::setDynamicMetadata(absl::string_view method, *decoder_callbacks_, metadata.DebugString()); } +Http::FilterDataStatus McpJsonRestBridgeFilter::encodeStreamingData(Buffer::Instance& data, + bool end_stream) { + uint64_t len = data.length(); + // Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream + // is false on the last data frame). It is a no-op here; the suffix will be appended in + // encodeTrailers. + absl::string_view chunk(static_cast(data.linearize(len)), len); + + absl::StatusOr payload = prepareStreamingPayload(chunk, end_stream); + if (!payload.ok()) { + ENVOY_STREAM_LOG(error, "Streaming payload preparation error: {}", *encoder_callbacks_, + payload.status()); + json error_json = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, session_id_.has_value() ? *session_id_ : json(nullptr)}, + {McpConstants::ERROR_FIELD, generateErrorJsonResponse(-32000, payload.status().message())}}; + encoder_callbacks_->sendLocalReply( + Http::Code::InternalServerError, error_json.dump(), + [](Http::ResponseHeaderMap& headers) { + headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + }, + Grpc::Status::WellKnownGrpcStatus::Internal, + "mcp_json_rest_bridge_filter_streaming_payload_preparation_error"); + return Http::FilterDataStatus::StopIterationNoBuffer; + } + std::string output_to_add = *std::move(payload); + + data.drain(len); + // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted + // in the streaming fast-path due to the stateless nature of chunk processing (which lacks + // a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits). + // If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is, + // which may cause the client to fail parsing the final JSON. + if (is_first_streaming_chunk_) { + ENVOY_STREAM_LOG(debug, + "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, output_to_add.size()); + data.add(streaming_json_prefix_); + is_first_streaming_chunk_ = false; + } else { + ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, output_to_add.size()); + } + + data.add(output_to_add); + + // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. + // This is body + trailer case where encodeTrailer call represents the end of response body. + // In that case, we should add the streaming_json_suffix at encodeTrailer call. + if (end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); + data.add(streaming_json_suffix_); + } + return Http::FilterDataStatus::Continue; +} + +absl::StatusOr McpJsonRestBridgeFilter::processSseResponse(absl::string_view chunk, + bool end_stream) { + absl::StatusOr> event_payloads = + sse_response_extractor_.processChunk(chunk, end_stream); + if (!event_payloads.ok()) { + return event_payloads.status(); + } + std::string output; + for (const auto& event_payload : *event_payloads) { + std::string escaped_payload = + JsonEscaper::escapeString(event_payload, JsonEscaper::extraSpace(event_payload)); + std::string serialized_item = + absl::StrCat("{\"", McpConstants::TYPE_FIELD, "\":\"", McpConstants::TEXT_FIELD, "\",\"", + McpConstants::TEXT_FIELD, "\":\"", escaped_payload, "\"}"); + if (!is_first_sse_event_) { + absl::StrAppend(&output, ","); + } else { + is_first_sse_event_ = false; + } + absl::StrAppend(&output, serialized_item); + } + return output; +} + +absl::StatusOr +McpJsonRestBridgeFilter::prepareStreamingPayload(absl::string_view chunk, bool end_stream) { + if (is_sse_response_) { + return processSseResponse(chunk, end_stream); + } + return JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); +} + absl::Status McpJsonRestBridgeFilter::validateJsonRpcIdAndMethod(const nlohmann::json& json_rpc) { absl::StatusOr session_id = getSessionId(json_rpc); if (session_id.ok()) { diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index ff7aee1420293..5bd1b8f45e5d0 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -12,6 +12,7 @@ #include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/extensions/filters/http/common/pass_through_filter.h" +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" #include "absl/container/flat_hash_map.h" #include "absl/status/statusor.h" @@ -73,7 +74,7 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, public Logger::Loggable { public: explicit McpJsonRestBridgeFilter(McpJsonRestBridgeFilterConfigSharedPtr config) - : config_(config) {} + : sse_response_extractor_(config->maxResponseBodySize()), config_(config) {} // Http::StreamDecoderFilter Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers, @@ -110,6 +111,15 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, // Builds streaming_json_prefix_ and streaming_json_suffix_ for the tools/call streaming path. void buildStreamingPrefixAndSuffix(bool is_error); + // Encodes incoming data chunks for streaming MCP tool calls. + Http::FilterDataStatus encodeStreamingData(Buffer::Instance& data, bool end_stream); + + // Processes an SSE response chunk and returns the serialized JSON event payloads. + absl::StatusOr processSseResponse(absl::string_view chunk, bool end_stream); + + // Prepares the escaped/formatted payload string for streaming. + absl::StatusOr prepareStreamingPayload(absl::string_view chunk, bool end_stream); + enum class McpOperation { Unspecified = 0, // Received the "/mcp" URL but has not parsed the request body yet. @@ -143,6 +153,11 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, std::string streaming_json_suffix_; bool is_first_streaming_chunk_ = true; + // Whether the response is SSE. + bool is_sse_response_ = false; + bool is_first_sse_event_ = true; + SseResponseExtractor sse_response_extractor_; + McpJsonRestBridgeFilterConfigSharedPtr config_; }; diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc new file mode 100644 index 0000000000000..dab4bbd7aa8a3 --- /dev/null +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.cc @@ -0,0 +1,59 @@ +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" + +#include +#include +#include + +#include "source/common/http/sse/sse_parser.h" + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { + +absl::StatusOr> SseResponseExtractor::processChunk(absl::string_view chunk, + bool end_stream) { + if (max_response_body_size_ > 0 && (buffer_.size() + chunk.size()) > max_response_body_size_) { + return absl::InvalidArgumentError("Response body limit exceeded"); + } + std::vector event_payloads; + buffer_.append(chunk.data(), chunk.size()); + + absl::string_view buffer_view = buffer_; + const uint64_t length = buffer_.size(); + + while (!buffer_view.empty()) { + // Safely handles chunk boundaries and all line-ending formats + Http::Sse::SseParser::FindEventEndResult result = + Http::Sse::SseParser::findEventEnd(buffer_view, end_stream); + + // npos means the event hasn't reached a double blank line yet + if (result.event_start == absl::string_view::npos) { + break; + } + + absl::string_view event_str = + buffer_view.substr(result.event_start, result.event_end - result.event_start); + + Http::Sse::SseParser::ParsedEvent event = Http::Sse::SseParser::parseEvent(event_str); + + if (event.data.has_value()) { + event_payloads.push_back(*std::move(event.data)); + } + + buffer_view = buffer_view.substr(result.next_start); + } + + buffer_.erase(0, length - buffer_view.size()); + + return event_payloads; +} + +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h new file mode 100644 index 0000000000000..8c5c788e81ab9 --- /dev/null +++ b/source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { + +class SseResponseExtractor { +public: + explicit SseResponseExtractor(uint64_t max_response_body_size = 0) + : max_response_body_size_(max_response_body_size) {} + + // Processes an incoming chunk of an SSE stream and returns any completed event payloads. + // Set end_stream to true if this is the final chunk of the response. + absl::StatusOr> processChunk(absl::string_view chunk, bool end_stream); + +private: + std::string buffer_; + const uint64_t max_response_body_size_; +}; + +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD index 55ecbd245c5a7..6f5d6bff2a0fe 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/BUILD +++ b/test/extensions/filters/http/mcp_json_rest_bridge/BUILD @@ -51,3 +51,13 @@ envoy_cc_test( "@nlohmann_json//:json", ], ) + +envoy_cc_test( + name = "sse_response_extractor_test", + srcs = ["sse_response_extractor_test.cc"], + deps = [ + "//source/extensions/filters/http/mcp_json_rest_bridge:sse_response_extractor_lib", + "//test/test_common:status_utility_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc index 27e416ac2190a..f68a19fefb5dc 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc @@ -1395,6 +1395,66 @@ TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) { R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json")); } +TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseStreaming) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-type", "text/event-stream"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + + Buffer::OwnedImpl chunk1("data: {\"a\": 1}\n\n"); + EXPECT_EQ(filter_->encodeData(chunk1, /*end_stream=*/false), Http::FilterDataStatus::Continue); + EXPECT_THAT(chunk1.toString(), testing::StartsWith("{\"id\":123,")); + + Buffer::OwnedImpl chunk2("data: {\"b\": 2}\n\n"); + EXPECT_EQ(filter_->encodeData(chunk2, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_THAT(chunk2.toString(), testing::EndsWith("}}")); + + const std::string full = chunk1.toString() + chunk2.toString(); + EXPECT_EQ( + nlohmann::json::parse(full), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"{\"a\": 1}","type":"text"},{"text":"{\"b\": 2}","type":"text"}],"isError":false}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseExceedsMaxResponseBodySize) { + envoy::extensions::filters::http::mcp_json_rest_bridge::v3::McpJsonRestBridge proto_config; + TestUtility::loadFromYaml(R"yaml( + tool_config: + tools: + - name: "get_api_key" + http_rule: + get: "/v1/apiKeys" + text_content_streaming_enabled: true + max_response_body_size: 10 + )yaml", + proto_config); + config_ = std::make_shared(proto_config); + filter_ = std::make_unique(config_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); + + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-type", "text/event-stream"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + EXPECT_CALL( + encoder_callbacks_, + sendLocalReply( + Eq(Http::Code::InternalServerError), + StrEq( + R"json({"error":{"code":-32000,"message":"Response body limit exceeded"},"id":123,"jsonrpc":"2.0"})json"), + _, Eq(Grpc::Status::WellKnownGrpcStatus::Internal), + StrEq("mcp_json_rest_bridge_filter_streaming_payload_preparation_error"))); + + Buffer::OwnedImpl chunk("data: too long payload\n\n"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/false), + Http::FilterDataStatus::StopIterationNoBuffer); +} + class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override { diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc index 788c52e1b32b5..81b09bfb78af5 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc @@ -630,6 +630,91 @@ TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) { EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); } +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallSseStreamingTranscoding) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 321, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers().getMethodValue(), StrEq("POST")); + EXPECT_THAT(upstream_request_->headers().getPathValue(), StrEq("/v1/projects/foo/keys")); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(200); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.TextEventStream); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl chunk1; + chunk1.add("data: {\"a\": 1}\n\n"); + upstream_request_->encodeData(chunk1, false); + + Buffer::OwnedImpl chunk2; + chunk2.add("data: {\"b\": 2}\n\n"); + upstream_request_->encodeData(chunk2, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("200")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 321, + "result": { + "content": [ + { + "type": "text", + "text": "{\"a\": 1}" + }, + { + "type": "text", + "text": "{\"b\": 2}" + } + ], + "isError": false + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) { const std::string config = R"EOF( name: envoy.filters.http.mcp_json_rest_bridge diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc new file mode 100644 index 0000000000000..e1db6d15038d3 --- /dev/null +++ b/test/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor_test.cc @@ -0,0 +1,104 @@ +#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h" + +#include "test/test_common/status_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace McpJsonRestBridge { +namespace { + +using ::Envoy::StatusHelpers::StatusIs; +using ::testing::ElementsAre; +using ::testing::IsEmpty; + +class SseResponseExtractorTest : public testing::Test { +protected: + SseResponseExtractor extractor_; +}; + +TEST_F(SseResponseExtractorTest, ProcessSingleCompleteEvent) { + EXPECT_THAT(*extractor_.processChunk("data: hello world\n\n", /*end_stream=*/false), + ElementsAre("hello world")); +} + +TEST_F(SseResponseExtractorTest, ProcessMultipleEvents) { + EXPECT_THAT( + *extractor_.processChunk("data: first event\n\ndata: second event\n\n", /*end_stream=*/false), + ElementsAre("first event", "second event")); +} + +TEST_F(SseResponseExtractorTest, ProcessIncompleteEvent) { + // First chunk has incomplete event, should return nothing. + EXPECT_THAT(*extractor_.processChunk("data: first", /*end_stream=*/false), IsEmpty()); + + // Second chunk completes the event. + EXPECT_THAT(*extractor_.processChunk(" event\n\n", /*end_stream=*/false), + ElementsAre("first event")); +} + +TEST_F(SseResponseExtractorTest, ProcessMultipleChunks) { + EXPECT_THAT(*extractor_.processChunk("da", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("ta: hello", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk(" world\n", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("\n", /*end_stream=*/false), ElementsAre("hello world")); +} + +TEST_F(SseResponseExtractorTest, ProcessCommentsOnly) { + EXPECT_THAT(*extractor_.processChunk(": this is a comment\n\n", /*end_stream=*/false), IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessNoDataEvent) { + EXPECT_THAT(*extractor_.processChunk("event: ping\nid: 123\n\n", /*end_stream=*/false), + IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessCRLFLineEndings) { + EXPECT_THAT(*extractor_.processChunk("data: crlf test\r\n\r\n", /*end_stream=*/false), + ElementsAre("crlf test")); +} + +TEST_F(SseResponseExtractorTest, ProcessEndStreamWithIncompleteEvent) { + // Without end_stream, incomplete event is buffered. + EXPECT_THAT(*extractor_.processChunk("data: last event", /*end_stream=*/false), IsEmpty()); + EXPECT_THAT(*extractor_.processChunk("", /*end_stream=*/true), IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessMultilineData) { + EXPECT_THAT(*extractor_.processChunk("data: line one\ndata: line two\n\n", /*end_stream=*/false), + ElementsAre("line one\nline two")); +} + +TEST_F(SseResponseExtractorTest, ProcessJSONPayload) { + EXPECT_THAT(*extractor_.processChunk("data: {\"foo\": \"bar\"}\n\n", /*end_stream=*/false), + ElementsAre("{\"foo\": \"bar\"}")); +} + +TEST_F(SseResponseExtractorTest, ProcessEmptyChunk) { + EXPECT_THAT(*extractor_.processChunk("", /*end_stream=*/false), IsEmpty()); +} + +TEST_F(SseResponseExtractorTest, ProcessMixedEvents) { + EXPECT_THAT(*extractor_.processChunk("event: ping\n\ndata: hello\n\n: comment\n\ndata: world\n\n", + /*end_stream=*/false), + ElementsAre("hello", "world")); +} + +TEST(SseResponseExtractorLimitTest, EnforcesLimit) { + SseResponseExtractor limited_extractor(10); + // First chunk is fine (9 bytes) + EXPECT_THAT(*limited_extractor.processChunk("data: foo", /*end_stream=*/false), IsEmpty()); + + // Second chunk exceeds the limit (9 + 2 = 11 > 10) + EXPECT_THAT(limited_extractor.processChunk("\n\n", /*end_stream=*/false), + StatusIs(absl::StatusCode::kInvalidArgument)); +} + +} // namespace +} // namespace McpJsonRestBridge +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy