Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -79,6 +80,18 @@ public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransform
.build();
}

public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(
SplittingTransformerConfiguration splitConfig,
UnaryOperator<ResponseT> responseMapper) {
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
new ByteArraySplittingTransformer<>(this, future, responseMapper);
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
.publisher(transformer)
.resultFuture(future)
.build();
}

@Override
public String name() {
return TransformerType.BYTES.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -84,12 +85,22 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As

private final Map<Integer, ByteBuffer> buffers;

private final UnaryOperator<ResponseT> responseMapper;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: don't we need to update FileAsyncResponseTransfomer as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, file's existing split(config) now reads the mapper from the config like every other transformer, so it's handled automatically


public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
this(upstreamResponseTransformer, resultFuture, UnaryOperator.identity());
}

public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to new ctor? can we just add a new parameter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up, the mapper now comes from the config rather than a separate split(config, mapper),

upstreamResponseTransformer,
CompletableFuture<ResponseBytes<ResponseT>> resultFuture,
UnaryOperator<ResponseT> responseMapper) {
this.upstreamResponseTransformer = upstreamResponseTransformer;
this.resultFuture = resultFuture;
this.buffers = new ConcurrentHashMap<>();
this.responseMapper = responseMapper;
}

@Override
Expand Down Expand Up @@ -181,7 +192,7 @@ private void handleSubscriptionCancel() {
CompletableFuture<ResponseBytes<ResponseT>> upstreamPrepareFuture = upstreamResponseTransformer.prepare();
CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture);

upstreamResponseTransformer.onResponse(responseT.get());
upstreamResponseTransformer.onResponse(responseMapper.apply(responseT.get()));

