Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ absl::Status ProcessorState::handleHeaderContinue() {
} else if (body_mode_ == ProcessingMode::STREAMED ||
body_mode_ == ProcessingMode::FULL_DUPLEX_STREAMED) {
sendBufferedDataInStreamedMode(false);
continueIfNecessary();
return absl::OkStatus();
} else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) {
return handleBufferedPartialMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,143 @@ TEST_P(ExtProcIntegrationTest,
EXPECT_THAT(response->headers(), ContainsHeader("x-new-header_1", "new_1"));
}

// With trailers, both directions, server buffers one chunks of body before sending back the
// response.
TEST_P(ExtProcIntegrationTest, TwoExtProcFiltersBothDuplexInBothDirectionWithTrailerRandom) {
twoExtProcFiltersFullDuplexConfig();

const std::string body_sent(10 * 1024, 's');
IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, false, false);
Http::TestRequestTrailerMapImpl request_trailers{{"x-request-trailer-foo", "yes"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);

// The ext_proc_server_0 receives the headers.
ProcessingRequest header_request;
serverReceiveHeaderReq(header_request);

// The ext_proc_server_0 receives one body chunk.
ProcessingRequest body_request;
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request));
EXPECT_TRUE(body_request.has_request_body());

// The ext_proc_server_0 sends back the header response.
serverSendHeaderResp();
// The ext_proc_server_0 sends back one body chunk.
ProcessingResponse response_body;
BodyResponse* body_resp;
body_resp = response_body.mutable_request_body();
auto* body_mut = body_resp->mutable_response()->mutable_body_mutation();
auto* streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("r");
processor_stream_->sendGrpcMessage(response_body);

// The ext_proc_server_0 receives the body and trailers.
uint32_t total_req_body_msg =
serverReceiveBodyDuplexStreamed("", processor_stream_, false, false);

// The ext_proc_server_0 sends back the body and trailers.
serverSendBodyRespDuplexStreamed(total_req_body_msg, processor_stream_, /*end_stream*/ false,
false, "");
serverSendTrailerRespDuplexStreamed(processor_stream_, false);

// The ext_proc_server_1 receives the headers.
server1ReceiveHeaderReq(header_request);

// The ext_proc_server_1 receives one body chunk.
EXPECT_TRUE(processor_stream_1_->waitForGrpcMessage(*dispatcher_, body_request));
EXPECT_TRUE(body_request.has_request_body());

// The ext_proc_server_1 sends back the header response.
server1SendHeaderResp();
// The ext_proc_server_1 sends back one body chunk.
response_body.Clear();
body_resp = response_body.mutable_request_body();
body_mut = body_resp->mutable_response()->mutable_body_mutation();
streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("r");
processor_stream_1_->sendGrpcMessage(response_body);

// The ext_proc_server_1 receives the body and trailers.
total_req_body_msg = serverReceiveBodyDuplexStreamed("", processor_stream_1_, false, false);

// The ext_proc_server_1 sends back the body and trailers.
const std::string body_upstream(total_req_body_msg + 1, 'r');
serverSendBodyRespDuplexStreamed(total_req_body_msg, processor_stream_1_, /*end_stream*/ false,
false, "");
serverSendTrailerRespDuplexStreamed(processor_stream_1_, false);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
for (int i = 0; i < 10; ++i) {
upstream_request_->encodeData(1024, false);
}
upstream_request_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"x-test-trailers", "Yes"}});

EXPECT_THAT(upstream_request_->headers(), ContainsHeader("x-new-header", "new"));
EXPECT_THAT(upstream_request_->headers(), ContainsHeader("x-new-header_1", "new_1"));
EXPECT_EQ(upstream_request_->body().toString(), body_upstream);

ASSERT_NE(upstream_request_->trailers(), nullptr);
EXPECT_THAT(*upstream_request_->trailers(), ContainsHeader("x-new-trailer", "new"));
EXPECT_THAT(*upstream_request_->trailers(), ContainsHeader("x-new-trailer_1", "new_1"));

