diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 81bb91df8f071..4cb533ce87798 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -259,7 +259,6 @@ absl::Status ProcessorState::handleHeaderContinue() { } else if (body_mode_ == ProcessingMode::STREAMED || body_mode_ == ProcessingMode::FULL_DUPLEX_STREAMED) { sendBufferedDataInStreamedMode(false); - continueIfNecessary(); return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) { return handleBufferedPartialMode(); diff --git a/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc index 98f51a867bf6f..e11edda0917f1 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_full_duplex_integration_test.cc @@ -987,6 +987,143 @@ TEST_P(ExtProcIntegrationTest, EXPECT_THAT(response->headers(), ContainsHeader("x-new-header_1", "new_1")); } +// With trailers, both directions, server buffers one chunks of body before sending back the +// response. +TEST_P(ExtProcIntegrationTest, TwoExtProcFiltersBothDuplexInBothDirectionWithTrailerRandom) { + twoExtProcFiltersFullDuplexConfig(); + + const std::string body_sent(10 * 1024, 's'); + IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, false, false); + Http::TestRequestTrailerMapImpl request_trailers{{"x-request-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + // The ext_proc_server_0 receives the headers. + ProcessingRequest header_request; + serverReceiveHeaderReq(header_request); + + // The ext_proc_server_0 receives one body chunk. + ProcessingRequest body_request; + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request)); + EXPECT_TRUE(body_request.has_request_body()); + + // The ext_proc_server_0 sends back the header response. + serverSendHeaderResp(); + // The ext_proc_server_0 sends back one body chunk. + ProcessingResponse response_body; + BodyResponse* body_resp; + body_resp = response_body.mutable_request_body(); + auto* body_mut = body_resp->mutable_response()->mutable_body_mutation(); + auto* streamed_response = body_mut->mutable_streamed_response(); + streamed_response->set_body("r"); + processor_stream_->sendGrpcMessage(response_body); + + // The ext_proc_server_0 receives the body and trailers. + uint32_t total_req_body_msg = + serverReceiveBodyDuplexStreamed("", processor_stream_, false, false); + + // The ext_proc_server_0 sends back the body and trailers. + serverSendBodyRespDuplexStreamed(total_req_body_msg, processor_stream_, /*end_stream*/ false, + false, ""); + serverSendTrailerRespDuplexStreamed(processor_stream_, false); + + // The ext_proc_server_1 receives the headers. + server1ReceiveHeaderReq(header_request); + + // The ext_proc_server_1 receives one body chunk. + EXPECT_TRUE(processor_stream_1_->waitForGrpcMessage(*dispatcher_, body_request)); + EXPECT_TRUE(body_request.has_request_body()); + + // The ext_proc_server_1 sends back the header response. + server1SendHeaderResp(); + // The ext_proc_server_1 sends back one body chunk. + response_body.Clear(); + body_resp = response_body.mutable_request_body(); + body_mut = body_resp->mutable_response()->mutable_body_mutation(); + streamed_response = body_mut->mutable_streamed_response(); + streamed_response->set_body("r"); + processor_stream_1_->sendGrpcMessage(response_body); + + // The ext_proc_server_1 receives the body and trailers. + total_req_body_msg = serverReceiveBodyDuplexStreamed("", processor_stream_1_, false, false); + + // The ext_proc_server_1 sends back the body and trailers. + const std::string body_upstream(total_req_body_msg + 1, 'r'); + serverSendBodyRespDuplexStreamed(total_req_body_msg, processor_stream_1_, /*end_stream*/ false, + false, ""); + serverSendTrailerRespDuplexStreamed(processor_stream_1_, false); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + for (int i = 0; i < 10; ++i) { + upstream_request_->encodeData(1024, false); + } + upstream_request_->encodeTrailers(Http::TestResponseTrailerMapImpl{{"x-test-trailers", "Yes"}}); + + EXPECT_THAT(upstream_request_->headers(), ContainsHeader("x-new-header", "new")); + EXPECT_THAT(upstream_request_->headers(), ContainsHeader("x-new-header_1", "new_1")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + + ASSERT_NE(upstream_request_->trailers(), nullptr); + EXPECT_THAT(*upstream_request_->trailers(), ContainsHeader("x-new-trailer", "new")); + EXPECT_THAT(*upstream_request_->trailers(), ContainsHeader("x-new-trailer_1", "new_1")); + + // Now the response processing. In this direction, filter-1 sees the message first. + ProcessingRequest header_response; + server1ReceiveHeaderReq(header_response, false, true); + // The ext_proc_server_1 receives one body chunk. + EXPECT_TRUE(processor_stream_1_->waitForGrpcMessage(*dispatcher_, body_request)); + EXPECT_TRUE(body_request.has_response_body()); + + // The ext_proc_server_1 sends back the header response. + server1SendHeaderResp(false, true); + // The ext_proc_server_1 sends back one body chunk. + response_body.Clear(); + body_resp = response_body.mutable_response_body(); + body_mut = body_resp->mutable_response()->mutable_body_mutation(); + streamed_response = body_mut->mutable_streamed_response(); + streamed_response->set_body("n"); + processor_stream_1_->sendGrpcMessage(response_body); + + (void)serverReceiveBodyDuplexStreamed("", processor_stream_1_, true, false); + + uint32_t total_resp_body_msg = 10; + serverSendBodyRespDuplexStreamed(total_resp_body_msg, processor_stream_1_, /*end_stream*/ false, + /*response*/ true, "m"); + serverSendTrailerRespDuplexStreamed(processor_stream_1_, true); + + // Now the ext_proc_server_0 receives the message. + serverReceiveHeaderReq(header_response, false, true); + + // The ext_proc_server_0 receives one body chunk. + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request)); + EXPECT_TRUE(body_request.has_response_body()); + + // The ext_proc_server_0 sends back the header response. + serverSendHeaderResp(false, true); + // The ext_proc_server_0 sends back one body chunk. + response_body.Clear(); + body_resp = response_body.mutable_response_body(); + body_mut = body_resp->mutable_response()->mutable_body_mutation(); + streamed_response = body_mut->mutable_streamed_response(); + streamed_response->set_body("n"); + processor_stream_->sendGrpcMessage(response_body); + + (void)serverReceiveBodyDuplexStreamed("", processor_stream_, true, false); + total_resp_body_msg = 10; + const std::string body_downstream(total_resp_body_msg + 1, 'n'); + serverSendBodyRespDuplexStreamed(total_resp_body_msg, processor_stream_, /*end_stream*/ false, + /*response*/ true, "n"); + serverSendTrailerRespDuplexStreamed(processor_stream_, true); + + verifyDownstreamResponse(*response, 200); + EXPECT_EQ(body_downstream, response->body()); + EXPECT_THAT(response->headers(), ContainsHeader("x-new-header", "new")); + EXPECT_THAT(response->headers(), ContainsHeader("x-new-header_1", "new_1")); +} + } // namespace ExternalProcessing } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc b/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc index d6e9ca41afebc..0e838e2d60767 100644 --- a/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_full_duplex_test.cc @@ -117,7 +117,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) { bool encoding_watermarked = false; setUpEncodingWatermarking(encoding_watermarked); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl want_response_body; Buffer::OwnedImpl got_response_body; @@ -210,7 +210,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) { setUpEncodingWatermarking(encoding_watermarked); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); // Server sending headers response without waiting for body. - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl want_response_body; Buffer::OwnedImpl got_response_body; @@ -368,7 +368,7 @@ TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithFilterConfigMissing) bool encoding_watermarked = false; setUpEncodingWatermarking(encoding_watermarked); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); for (int i = 0; i < 4; i++) { // 4 request chunks are sent to the ext_proc server. @@ -417,7 +417,7 @@ TEST_F(HttpFilterTest, FullDuplexFailCloseWithDataOutbound) { bool encoding_watermarked = false; setUpEncodingWatermarking(encoding_watermarked); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl resp_chunk; TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); @@ -487,7 +487,7 @@ TEST_F(HttpFilterTest, FullDuplexFailCloseWithDataInbound) { request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); Buffer::OwnedImpl req_data("foo"); EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->decodeData(req_data, false)); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 34e2e437593b3..530d05faf1213 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -1736,7 +1736,7 @@ TEST_F(HttpFilterTest, StreamingDataSmallChunk) { request_headers_.setMethod("POST"); EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); const uint32_t chunk_number = 20; sendChunkRequestData(chunk_number, true); @@ -1786,7 +1786,7 @@ TEST_F(HttpFilterTest, StreamingSendRequestDataGrpcFail) { request_headers_.setMethod("POST"); EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); Buffer::OwnedImpl req_data("foo"); const uint32_t chunk_number = 20; @@ -1838,7 +1838,7 @@ TEST_F(HttpFilterTest, StreamingSendResponseDataGrpcFail) { request_headers_.setMethod("POST"); EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); const uint32_t chunk_number = 20; sendChunkRequestData(chunk_number, true); @@ -2031,7 +2031,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) { EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); EXPECT_TRUE(last_request_.has_protocol_config()); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); EXPECT_EQ(0, config_->stats().streams_closed_.value()); // Test content-length header is removed in request in streamed mode. EXPECT_EQ(request_headers_.ContentLength(), nullptr); @@ -2066,7 +2066,7 @@ TEST_F(HttpFilterTest, PostStreamingBodies) { EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); EXPECT_EQ(0, config_->stats().streams_closed_.value()); EXPECT_FALSE(last_request_.has_protocol_config()); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); EXPECT_EQ(0, config_->stats().streams_closed_.value()); // Test content-length header is removed in response in streamed mode. EXPECT_EQ(response_headers_.ContentLength(), nullptr); @@ -2139,7 +2139,7 @@ TEST_F(HttpFilterTest, PostStreamingBodiesDifferentOrder) { EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); bool decoding_watermarked = false; setUpDecodingWatermarking(decoding_watermarked); @@ -2183,7 +2183,7 @@ TEST_F(HttpFilterTest, PostStreamingBodiesDifferentOrder) { response_buffer.move(resp_chunk); } - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); EXPECT_EQ(0, response_buffer.length()); EXPECT_FALSE(encoding_watermarked); got_response_body.move(response_buffer); @@ -2255,7 +2255,7 @@ TEST_F(HttpFilterTest, GetStreamingBodyAndChangeMode) { setUpEncodingWatermarking(encoding_watermarked); EXPECT_CALL(encoder_callbacks_, encodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl want_response_body; Buffer::OwnedImpl got_response_body; @@ -2343,7 +2343,7 @@ TEST_F(HttpFilterTest, GetStreamingBodyAndChangeModeDifferentOrder) { setUpEncodingWatermarking(encoding_watermarked); EXPECT_CALL(encoder_callbacks_, encodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl want_response_body; Buffer::OwnedImpl got_response_body; @@ -3552,7 +3552,7 @@ TEST_F(HttpFilterTest, StreamedBodyCallbackWithEmptyQueue) { EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); // Handle headers response to establish the gRPC stream and initialize stream_callbacks_. - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); // Manually transition decoding_state_ to StreamedBodyCallback while the chunk_queue_ is empty. auto& decoding_state = const_cast(filter_->decodingState()); @@ -4465,7 +4465,7 @@ TEST_F(HttpFilterTest, HeaderRespReceivedBeforeBody) { EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); // Header response arrives before any body data. - processResponseHeaders(false, absl::nullopt); + processResponseHeaders(true, absl::nullopt); Buffer::OwnedImpl want_response_body; Buffer::OwnedImpl got_response_body; @@ -5540,7 +5540,7 @@ TEST_F(HttpFilterTest, SaveResponseTrailers) { request_headers_.setMethod("POST"); EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); const uint32_t chunk_number = 20; sendChunkRequestData(chunk_number, true); @@ -5664,7 +5664,7 @@ TEST_F(HttpFilterTest, DontSaveProcessingResponse) { request_headers_.setMethod("POST"); EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillRepeatedly(Return(nullptr)); EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); - processRequestHeaders(false, absl::nullopt); + processRequestHeaders(true, absl::nullopt); const uint32_t chunk_number = 20; sendChunkRequestData(chunk_number, true);