Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core;

import java.util.Objects;
import java.util.function.UnaryOperator;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.SplittingTransformer;
Expand All @@ -35,9 +36,11 @@ public final class SplittingTransformerConfiguration implements ToCopyableBuilde
SplittingTransformerConfiguration> {

private final Long bufferSizeInBytes;
private final UnaryOperator<SdkResponse> responseMapper;

private SplittingTransformerConfiguration(DefaultBuilder builder) {
this.bufferSizeInBytes = Validate.paramNotNull(builder.bufferSize, "bufferSize");
this.responseMapper = builder.responseMapper;
}

/**
Expand All @@ -54,6 +57,13 @@ public Long bufferSizeInBytes() {
return bufferSizeInBytes;
}

/**
* @return the response mapper, or null if not set
*/
public UnaryOperator<SdkResponse> responseMapper() {
return responseMapper;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -65,12 +75,15 @@ public boolean equals(Object o) {

SplittingTransformerConfiguration that = (SplittingTransformerConfiguration) o;

return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes);
return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes)
&& Objects.equals(responseMapper, that.responseMapper);
}

@Override
public int hashCode() {
return bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0;
int result = bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0;
result = 31 * result + (responseMapper != null ? responseMapper.hashCode() : 0);
return result;
}

@Override
Expand All @@ -94,13 +107,25 @@ public interface Builder extends CopyableBuilder<Builder, SplittingTransformerCo
* @return This object for method chaining.
*/
Builder bufferSizeInBytes(Long bufferSize);

/**
* Configures a response mapper that will be applied to the response before it is delivered to the
* upstream transformer's {@code onResponse} callback. Used internally by the S3 multipart download
* to rewrite per-part metadata to full-object metadata.
*
* @param responseMapper a function to transform the response before delivery, or null for no mapping
* @return This object for method chaining.
*/
Builder responseMapper(UnaryOperator<SdkResponse> responseMapper);
}

private static final class DefaultBuilder implements Builder {
private Long bufferSize;
private UnaryOperator<SdkResponse> responseMapper;

private DefaultBuilder(SplittingTransformerConfiguration configuration) {
this.bufferSize = configuration.bufferSizeInBytes;
this.responseMapper = configuration.responseMapper;
}

private DefaultBuilder() {
Expand All @@ -112,6 +137,12 @@ public Builder bufferSizeInBytes(Long bufferSize) {
return this;
}

@Override
public Builder responseMapper(UnaryOperator<SdkResponse> responseMapper) {
this.responseMapper = responseMapper;
return this;
}

@Override
public SplittingTransformerConfiguration build() {
return new SplittingTransformerConfiguration(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ default SplitResult<ResponseT, ResultT> split(SplittingTransformerConfiguration
.<ResponseT, ResultT>builder()
.upstreamResponseTransformer(this)
.maximumBufferSizeInBytes(splitConfig.bufferSizeInBytes())
.responseMapper(splitConfig.responseMapper())
.resultFuture(future)
.build();
return AsyncResponseTransformer.SplitResult.<ResponseT, ResultT>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void exceptionOccurred(Throwable throwable) {
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransformerConfiguration splitConfig) {
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
new ByteArraySplittingTransformer<>(this, future);
new ByteArraySplittingTransformer<>(this, future, splitConfig.responseMapper());
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
.publisher(transformer)
.resultFuture(future)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
Expand Down Expand Up @@ -84,12 +86,30 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As

private final Map<Integer, ByteBuffer> buffers;

private final UnaryOperator<SdkResponse> responseMapper;

public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
this(upstreamResponseTransformer, resultFuture, UnaryOperator.identity());
}

public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to new ctor? can we just add a new parameter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up, the mapper now comes from the config rather than a separate split(config, mapper),

upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture,
UnaryOperator<SdkResponse> responseMapper) {
this.upstreamResponseTransformer = upstreamResponseTransformer;
this.resultFuture = resultFuture;
this.buffers = new ConcurrentHashMap<>();
this.responseMapper = responseMapper != null ? responseMapper : UnaryOperator.identity();
}

@SuppressWarnings("unchecked")
private ResponseT mapResponse(ResponseT response) {
if (!(response instanceof SdkResponse)) {
return response;
}
return (ResponseT) responseMapper.apply((SdkResponse) response);
}

@Override
Expand Down Expand Up @@ -181,7 +201,7 @@ private void handleSubscriptionCancel() {
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare();
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture);

upstreamResponseTransformer.onResponse(responseT.get());
upstreamResponseTransformer.onResponse(mapResponse(responseT.get()));

int totalPartCount = nextPartNumber.get() - 1;
if (buffers.size() != totalPartCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
Expand Down Expand Up @@ -112,16 +114,18 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As

private final Object cancelLock = new Object();

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
private final UnaryOperator<SdkResponse> responseMapper;

private SplittingTransformer(Builder<ResponseT, ResultT> builder) {
this.upstreamResponseTransformer = Validate.paramNotNull(
upstreamResponseTransformer, "upstreamResponseTransformer");
this.resultFuture = Validate.paramNotNull(
resultFuture, "resultFuture");
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
builder.upstreamResponseTransformer, "upstreamResponseTransformer");
this.resultFuture = Validate.paramNotNull(builder.returnFuture, "resultFuture");
Validate.notNull(builder.maximumBufferSize, "maximumBufferSizeInBytes");
this.maximumBufferInBytes = Validate.isPositive(
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
builder.maximumBufferSize, "maximumBufferSizeInBytes");
this.responseMapper = builder.responseMapper != null
? builder.responseMapper
: UnaryOperator.identity();

this.resultFuture.whenComplete((r, e) -> {
if (e == null) {
Expand All @@ -133,6 +137,14 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
});
}

@SuppressWarnings("unchecked")
private ResponseT mapResponse(ResponseT response) {
if (!(response instanceof SdkResponse)) {
return response;
}
return (ResponseT) responseMapper.apply((SdkResponse) response);
}

/**
* @param downstreamSubscriber the {@link Subscriber} to the individual AsyncResponseTransformer
*/
Expand Down Expand Up @@ -296,7 +308,7 @@ public CompletableFuture<ResponseT> prepare() {
public void onResponse(ResponseT response) {
if (onResponseCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onResponse on the upstream transformer");
upstreamResponseTransformer.onResponse(response);
upstreamResponseTransformer.onResponse(mapResponse(response));
}
this.response = response;
}
Expand Down Expand Up @@ -393,6 +405,7 @@ public static final class Builder<ResponseT, ResultT> {
private Long maximumBufferSize;
private CompletableFuture<ResultT> returnFuture;
private AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;
private UnaryOperator<SdkResponse> responseMapper;

private Builder() {
}
Expand Down Expand Up @@ -437,10 +450,13 @@ public Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> retur
return this;
}

public Builder<ResponseT, ResultT> responseMapper(UnaryOperator<SdkResponse> responseMapper) {
this.responseMapper = responseMapper;
return this;
}

public SplittingTransformer<ResponseT, ResultT> build() {
return new SplittingTransformer<>(this.upstreamResponseTransformer,
this.maximumBufferSize,
this.returnFuture);
return new SplittingTransformer<>(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest
}

private GetObjectRequest attachSdkAttribute(GetObjectRequest request,
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
request.overrideConfiguration()
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
Expand Down
Loading
Loading