Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ message ToolConfig {
// When enabled, the response body is streamed directly to the client without buffering. Each
// chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix.
//
// For Server-Sent Events (SSE) responses (i.e. ``text/event-stream`` content type), Envoy parses
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to support SSE case when text_content_streaming_enabled: False in the follow-up change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought our plan is to support SSE in non-buffering case. I could add the buffering support for SSE in this PR if that's needed.

// the stream and wraps each extracted complete event payload into a distinct text content item
// within the JSON-RPC ``content`` array on the fly.
//
// Streaming flow:
//
// .. code-block:: text
Expand Down
11 changes: 11 additions & 0 deletions source/extensions/filters/http/mcp_json_rest_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_library(
hdrs = ["mcp_json_rest_bridge_filter.h"],
deps = [
":http_request_builder_lib",
":sse_response_extractor_lib",
"//envoy/http:filter_interface",
"//envoy/server:filter_config_interface",
"//source/common/common:logger_lib",
Expand Down Expand Up @@ -51,3 +52,13 @@ envoy_cc_library(
"@envoy_api//envoy/extensions/filters/http/mcp_json_rest_bridge/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "sse_response_extractor_lib",
srcs = ["sse_response_extractor.cc"],
hdrs = ["sse_response_extractor.h"],
deps = [
"//source/common/http/sse:sse_parser_lib",
"@abseil-cpp//absl/strings",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "source/common/protobuf/utility.h"
#include "source/extensions/filters/common/mcp/constants.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/http_request_builder.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include "absl/base/no_destructor.h"
#include "absl/container/flat_hash_set.h"
Expand All @@ -35,6 +36,13 @@ namespace McpConstants = Envoy::Extensions::Filters::Common::Mcp::McpConstants;
constexpr uint32_t DEFAULT_MAX_REQUEST_BODY_SIZE = 1024 * 64; // 64KB
constexpr uint32_t DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024 * 1024; // 1MB

// Check if content type is text/event-stream, ignoring parameters like charset.
// HTTP Content-Type is case-insensitive.
bool isSseContentType(absl::string_view content_type) {
absl::string_view normalized = StringUtil::trim(StringUtil::cropRight(content_type, ";"));
return absl::EqualsIgnoreCase(normalized, Http::Headers::get().ContentTypeValues.TextEventStream);
}

bool isMcpProtocolVersionSupported(absl::string_view protocol_version) {
static const absl::NoDestructor<absl::flat_hash_set<absl::string_view>> supported_mcp_versions({
McpConstants::LATEST_SUPPORTED_MCP_VERSION,
Expand Down Expand Up @@ -276,6 +284,7 @@ McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers
// (final size is unknown), and let the headers flow through immediately so
// the client can start receiving data without waiting for the full body.
if (mcp_operation_ == McpOperation::ToolsCall && text_content_streaming_enabled_) {
is_sse_response_ = isSseContentType(response_headers.getContentTypeValue());
buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >=
static_cast<int>(Http::Code::BadRequest));
response_headers.removeContentLength();
Expand Down Expand Up @@ -309,9 +318,26 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat
// is false on the last data frame). It is a no-op here; the suffix will be appended in
// encodeTrailers.
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);
// TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if
// it needs buffering.
std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));

std::string output_to_add;
if (is_sse_response_) {
Comment thread
guoyilin42 marked this conversation as resolved.
Outdated
std::vector<std::string> event_payloads =
sse_response_extractor_.processChunk(chunk, end_stream);
for (const auto& event_payload : event_payloads) {
json content_item;
content_item[McpConstants::TYPE_FIELD] = McpConstants::TEXT_FIELD;
content_item[McpConstants::TEXT_FIELD] = event_payload;
std::string serialized_item = content_item.dump();
Comment thread
guoyilin42 marked this conversation as resolved.
Outdated
if (!is_first_sse_event_) {
absl::StrAppend(&output_to_add, ",");
} else {
is_first_sse_event_ = false;
}
absl::StrAppend(&output_to_add, serialized_item);
}
} else {
output_to_add = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));
}

data.drain(len);
// Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted
Expand All @@ -322,14 +348,16 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat
if (is_first_streaming_chunk_) {
ENVOY_STREAM_LOG(debug,
"Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
*encoder_callbacks_, len, output_to_add.size());
data.add(streaming_json_prefix_);
is_first_streaming_chunk_ = false;
} else {
ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
*encoder_callbacks_, len, output_to_add.size());
}
data.add(escaped_chunk);

data.add(output_to_add);

// TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call.
// This is body + trailer case where encodeTrailer call represents the end of response body.
// In that case, we should add the streaming_json_suffix at encodeTrailer call.
Expand Down Expand Up @@ -382,6 +410,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons
}

void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) {
if (is_sse_response_) {
json ref = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
{McpConstants::ID_FIELD, *session_id_},
{McpConstants::RESULT_FIELD,
{
{McpConstants::CONTENT_FIELD, json::array()},
{McpConstants::IS_ERROR_FIELD, is_error},
}},
};
std::string ref_json = ref.dump();
std::string marker = absl::StrCat("\"", McpConstants::CONTENT_FIELD, "\":[]");
size_t pos = ref_json.find(marker);
if (pos == std::string::npos) {
IS_ENVOY_BUG("JSON-RPC streaming marker not found in serialized envelope");
return;
}
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
return;
}

