Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ message ToolConfig {
// When enabled, the response body is streamed directly to the client without buffering. Each
// chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix.
//
// For Server-Sent Events (SSE) responses (i.e. ``text/event-stream`` content type), Envoy parses
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to support SSE case when text_content_streaming_enabled: False in the follow-up change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought our plan is to support SSE in non-buffering case. I could add the buffering support for SSE in this PR if that's needed.

// the stream and wraps each extracted complete event payload into a distinct text content item
// within the JSON-RPC ``content`` array on the fly.
//
// Streaming flow:
//
// .. code-block:: text
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/http/mcp_json_rest_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_library(
hdrs = ["mcp_json_rest_bridge_filter.h"],
deps = [
":http_request_builder_lib",
":sse_response_extractor_lib",
"//envoy/http:filter_interface",
"//envoy/server:filter_config_interface",
"//source/common/common:logger_lib",
Expand Down Expand Up @@ -51,3 +52,15 @@ envoy_cc_library(
"@envoy_api//envoy/extensions/filters/http/mcp_json_rest_bridge/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "sse_response_extractor_lib",
srcs = ["sse_response_extractor.cc"],
hdrs = ["sse_response_extractor.h"],
deps = [
"//source/common/http/sse:sse_parser_lib",
"@abseil-cpp//absl/status",
"@abseil-cpp//absl/status:statusor",
"@abseil-cpp//absl/strings",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "source/common/protobuf/utility.h"
#include "source/extensions/filters/common/mcp/constants.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/http_request_builder.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include "absl/base/no_destructor.h"
#include "absl/container/flat_hash_set.h"
Expand All @@ -35,6 +36,13 @@ namespace McpConstants = Envoy::Extensions::Filters::Common::Mcp::McpConstants;
constexpr uint32_t DEFAULT_MAX_REQUEST_BODY_SIZE = 1024 * 64; // 64KB
constexpr uint32_t DEFAULT_MAX_RESPONSE_BODY_SIZE = 1024 * 1024; // 1MB

// Check if content type is text/event-stream, ignoring parameters like charset.
// HTTP Content-Type is case-insensitive.
bool isSseContentType(absl::string_view content_type) {
absl::string_view normalized = StringUtil::trim(StringUtil::cropRight(content_type, ";"));
return absl::EqualsIgnoreCase(normalized, Http::Headers::get().ContentTypeValues.TextEventStream);
}

bool isMcpProtocolVersionSupported(absl::string_view protocol_version) {
static const absl::NoDestructor<absl::flat_hash_set<absl::string_view>> supported_mcp_versions({
McpConstants::LATEST_SUPPORTED_MCP_VERSION,
Expand Down Expand Up @@ -276,6 +284,7 @@ McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers
// (final size is unknown), and let the headers flow through immediately so
// the client can start receiving data without waiting for the full body.
if (mcp_operation_ == McpOperation::ToolsCall && text_content_streaming_enabled_) {
is_sse_response_ = isSseContentType(response_headers.getContentTypeValue());
buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >=
static_cast<int>(Http::Code::BadRequest));
response_headers.removeContentLength();
Expand Down Expand Up @@ -304,40 +313,7 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat
// Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without
// buffering the full response body.
if (!streaming_json_prefix_.empty()) {
uint64_t len = data.length();
// Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream
// is false on the last data frame). It is a no-op here; the suffix will be appended in
// encodeTrailers.
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);
// TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if
// it needs buffering.
std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));

data.drain(len);
// Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted
// in the streaming fast-path due to the stateless nature of chunk processing (which lacks
// a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits).
// If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is,
// which may cause the client to fail parsing the final JSON.
if (is_first_streaming_chunk_) {
ENVOY_STREAM_LOG(debug,
"Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
data.add(streaming_json_prefix_);
is_first_streaming_chunk_ = false;
} else {
ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
}
data.add(escaped_chunk);
// TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call.
// This is body + trailer case where encodeTrailer call represents the end of response body.
// In that case, we should add the streaming_json_suffix at encodeTrailer call.
if (end_stream) {
ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_);
data.add(streaming_json_suffix_);
}
return Http::FilterDataStatus::Continue;
return encodeStreamingData(data, end_stream);
}

const uint32_t max_response_body_size = config_->maxResponseBodySize();
Expand Down Expand Up @@ -382,6 +358,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons
}

void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) {
if (is_sse_response_) {
json ref = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
{McpConstants::ID_FIELD, *session_id_},
{McpConstants::RESULT_FIELD,
{
{McpConstants::CONTENT_FIELD, json::array()},
{McpConstants::IS_ERROR_FIELD, is_error},
}},
};
std::string ref_json = ref.dump();
std::string marker = absl::StrCat("\"", McpConstants::CONTENT_FIELD, "\":[]");
size_t pos = ref_json.find(marker);
if (pos == std::string::npos) {
IS_ENVOY_BUG("JSON-RPC streaming marker not found in serialized envelope");
return;
}
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
return;
}

// Build a reference JSON-RPC envelope with an empty text placeholder.
json ref = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
Expand Down Expand Up @@ -700,6 +698,94 @@ void McpJsonRestBridgeFilter::setDynamicMetadata(absl::string_view method,
*decoder_callbacks_, metadata.DebugString());
}

Http::FilterDataStatus McpJsonRestBridgeFilter::encodeStreamingData(Buffer::Instance& data,
bool end_stream) {
uint64_t len = data.length();
// Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream
// is false on the last data frame). It is a no-op here; the suffix will be appended in
// encodeTrailers.
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);

absl::StatusOr<std::string> payload = prepareStreamingPayload(chunk, end_stream);
if (!payload.ok()) {
ENVOY_STREAM_LOG(error, "Streaming payload preparation error: {}", *encoder_callbacks_,
payload.status());
json error_json = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
{McpConstants::ID_FIELD, session_id_.has_value() ? *session_id_ : json(nullptr)},
{McpConstants::ERROR_FIELD, generateErrorJsonResponse(-32000, payload.status().message())}};
encoder_callbacks_->sendLocalReply(
Http::Code::InternalServerError, error_json.dump(),
[](Http::ResponseHeaderMap& headers) {
headers.setContentType(Http::Headers::get().ContentTypeValues.Json);
},
Grpc::Status::WellKnownGrpcStatus::Internal,
"mcp_json_rest_bridge_filter_streaming_payload_preparation_error");
return Http::FilterDataStatus::StopIterationNoBuffer;
}
std::string output_to_add = *std::move(payload);

data.drain(len);
// Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted
// in the streaming fast-path due to the stateless nature of chunk processing (which lacks
// a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits).
// If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is,
// which may cause the client to fail parsing the final JSON.
if (is_first_streaming_chunk_) {
ENVOY_STREAM_LOG(debug,
"Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, output_to_add.size());
data.add(streaming_json_prefix_);
is_first_streaming_chunk_ = false;
} else {
ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, output_to_add.size());
}

data.add(output_to_add);

// TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call.
// This is body + trailer case where encodeTrailer call represents the end of response body.
// In that case, we should add the streaming_json_suffix at encodeTrailer call.
if (end_stream) {
ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_);
data.add(streaming_json_suffix_);
}
return Http::FilterDataStatus::Continue;
}

