Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-aee08c2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Fixed a potential deadlock in `AwsCrtHttpClient` that could occur when the request body `InputStream` blocked waiting for data on the CRT event loop thread. This could happen when a blocking stream (e.g., a `BufferedInputStream` wrapping a `ResponseInputStream`) was used as a request body and the read depended on the same event loop thread to deliver data. Request body writing now happens on the caller thread."
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import static software.amazon.awssdk.http.HttpMetric.HTTP_CLIENT_NAME;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.http.HttpException;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamManager;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
Expand All @@ -35,6 +39,7 @@
import software.amazon.awssdk.http.crt.internal.AwsCrtClientBuilderBase;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.crt.internal.CrtRequestExecutor;
import software.amazon.awssdk.http.crt.internal.CrtUtils;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand Down Expand Up @@ -107,6 +112,8 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
}

private static final class CrtHttpRequest implements ExecutableHttpRequest {
private static final int WRITE_BUFFER_SIZE = 16 * 1024;

private final CrtRequestContext context;
private volatile CompletableFuture<SdkHttpFullResponse> responseFuture;

Expand All @@ -119,7 +126,14 @@ public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse.Builder builder = HttpExecuteResponse.builder();

try {
responseFuture = new CrtRequestExecutor().execute(context);
CrtRequestExecutor.ExecutionResult execution = new CrtRequestExecutor().execute(context);
responseFuture = execution.responseFuture();

// Wait for the stream to be acquired, then write the request body from the caller thread.
// This avoids blocking the CRT event loop thread in InputStream.read().
HttpStreamBase stream = CompletableFutureUtils.joinInterruptibly(execution.streamFuture());
writeRequestBody(stream);

SdkHttpFullResponse response = CompletableFutureUtils.joinInterruptibly(responseFuture);
builder.response(response);
builder.responseBody(response.content().orElse(null));
Expand All @@ -140,6 +154,10 @@ public HttpExecuteResponse call() throws IOException {
}

if (cause instanceof HttpException) {
Throwable wrapped = CrtUtils.wrapCrtException(cause);
if (wrapped instanceof IOException) {
throw (IOException) wrapped;
}
throw (HttpException) cause;
}

Expand All @@ -151,6 +169,24 @@ public HttpExecuteResponse call() throws IOException {
}
}

private void writeRequestBody(HttpStreamBase stream) throws IOException {
ContentStreamProvider provider = context.sdkRequest().contentStreamProvider().orElse(null);
if (provider == null) {
return;
}

try (InputStream inputStream = provider.newStream()) {
byte[] buf = new byte[WRITE_BUFFER_SIZE];
int read;

while ((read = inputStream.read(buf, 0, buf.length)) >= 0) {
byte[] chunk = read == buf.length ? buf : Arrays.copyOf(buf, read);
CompletableFutureUtils.joinInterruptibly(stream.writeData(chunk, false));
}
Comment on lines +182 to +185

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the input stream has no data available, do we just keep looping until we find data? Seems like we will end up with hot looping and wasting CPU loops for this case.

And, a bit optimization, when there is no data, it will be better to skip invoking the writeData, so that less work to be done

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per InputStream contract, a valid implementation shouldn't return 0 from read() method; it just blocks until data arrives or returns -1 at EOF.

If len is zero, then no bytes are read and 0 is returned; otherwise, there is an attempt to read at least one byte. If no byte is available because the stream is at end of file, the value -1 is returned; otherwise, at least one byte is read and stored into b.

CompletableFutureUtils.joinInterruptibly(stream.writeData(null, true));
}
}

@Override
public void abort() {
if (responseFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,40 @@
@SdkInternalApi
public final class CrtRequestExecutor {

public CompletableFuture<SdkHttpFullResponse> execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> requestFuture = new CompletableFuture<>();
public ExecutionResult execute(CrtRequestContext executionContext) {
CompletableFuture<SdkHttpFullResponse> responseFuture = new CompletableFuture<>();
CompletableFuture<HttpStreamBase> streamFuture;

try {
doExecute(executionContext, requestFuture);
streamFuture = doExecute(executionContext, responseFuture);
} catch (Throwable t) {
requestFuture.completeExceptionally(t);
responseFuture.completeExceptionally(t);
streamFuture = new CompletableFuture<>();
streamFuture.completeExceptionally(t);
}

return requestFuture;
return new ExecutionResult(streamFuture, responseFuture);
}

private void doExecute(CrtRequestContext executionContext, CompletableFuture<SdkHttpFullResponse> requestFuture) {
private CompletableFuture<HttpStreamBase> doExecute(CrtRequestContext executionContext,
CompletableFuture<SdkHttpFullResponse> responseFuture) {
MetricCollector metricCollector = executionContext.metricCollector();
boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);

long acquireStartTime = 0;

if (shouldPublishMetrics) {
// go ahead and get acquireStartTime for the concurrency timer as early as possible,
// so it's as accurate as possible, but only do it in a branch since clock_gettime()
// results in a full sys call barrier (multiple mutexes and a hw interrupt).
acquireStartTime = System.nanoTime();
}

HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(requestFuture);
HttpStreamBaseResponseHandler crtResponseHandler = new InputStreamAdaptingHttpStreamResponseHandler(responseFuture);

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);

boolean hasBody = executionContext.sdkRequest().contentStreamProvider().isPresent();

CompletableFuture<HttpStreamBase> streamFuture =
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler);
executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler, hasBody);

long finalAcquireStartTime = acquireStartTime;

Expand All @@ -73,8 +76,33 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture<Sdk

if (throwable != null) {
Throwable toThrow = wrapCrtException(throwable);
requestFuture.completeExceptionally(toThrow);
responseFuture.completeExceptionally(toThrow);
}
});