// Now the response processing. In this direction, filter-1 sees the message first.
ProcessingRequest header_response;
server1ReceiveHeaderReq(header_response, false, true);
// The ext_proc_server_1 receives one body chunk.
EXPECT_TRUE(processor_stream_1_->waitForGrpcMessage(*dispatcher_, body_request));
EXPECT_TRUE(body_request.has_response_body());

// The ext_proc_server_1 sends back the header response.
server1SendHeaderResp(false, true);
// The ext_proc_server_1 sends back one body chunk.
response_body.Clear();
body_resp = response_body.mutable_response_body();
body_mut = body_resp->mutable_response()->mutable_body_mutation();
streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("n");
processor_stream_1_->sendGrpcMessage(response_body);

(void)serverReceiveBodyDuplexStreamed("", processor_stream_1_, true, false);

uint32_t total_resp_body_msg = 10;
serverSendBodyRespDuplexStreamed(total_resp_body_msg, processor_stream_1_, /*end_stream*/ false,
/*response*/ true, "m");
serverSendTrailerRespDuplexStreamed(processor_stream_1_, true);

// Now the ext_proc_server_0 receives the message.
serverReceiveHeaderReq(header_response, false, true);

// The ext_proc_server_0 receives one body chunk.
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request));
EXPECT_TRUE(body_request.has_response_body());

// The ext_proc_server_0 sends back the header response.
serverSendHeaderResp(false, true);
// The ext_proc_server_0 sends back one body chunk.
response_body.Clear();
body_resp = response_body.mutable_response_body();
body_mut = body_resp->mutable_response()->mutable_body_mutation();
streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("n");
processor_stream_->sendGrpcMessage(response_body);

(void)serverReceiveBodyDuplexStreamed("", processor_stream_, true, false);
total_resp_body_msg = 10;
const std::string body_downstream(total_resp_body_msg + 1, 'n');
serverSendBodyRespDuplexStreamed(total_resp_body_msg, processor_stream_, /*end_stream*/ false,
/*response*/ true, "n");
serverSendTrailerRespDuplexStreamed(processor_stream_, true);

verifyDownstreamResponse(*response, 200);
EXPECT_EQ(body_downstream, response->body());
EXPECT_THAT(response->headers(), ContainsHeader("x-new-header", "new"));
EXPECT_THAT(response->headers(), ContainsHeader("x-new-header_1", "new_1"));
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) {
bool encoding_watermarked = false;
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl want_response_body;
Buffer::OwnedImpl got_response_body;
Expand Down Expand Up @@ -210,7 +210,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) {
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
// Server sending headers response without waiting for body.
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl want_response_body;
Buffer::OwnedImpl got_response_body;
Expand Down Expand Up @@ -368,7 +368,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithFilterConfigMissing)
bool encoding_watermarked = false;
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

for (int i = 0; i < 4; i++) {
// 4 request chunks are sent to the ext_proc server.
Expand Down Expand Up @@ -417,7 +417,7 @@ TEST_F(HttpFilterTest, FullDuplexFailCloseWithDataOutbound) {
bool encoding_watermarked = false;
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl resp_chunk;
TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100);
Expand Down Expand Up @@ -487,7 +487,7 @@ TEST_F(HttpFilterTest, FullDuplexFailCloseWithDataInbound) {
request_headers_.addCopy(LowerCaseString("content-type"), "text/plain");

EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);
Buffer::OwnedImpl req_data("foo");
EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(req_data, false));