absl::StatusOr<std::string> McpJsonRestBridgeFilter::processSseResponse(absl::string_view chunk,
bool end_stream) {
absl::StatusOr<std::vector<std::string>> event_payloads =
sse_response_extractor_.processChunk(chunk, end_stream);
if (!event_payloads.ok()) {
return event_payloads.status();
}
std::string output;
for (const auto& event_payload : *event_payloads) {
std::string escaped_payload =
JsonEscaper::escapeString(event_payload, JsonEscaper::extraSpace(event_payload));
std::string serialized_item =
absl::StrCat("{\"", McpConstants::TYPE_FIELD, "\":\"", McpConstants::TEXT_FIELD, "\",\"",
McpConstants::TEXT_FIELD, "\":\"", escaped_payload, "\"}");
if (!is_first_sse_event_) {
absl::StrAppend(&output, ",");
} else {
is_first_sse_event_ = false;
}
absl::StrAppend(&output, serialized_item);
}
return output;
}

absl::StatusOr<std::string>
McpJsonRestBridgeFilter::prepareStreamingPayload(absl::string_view chunk, bool end_stream) {
if (is_sse_response_) {
return processSseResponse(chunk, end_stream);
}
return JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));
}

absl::Status McpJsonRestBridgeFilter::validateJsonRpcIdAndMethod(const nlohmann::json& json_rpc) {
absl::StatusOr<nlohmann::json> session_id = getSessionId(json_rpc);
if (session_id.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include "absl/container/flat_hash_map.h"
#include "absl/status/statusor.h"
Expand Down Expand Up @@ -73,7 +74,7 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
public Logger::Loggable<Logger::Id::filter> {
public:
explicit McpJsonRestBridgeFilter(McpJsonRestBridgeFilterConfigSharedPtr config)
: config_(config) {}
: sse_response_extractor_(config->maxResponseBodySize()), config_(config) {}

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
Expand Down Expand Up @@ -110,6 +111,15 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
// Builds streaming_json_prefix_ and streaming_json_suffix_ for the tools/call streaming path.
void buildStreamingPrefixAndSuffix(bool is_error);

// Encodes incoming data chunks for streaming MCP tool calls.
Http::FilterDataStatus encodeStreamingData(Buffer::Instance& data, bool end_stream);

// Processes an SSE response chunk and returns the serialized JSON event payloads.
absl::StatusOr<std::string> processSseResponse(absl::string_view chunk, bool end_stream);

// Prepares the escaped/formatted payload string for streaming.
absl::StatusOr<std::string> prepareStreamingPayload(absl::string_view chunk, bool end_stream);

enum class McpOperation {
Unspecified = 0,
// Received the "/mcp" URL but has not parsed the request body yet.
Expand Down Expand Up @@ -143,6 +153,11 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
std::string streaming_json_suffix_;
bool is_first_streaming_chunk_ = true;

// Whether the response is SSE.
bool is_sse_response_ = false;
bool is_first_sse_event_ = true;
SseResponseExtractor sse_response_extractor_;

McpJsonRestBridgeFilterConfigSharedPtr config_;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "source/extensions/filters/http/mcp_json_rest_bridge/sse_response_extractor.h"

#include <string>
#include <utility>
#include <vector>

#include "source/common/http/sse/sse_parser.h"

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace McpJsonRestBridge {

absl::StatusOr<std::vector<std::string>> SseResponseExtractor::processChunk(absl::string_view chunk,
bool end_stream) {
if (max_response_body_size_ > 0 && (buffer_.size() + chunk.size()) > max_response_body_size_) {
return absl::InvalidArgumentError("Response body limit exceeded");
}
std::vector<std::string> event_payloads;
buffer_.append(chunk.data(), chunk.size());
Comment thread
guoyilin42 marked this conversation as resolved.

absl::string_view buffer_view = buffer_;
const uint64_t length = buffer_.size();

while (!buffer_view.empty()) {
// Safely handles chunk boundaries and all line-ending formats
Http::Sse::SseParser::FindEventEndResult result =
Http::Sse::SseParser::findEventEnd(buffer_view, end_stream);

// npos means the event hasn't reached a double blank line yet
if (result.event_start == absl::string_view::npos) {
break;
}

absl::string_view event_str =
buffer_view.substr(result.event_start, result.event_end - result.event_start);

Http::Sse::SseParser::ParsedEvent event = Http::Sse::SseParser::parseEvent(event_str);

if (event.data.has_value()) {
event_payloads.push_back(*std::move(event.data));
}

buffer_view = buffer_view.substr(result.next_start);
}

buffer_.erase(0, length - buffer_view.size());

return event_payloads;
}

} // namespace McpJsonRestBridge
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <string>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace McpJsonRestBridge {

class SseResponseExtractor {
public:
explicit SseResponseExtractor(uint64_t max_response_body_size = 0)
: max_response_body_size_(max_response_body_size) {}

// Processes an incoming chunk of an SSE stream and returns any completed event payloads.
// Set end_stream to true if this is the final chunk of the response.
absl::StatusOr<std::vector<std::string>> processChunk(absl::string_view chunk, bool end_stream);

private:
std::string buffer_;
const uint64_t max_response_body_size_;
};

} // namespace McpJsonRestBridge
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
10 changes: 10 additions & 0 deletions test/extensions/filters/http/mcp_json_rest_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,13 @@ envoy_cc_test(
"@nlohmann_json//:json",
],
)

envoy_cc_test(
name = "sse_response_extractor_test",
srcs = ["sse_response_extractor_test.cc"],
deps = [
"//source/extensions/filters/http/mcp_json_rest_bridge:sse_response_extractor_lib",
"//test/test_common:status_utility_lib",
"//test/test_common:utility_lib",
],
)
Loading
Loading