Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.FileTransformerConfiguration;
Expand Down Expand Up @@ -162,6 +164,28 @@ default SplitResult<ResponseT, ResultT> split(Consumer<SplittingTransformerConfi
return split(conf);
}

/**
* Creates a {@link SplitResult} with a response mapper applied at the upstream {@code onResponse} delivery point.
*/
@SdkInternalApi
default SplitResult<ResponseT, ResultT> split(SplittingTransformerConfiguration splitConfig,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO all public methods in a public API class are inherently public APIs, so we can't really add SdkInternalApi. Should we consider folding responseMapper into SplittingTransformerConfiguration. That way, we don't have to introduce another method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved responseMapper onto SplittingTransformerConfiguration and removed the extra split method

UnaryOperator<ResponseT> responseMapper) {
Validate.notNull(splitConfig, "splitConfig must not be null");
Validate.notNull(responseMapper, "responseMapper must not be null");
CompletableFuture<ResultT> future = new CompletableFuture<>();
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer = SplittingTransformer
.<ResponseT, ResultT>builder()
.upstreamResponseTransformer(this)
.maximumBufferSizeInBytes(splitConfig.bufferSizeInBytes())
.resultFuture(future)
.responseMapper(responseMapper)
.build();
return AsyncResponseTransformer.SplitResult.<ResponseT, ResultT>builder()
.publisher(transformer)
.resultFuture(future)
.build();
}

/**
* Each AsyncResponseTransformer should return a well-formed name that can be used to identify the implementation.
* The Transformer name should only include alphanumeric characters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -79,6 +80,19 @@ public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransform
.build();
}

@Override
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(
SplittingTransformerConfiguration splitConfig,
UnaryOperator<ResponseT> responseMapper) {
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
new ByteArraySplittingTransformer<>(this, future, responseMapper);
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
.publisher(transformer)
.resultFuture(future)
.build();
}

@Override
public String name() {
return TransformerType.BYTES.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -84,12 +85,22 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As

private final Map<Integer, ByteBuffer> buffers;

private final UnaryOperator<ResponseT> responseMapper;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: don't we need to update FileAsyncResponseTransfomer as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, file's existing split(config) now reads the mapper from the config like every other transformer, so it's handled automatically


public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
this(upstreamResponseTransformer, resultFuture, UnaryOperator.identity());
}

public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to new ctor? can we just add a new parameter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up, the mapper now comes from the config rather than a separate split(config, mapper),

upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture,
UnaryOperator<ResponseT> responseMapper) {
this.upstreamResponseTransformer = upstreamResponseTransformer;
this.resultFuture = resultFuture;
this.buffers = new ConcurrentHashMap<>();
this.responseMapper = responseMapper;
}

@Override
Expand Down Expand Up @@ -181,7 +192,7 @@ private void handleSubscriptionCancel() {
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare();
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture);

upstreamResponseTransformer.onResponse(responseT.get());
upstreamResponseTransformer.onResponse(responseMapper.apply(responseT.get()));

int totalPartCount = nextPartNumber.get() - 1;
if (buffers.size() != totalPartCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -112,16 +113,18 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As

private final Object cancelLock = new Object();

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
private final UnaryOperator<ResponseT> responseMapper;

private SplittingTransformer(Builder<ResponseT, ResultT> builder) {
this.upstreamResponseTransformer = Validate.paramNotNull(
upstreamResponseTransformer, "upstreamResponseTransformer");
this.resultFuture = Validate.paramNotNull(
resultFuture, "resultFuture");
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
builder.upstreamResponseTransformer, "upstreamResponseTransformer");
this.resultFuture = Validate.paramNotNull(builder.returnFuture, "resultFuture");
Validate.notNull(builder.maximumBufferSize, "maximumBufferSizeInBytes");
this.maximumBufferInBytes = Validate.isPositive(
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
builder.maximumBufferSize, "maximumBufferSizeInBytes");
this.responseMapper = builder.responseMapper != null
? builder.responseMapper
: UnaryOperator.identity();

this.resultFuture.whenComplete((r, e) -> {
if (e == null) {
Expand Down Expand Up @@ -296,7 +299,7 @@ public CompletableFuture<ResponseT> prepare() {
public void onResponse(ResponseT response) {
if (onResponseCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onResponse on the upstream transformer");
upstreamResponseTransformer.onResponse(response);
upstreamResponseTransformer.onResponse(responseMapper.apply(response));
}
this.response = response;
}
Expand Down Expand Up @@ -393,6 +396,7 @@ public static final class Builder<ResponseT, ResultT> {
private Long maximumBufferSize;
private CompletableFuture<ResultT> returnFuture;
private AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;
private UnaryOperator<ResponseT> responseMapper;

private Builder() {
}
Expand Down Expand Up @@ -437,10 +441,13 @@ public Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> retur
return this;
}

public Builder<ResponseT, ResultT> responseMapper(UnaryOperator<ResponseT> responseMapper) {
this.responseMapper = responseMapper;
return this;
}

public SplittingTransformer<ResponseT, ResultT> build() {
return new SplittingTransformer<>(this.upstreamResponseTransformer,
this.maximumBufferSize,
this.returnFuture);
return new SplittingTransformer<>(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest
}

private GetObjectRequest attachSdkAttribute(GetObjectRequest request,
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
request.overrideConfiguration()
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
Expand Down Expand Up @@ -650,11 +650,18 @@ public final <ResultT> Download<ResultT> downloadWithPresignedUrl(
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(presignedDownloadRequest, null);
progressUpdater.transferInitiated();

responseTransformer = isS3ClientMultipartEnabled()
&& presignedDownloadRequest.presignedUrlDownloadRequest().range() == null
? progressUpdater.wrapForNonSerialFileDownload(
responseTransformer, GetObjectRequest.builder().build())
: progressUpdater.wrapResponseTransformer(responseTransformer);
if (isS3ClientMultipartEnabled()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes test failure for bytesTransferred not firing for presigned toBytes multipart downloads.
That path was routed to wrapForNonSerialFileDownload, which only counts bytes inside its split() override, but the serial download splits and drives onStream directly, bypassing it. Now routed by parallelSplitSupported() so serial toBytes uses wrapResponseTransformerForMultipartDownload (counts in onStream), mirroring the regular download path

&& presignedDownloadRequest.presignedUrlDownloadRequest().range() == null) {
if (responseTransformer.split(b -> b.bufferSizeInBytes(1L)).parallelSplitSupported()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned that invoking responseTransformer.split may have implications, for example, involving a service call (they are harmless in ou implementations today, but we can't guarantee future implementations or custom implementations).

Is there another way?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, removed. With the mapper on the config, the wrapper's own split() handles both the rewrite and byte counting.

responseTransformer = progressUpdater.wrapForNonSerialFileDownload(
responseTransformer, GetObjectRequest.builder().build());
} else {
responseTransformer = progressUpdater.wrapResponseTransformerForMultipartDownload(
responseTransformer, GetObjectRequest.builder().build());
}
} else {
responseTransformer = progressUpdater.wrapResponseTransformer(responseTransformer);
}
progressUpdater.registerCompletion(returnFuture);

try {
Expand Down
Loading
Loading