Expand Down
26 changes: 13 additions & 13 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ TEST_F(HttpFilterTest, StreamingDataSmallChunk) {
request_headers_.setMethod("POST");
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

const uint32_t chunk_number = 20;
sendChunkRequestData(chunk_number, true);
Expand Down Expand Up @@ -1786,7 +1786,7 @@ TEST_F(HttpFilterTest, StreamingSendRequestDataGrpcFail) {
request_headers_.setMethod("POST");
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

Buffer::OwnedImpl req_data("foo");
const uint32_t chunk_number = 20;
Expand Down Expand Up @@ -1838,7 +1838,7 @@ TEST_F(HttpFilterTest, StreamingSendResponseDataGrpcFail) {
request_headers_.setMethod("POST");
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

const uint32_t chunk_number = 20;
sendChunkRequestData(chunk_number, true);
Expand Down Expand Up @@ -2031,7 +2031,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) {
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
EXPECT_TRUE(last_request_.has_protocol_config());
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
// Test content-length header is removed in request in streamed mode.
EXPECT_EQ(request_headers_.ContentLength(), nullptr);
Expand Down Expand Up @@ -2066,7 +2066,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) {
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
EXPECT_EQ(0, config_->stats().streams_closed_.value());
EXPECT_FALSE(last_request_.has_protocol_config());
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);
EXPECT_EQ(0, config_->stats().streams_closed_.value());
// Test content-length header is removed in response in streamed mode.
EXPECT_EQ(response_headers_.ContentLength(), nullptr);
Expand Down Expand Up @@ -2139,7 +2139,7 @@ TEST_F(HttpFilterTest, PostStreamingBodiesDifferentOrder) {

EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

bool decoding_watermarked = false;
setUpDecodingWatermarking(decoding_watermarked);
Expand Down Expand Up @@ -2183,7 +2183,7 @@ TEST_F(HttpFilterTest, PostStreamingBodiesDifferentOrder) {
response_buffer.move(resp_chunk);
}

processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);
EXPECT_EQ(0, response_buffer.length());
EXPECT_FALSE(encoding_watermarked);
got_response_body.move(response_buffer);
Expand Down Expand Up @@ -2255,7 +2255,7 @@ TEST_F(HttpFilterTest, GetStreamingBodyAndChangeMode) {
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_CALL(encoder_callbacks_, encodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl want_response_body;
Buffer::OwnedImpl got_response_body;
Expand Down Expand Up @@ -2343,7 +2343,7 @@ TEST_F(HttpFilterTest, GetStreamingBodyAndChangeModeDifferentOrder) {
setUpEncodingWatermarking(encoding_watermarked);
EXPECT_CALL(encoder_callbacks_, encodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl want_response_body;
Buffer::OwnedImpl got_response_body;
Expand Down Expand Up @@ -3552,7 +3552,7 @@ TEST_F(HttpFilterTest, StreamedBodyCallbackWithEmptyQueue) {
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));

// Handle headers response to establish the gRPC stream and initialize stream_callbacks_.
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

// Manually transition decoding_state_ to StreamedBodyCallback while the chunk_queue_ is empty.
auto& decoding_state = const_cast<ProcessorState&>(filter_->decodingState());
Expand Down Expand Up @@ -4465,7 +4465,7 @@ TEST_F(HttpFilterTest, HeaderRespReceivedBeforeBody) {
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false));

// Header response arrives before any body data.
processResponseHeaders(false, absl::nullopt);
processResponseHeaders(true, absl::nullopt);

Buffer::OwnedImpl want_response_body;
Buffer::OwnedImpl got_response_body;
Expand Down Expand Up @@ -5540,7 +5540,7 @@ TEST_F(HttpFilterTest, SaveResponseTrailers) {
request_headers_.setMethod("POST");
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

const uint32_t chunk_number = 20;
sendChunkRequestData(chunk_number, true);
Expand Down Expand Up @@ -5664,7 +5664,7 @@ TEST_F(HttpFilterTest, DontSaveProcessingResponse) {
request_headers_.setMethod("POST");
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr));
EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false));
processRequestHeaders(false, absl::nullopt);
processRequestHeaders(true, absl::nullopt);

const uint32_t chunk_number = 20;
sendChunkRequestData(chunk_number, true);
Expand Down
Loading