// Build a reference JSON-RPC envelope with an empty text placeholder.
json ref = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include "absl/container/flat_hash_map.h"
#include "absl/status/statusor.h"
Expand Down Expand Up @@ -143,6 +144,11 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
std::string streaming_json_suffix_;
bool is_first_streaming_chunk_ = true;

// Whether the response is SSE.
bool is_sse_response_ = false;
bool is_first_sse_event_ = true;
SseResponseExtractor sse_response_extractor_;

McpJsonRestBridgeFilterConfigSharedPtr config_;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include <string>
#include <utility>
#include <vector>

#include "source/common/http/sse/sse_parser.h"

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace McpJsonRestBridge {

std::vector<std::string> SseResponseExtractor::processChunk(absl::string_view chunk,
bool end_stream) {
std::vector<std::string> event_payloads;
buffer_.append(chunk.data(), chunk.size());
Comment thread
guoyilin42 marked this conversation as resolved.

absl::string_view buffer_view = buffer_;
const uint64_t length = buffer_.size();

while (!buffer_view.empty()) {
// Safely handles chunk boundaries and all line-ending formats
auto result = Http::Sse::SseParser::findEventEnd(buffer_view, end_stream);
Comment thread
guoyilin42 marked this conversation as resolved.
Outdated

// npos means the event hasn't reached a double blank line yet
if (result.event_start == absl::string_view::npos) {
break;
}

absl::string_view event_str =
buffer_view.substr(result.event_start, result.event_end - result.event_start);

Http::Sse::SseParser::ParsedEvent event = Http::Sse::SseParser::parseEvent(event_str);

if (event.data.has_value()) {
event_payloads.push_back(*std::move(event.data));
}

buffer_view = buffer_view.substr(result.next_start);
}

buffer_.erase(0, length - buffer_view.size());

return event_payloads;
}

} // namespace McpJsonRestBridge
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <string>
#include <vector>

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace McpJsonRestBridge {

class SseResponseExtractor {
public:
SseResponseExtractor() = default;

// Processes an incoming chunk of an SSE stream and returns any completed event payloads.
// Set end_stream to true if this is the final chunk of the response.
std::vector<std::string> processChunk(absl::string_view chunk, bool end_stream = false);
Comment thread
guoyilin42 marked this conversation as resolved.
Outdated

private:
std::string buffer_;
};

} // namespace McpJsonRestBridge
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
9 changes: 9 additions & 0 deletions test/extensions/filters/http/mcp_json_rest_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ envoy_cc_test(
"@nlohmann_json//:json",
],
)