return streamFuture;
}

/**
* Holds the result of submitting a request to CRT: the stream (for writing body data via
* {@code writeData}) and the response future (for reading the response).
*/
public static final class ExecutionResult {
private final CompletableFuture<HttpStreamBase> streamFuture;
private final CompletableFuture<SdkHttpFullResponse> responseFuture;

ExecutionResult(CompletableFuture<HttpStreamBase> streamFuture,
CompletableFuture<SdkHttpFullResponse> responseFuture) {
this.streamFuture = streamFuture;
this.responseFuture = responseFuture;
}

public CompletableFuture<HttpStreamBase> streamFuture() {
return streamFuture;
}

public CompletableFuture<SdkHttpFullResponse> responseFuture() {
return responseFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,7 @@ public static HttpRequest toCrtRequest(CrtRequestContext request) {
HttpHeader[] crtHeaderArray = asArray(createHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest));

String finalEncodedPath = encodedPath + encodedQueryString;
return sdkExecuteRequest.contentStreamProvider()
.map(provider -> new HttpRequest(method,
finalEncodedPath,
crtHeaderArray,
new CrtRequestInputStreamAdapter(provider)))
.orElse(new HttpRequest(method,
finalEncodedPath,
crtHeaderArray, null));
return new HttpRequest(method, finalEncodedPath, crtHeaderArray, null);
}

private static HttpHeader[] asArray(List<HttpHeader> crtHeaderList) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void execute_requestConversionFails_failsFuture() {
.request(HttpExecuteRequest.builder().build())
.build();

CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();

assertThat(executeFuture).hasFailedWithThrowableThat().isInstanceOf(NullPointerException.class);
}
Expand All @@ -98,11 +98,11 @@ public void execute_acquireStreamFails_wrapsWithIOException() {
CrtRequestContext context = crtRequestContext();
CompletableFuture<HttpStreamBase> completableFuture = new CompletableFuture<>();

Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
.thenReturn(completableFuture);
completableFuture.completeExceptionally(exception);

CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();

assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(exception).isInstanceOf(IOException.class);
}
Expand All @@ -113,10 +113,10 @@ public void execute_retryableException_wrapsWithIOException(Throwable throwable)
CrtRequestContext context = crtRequestContext();
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(throwable);

Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
.thenReturn(completableFuture);

CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
assertThat(executeFuture).hasFailedWithThrowableThat().hasCause(throwable).isInstanceOf(IOException.class);
}

Expand All @@ -130,10 +130,10 @@ public void execute_httpException_mapsToCorrectException(Entry<Integer, Class<?
HttpException exception = new HttpException(errorCode);
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(exception);

Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
.thenReturn(completableFuture);

CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCauseInstanceOf(expectedExceptionClass);
}

Expand All @@ -143,10 +143,10 @@ public void execute_nonRetryableHttpException_doesNotWrapWithIOException() {
CrtRequestContext context = crtRequestContext();
CompletableFuture<HttpStreamBase> completableFuture = CompletableFutureUtils.failedFuture(exception);

Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class)))
Mockito.when(streamManager.acquireStream(Mockito.any(HttpRequest.class), Mockito.any(HttpStreamBaseResponseHandler.class), Mockito.anyBoolean()))
.thenReturn(completableFuture);

CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context);
CompletableFuture<SdkHttpFullResponse> executeFuture = requestExecutor.execute(context).responseFuture();
assertThatThrownBy(executeFuture::join).hasCause(exception);
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<rxjava3.version>3.1.5</rxjava3.version>
<commons-codec.verion>1.17.1</commons-codec.verion>
<jmh.version>1.37</jmh.version>
<awscrt.version>0.45.1</awscrt.version>
<awscrt.version>1.0.0-SNAPSHOT</awscrt.version>

@zoewangg zoewangg May 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pending on awslabs/aws-crt-java@main...write-stream to be merged


<!--Test dependencies -->
<junit5.version>5.10.3</junit5.version>
Expand Down