int totalPartCount = nextPartNumber.get() - 1;
if (buffers.size() != totalPartCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand Down Expand Up @@ -112,16 +113,27 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As

private final Object cancelLock = new Object();

private final UnaryOperator<ResponseT> responseMapper;

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
this(upstreamResponseTransformer, maximumBufferSizeInBytes, resultFuture,
UnaryOperator.identity());
}

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update this ctor to take a Builder parameter? That way, we don't need to create a new ctor.

Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture,
UnaryOperator<ResponseT> responseMapper) {
this.upstreamResponseTransformer = Validate.paramNotNull(
upstreamResponseTransformer, "upstreamResponseTransformer");
this.resultFuture = Validate.paramNotNull(
resultFuture, "resultFuture");
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
this.maximumBufferInBytes = Validate.isPositive(
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
this.responseMapper = responseMapper;

this.resultFuture.whenComplete((r, e) -> {
if (e == null) {
Expand Down Expand Up @@ -296,7 +308,7 @@ public CompletableFuture<ResponseT> prepare() {
public void onResponse(ResponseT response) {
if (onResponseCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onResponse on the upstream transformer");
upstreamResponseTransformer.onResponse(response);
upstreamResponseTransformer.onResponse(responseMapper.apply(response));
}
this.response = response;
}
Expand Down Expand Up @@ -393,6 +405,7 @@ public static final class Builder<ResponseT, ResultT> {
private Long maximumBufferSize;
private CompletableFuture<ResultT> returnFuture;
private AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer;
private UnaryOperator<ResponseT> responseMapper;

private Builder() {
}
Expand Down Expand Up @@ -437,10 +450,18 @@ public Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> retur
return this;
}

public Builder<ResponseT, ResultT> responseMapper(UnaryOperator<ResponseT> responseMapper) {
this.responseMapper = responseMapper;
return this;
}

public SplittingTransformer<ResponseT, ResultT> build() {
return new SplittingTransformer<>(this.upstreamResponseTransformer,
this.maximumBufferSize,
this.returnFuture);
this.returnFuture,
this.responseMapper != null
? this.responseMapper
: UnaryOperator.identity());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.multipart;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.ChecksumMode;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

/**
* Integration test verifying that a custom AsyncResponseTransformer receives
* correct full-object response metadata when used with the multipart client.
*/
@Timeout(value = 5, unit = TimeUnit.MINUTES)
public class CustomTransformerMultipartIntegrationTest extends S3IntegrationTestBase {

private static final String BUCKET = temporaryBucketName(CustomTransformerMultipartIntegrationTest.class);
private static final int MIB = 1024 * 1024;
private static final int PART_SIZE = 5 * MIB;
private static final String LARGE_KEY = "large-object.dat";
private static final String MPU_CHECKSUM_KEY = "mpu-checksum-object.dat";
private static final long LARGE_OBJECT_SIZE = 3L * PART_SIZE; // 15MB → 3 parts

private static S3AsyncClient multipartClient;

@BeforeAll
static void setup() throws Exception {
setUp();
createBucket(BUCKET);

multipartClient = S3AsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.multipartEnabled(true)
.multipartConfiguration(c -> c.minimumPartSizeInBytes((long) PART_SIZE))
.build();

// Upload a large object via multipart
byte[] data = new byte[(int) LARGE_OBJECT_SIZE];
new Random(42).nextBytes(data);
multipartClient.putObject(r -> r.bucket(BUCKET).key(LARGE_KEY),
AsyncRequestBody.fromBytes(data)).join();

// Upload MPU object with CRC32 checksum
uploadMpuWithChecksum();
}

@AfterAll
static void tearDown() {
deleteBucketAndAllContents(BUCKET);
if (multipartClient != null) {
multipartClient.close();
}
}

@Test
void customTransformer_receivesFullObjectMetadata() {
AsyncResponseTransformer<GetObjectResponse, String> customTransformer =
new AsyncResponseTransformer<GetObjectResponse, String>() {
private CompletableFuture<String> future;
private GetObjectResponse response;

@Override
public CompletableFuture<String> prepare() {
future = new CompletableFuture<>();
return future;
}

@Override
public void onResponse(GetObjectResponse r) {
this.response = r;
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new Subscriber<ByteBuffer>() {
@Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
@Override public void onNext(ByteBuffer b) { }
@Override public void onError(Throwable t) { future.completeExceptionally(t); }
@Override public void onComplete() {
future.complete(
"contentLength=" + response.contentLength()
+ "|contentRange=" + response.contentRange());
}
});
}

@Override
public void exceptionOccurred(Throwable error) {
future.completeExceptionally(error);
}
};

String result = multipartClient.getObject(
GetObjectRequest.builder().bucket(BUCKET).key(LARGE_KEY).build(),
customTransformer).join();

assertThat(result).contains("contentLength=" + LARGE_OBJECT_SIZE);
assertThat(result).contains("contentRange=bytes 0-" + (LARGE_OBJECT_SIZE - 1) + "/" + LARGE_OBJECT_SIZE);
}

@Test
void customTransformer_mpuWithChecksumMode_checksumNulled() {
AsyncResponseTransformer<GetObjectResponse, String> customTransformer =
new AsyncResponseTransformer<GetObjectResponse, String>() {
private CompletableFuture<String> future;
private GetObjectResponse response;

@Override public CompletableFuture<String> prepare() { future = new CompletableFuture<>(); return future; }
@Override public void onResponse(GetObjectResponse r) { this.response = r; }
@Override public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(new Subscriber<ByteBuffer>() {
@Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); }
@Override public void onNext(ByteBuffer b) { }
@Override public void onError(Throwable t) { future.completeExceptionally(t); }
@Override public void onComplete() {
future.complete("contentLength=" + response.contentLength()
+ "|checksumType=" + response.checksumType()
+ "|checksumCRC32=" + response.checksumCRC32());
}
});
}
@Override public void exceptionOccurred(Throwable error) { future.completeExceptionally(error); }
};

String result = multipartClient.getObject(
GetObjectRequest.builder().bucket(BUCKET).key(MPU_CHECKSUM_KEY)
.checksumMode(ChecksumMode.ENABLED).build(),
customTransformer).join();

long expectedSize = 2L * PART_SIZE;
assertThat(result).contains("contentLength=" + expectedSize);
assertThat(result).contains("checksumType=COMPOSITE");
assertThat(result).contains("checksumCRC32=null");
}

private static void uploadMpuWithChecksum() {
S3Client syncClient = S3Client.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION).build();

CreateMultipartUploadResponse createResp = syncClient.createMultipartUpload(b -> b.bucket(BUCKET)
.key(MPU_CHECKSUM_KEY).checksumAlgorithm(ChecksumAlgorithm.CRC32));
String uploadId = createResp.uploadId();
List<CompletedPart> parts = new ArrayList<>();

for (int i = 1; i <= 2; i++) {
byte[] data = new byte[PART_SIZE];
new Random(i).nextBytes(data);
final int partNum = i;
UploadPartResponse uploadResp = syncClient.uploadPart(
b -> b.bucket(BUCKET).key(MPU_CHECKSUM_KEY).uploadId(uploadId).partNumber(partNum)
.checksumAlgorithm(ChecksumAlgorithm.CRC32),
RequestBody.fromBytes(data));
parts.add(CompletedPart.builder()
.partNumber(partNum).eTag(uploadResp.eTag()).checksumCRC32(uploadResp.checksumCRC32()).build());
}

syncClient.completeMultipartUpload(b -> b.bucket(BUCKET).key(MPU_CHECKSUM_KEY).uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build()));
syncClient.close();
}
}
Loading
Loading