envoy_cc_test(
name = "sse_response_extractor_test",
srcs = ["sse_response_extractor_test.cc"],
deps = [
"//source/extensions/filters/http/mcp_json_rest_bridge:sse_response_extractor_lib",
"//test/test_common:utility_lib",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,29 @@ TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) {
R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json"));
}

TEST_F(McpJsonRestBridgeStreamingFilterTest, SseResponseStreaming) {
sendToolsCallRequest();

response_headers_ = {{":status", "200"}, {"content-type", "text/event-stream"}};
EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false),
Http::FilterHeadersStatus::Continue);
EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json"));

Buffer::OwnedImpl chunk1("data: {\"a\": 1}\n\n");
EXPECT_EQ(filter_->encodeData(chunk1, /*end_stream=*/false), Http::FilterDataStatus::Continue);
EXPECT_THAT(chunk1.toString(), testing::StartsWith("{\"id\":123,"));

Buffer::OwnedImpl chunk2("data: {\"b\": 2}\n\n");
EXPECT_EQ(filter_->encodeData(chunk2, /*end_stream=*/true), Http::FilterDataStatus::Continue);
EXPECT_THAT(chunk2.toString(), testing::EndsWith("}}"));

const std::string full = chunk1.toString() + chunk2.toString();
EXPECT_EQ(
nlohmann::json::parse(full),
nlohmann::json::parse(
R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"{\"a\": 1}","type":"text"},{"text":"{\"b\": 2}","type":"text"}],"isError":false}})json"));
}

class McpHttpMethodFilterTest : public testing::TestWithParam<std::string> {
public:
void SetUp() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,91 @@ TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) {
EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response));
}

TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallSseStreamingTranscoding) {
const std::string config = R"EOF(
name: envoy.filters.http.mcp_json_rest_bridge
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge
tool_config:
tools:
- name: "create_api_key"
http_rule:
post: "/v1/{parent=projects/*}/keys"
body: "key"
text_content_streaming_enabled: true
)EOF";

initializeFilter(config);

codec_client_ = makeHttpConnection(lookupPort("http"));

const std::string request_body = R"({
"jsonrpc": "2.0",
"id": 321,
"method": "tools/call",
"params": {
"name": "create_api_key",
"arguments": {
"parent": "projects/foo",
"key": {
"displayName": "bar"
}
}
}
})";

auto response = codec_client_->makeRequestWithBody(
Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/mcp"},
{":scheme", "http"},
{":authority", "host"},
{"content-type", "application/json"}},
request_body);

waitForNextUpstreamRequest();
EXPECT_THAT(upstream_request_->headers().getMethodValue(), StrEq("POST"));
EXPECT_THAT(upstream_request_->headers().getPathValue(), StrEq("/v1/projects/foo/keys"));

Http::TestResponseHeaderMapImpl response_headers;
response_headers.setStatus(200);
response_headers.setContentType(Http::Headers::get().ContentTypeValues.TextEventStream);
upstream_request_->encodeHeaders(response_headers, false);

Buffer::OwnedImpl chunk1;
chunk1.add("data: {\"a\": 1}\n\n");
upstream_request_->encodeData(chunk1, false);

Buffer::OwnedImpl chunk2;
chunk2.add("data: {\"b\": 2}\n\n");
upstream_request_->encodeData(chunk2, true);

ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(upstream_request_->complete());

EXPECT_THAT(response->headers().getStatusValue(), StrEq("200"));
EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json"));
EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty());

const std::string expected_rpc_response = R"({
"jsonrpc": "2.0",
"id": 321,
"result": {
"content": [
{
"type": "text",
"text": "{\"a\": 1}"
},
{
"type": "text",
"text": "{\"b\": 2}"
}
],
"isError": false
}
})";
EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response));
}

TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) {
const std::string config = R"EOF(
name: envoy.filters.http.mcp_json_rest_bridge
Expand Down
Loading
Loading