1 change: 1 addition & 0 deletions .github/dockerfiles/syntheticbucketd/Dockerfile
@@ -5,6 +5,7 @@ RUN npm install -g \
typescript@4.9.5
COPY package.json /app
COPY yarn.lock /app
COPY scality-cloudserverclient-v1.0.9.tgz /app
RUN cd /app && yarn install --network-concurrency 1

COPY tests/utils /app
2 changes: 1 addition & 1 deletion Dockerfile
@@ -28,7 +28,7 @@ RUN wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSI
&& tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz \
&& rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz

COPY package.json yarn.lock /usr/src/app/
COPY package.json yarn.lock scality-cloudserverclient-v1.0.9.tgz /usr/src/app/
RUN yarn install --ignore-engines --frozen-lockfile --production --network-concurrency 1 \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf ~/.node-gyp \
136 changes: 122 additions & 14 deletions extensions/replication/tasks/ReplicateObject.js
Contributor Author

This PR can already be checked, but it should really be reviewed after all the other cascade PRs, since changes in those PRs would also require changes here.

@@ -1,9 +1,9 @@
const async = require('async');
const { S3Client, GetBucketReplicationCommand, GetObjectCommand } = require('@aws-sdk/client-s3');

const errors = require('arsenal').errors;
const jsutil = require('arsenal').jsutil;
const ObjectMDLocation = require('arsenal').models.ObjectMDLocation;
const { errors, jsutil, models, versioning } = require('arsenal');
const ObjectMDLocation = models.ObjectMDLocation;
const { decode, checkCrrCascadeEvent } = versioning.VersionID;

Arsenal is pinned to 8.3.9, but checkCrrCascadeEvent and decode are imported from versioning.VersionID. Neither checkCrrCascadeEvent nor the getMicroVersionId() method (called on source/dest entries throughout this PR) appear to exist in arsenal 8.3.9 or in backbeat's own models. The arsenal dependency likely needs a version bump for this PR to work.

— Claude Code
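
One way to surface this kind of mismatch early is a load-time check that the dependency actually exposes the helpers the code imports. The sketch below is only an illustration: `missingExports` and `fakeVersionID` are hypothetical names made up for this example, with `fakeVersionID` standing in for an older `versioning.VersionID`. Neither is part of arsenal or backbeat.

```javascript
// Hypothetical helper: return the names that are not functions on `mod`.
function missingExports(mod, names) {
    return names.filter(name => typeof (mod && mod[name]) !== 'function');
}

// Stand-in for an older versioning.VersionID that only provides decode().
const fakeVersionID = { decode: str => str };

const missing = missingExports(fakeVersionID, ['decode', 'checkCrrCascadeEvent']);
if (missing.length > 0) {
    // A real service would more likely fail fast here than just warn.
    console.warn(`dependency is missing expected helpers: ${missing.join(', ')}`);
}
```

A check like this turns a late `TypeError: checkCrrCascadeEvent is not a function` in the replication path into an explicit message at startup.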


const ClientManager = require('../../../lib/clients/ClientManager');
const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');
@@ -30,6 +30,9 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const { authTypeAssumeRole } = require('../../../lib/constants');

const errorAlreadyCompleted = {};
const cascadeLoopDetected = {};
const cascadeDataComplete = {};
const partAlreadyAtDest = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
@@ -422,11 +425,16 @@ class ReplicateObject extends BackbeatTask {
const mpuConcLimit = this.repConfig.queueProcessor.mpuPartsConcurrency;
return mapLimitWaitPendingIfError(locations, mpuConcLimit, (part, done) => {
this._getAndPutPart(sourceEntry, destEntry, part, log, done);
}, (err, destLocations) => {
}, (err, partResults) => {
const allPartsAlreadyAtDest = !err &&
partResults.length > 0 &&
partResults.every(result => result === partAlreadyAtDest);
const destLocations = allPartsAlreadyAtDest ? [] :
(partResults || []).filter(result => result && result !== partAlreadyAtDest);

When err is truthy (line 434 returns early), destLocations is passed to the _deleteOrphans call. The !err guard has already made allPartsAlreadyAtDest false by then, so the filter on line 433, (partResults || []).filter(...), always runs. After an error, partResults from mapLimitWaitPendingIfError may contain partial results mixed with partAlreadyAtDest sentinel values. The filter compares by identity, so the partAlreadyAtDest sentinel itself is dropped and never reaches _deleteOrphans; only some other empty object ({}) would get through, and reading .key and .dataStoreName from it would yield undefined. That is likely harmless because _deleteOrphans filters out falsy .key, but a defensive check or a comment explaining why this is safe would help.

— Claude Code
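
To make the filter's behavior concrete, here is a minimal standalone sketch. It copies the shape of the filter from the diff, but `toDestLocations` and the sample data are hypothetical and exist only for this illustration.

```javascript
// Stand-in for the PR's partAlreadyAtDest sentinel (compared by identity).
const partAlreadyAtDest = {};

// Same shape as the filter in the diff: drop falsy results and the sentinel.
function toDestLocations(partResults) {
    return (partResults || []).filter(
        result => result && result !== partAlreadyAtDest);
}

// Partial results as they might look after one part failed:
// one written location, one "already at destination" marker, one hole.
const written = { key: 'abc', dataStoreName: 'us-east-1' };
const partial = [written, partAlreadyAtDest, undefined];

const destLocations = toDestLocations(partial);
console.log(destLocations.length, destLocations[0] === written);

// A *different* empty object is not the sentinel, so it is kept.
console.log(toDestLocations([{}]).length);
```

Because the comparison is `!==` on the object reference, only the exact sentinel instance is removed; any other object, empty or not, stays in the list handed to the orphan cleanup.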

if (err) {
return this._deleteOrphans(destEntry, destLocations, log, () => cb(err));
}
return cb(null, destLocations);
return cb(null, destLocations, allPartsAlreadyAtDest);
});
}

@@ -540,6 +548,7 @@ class ReplicateObject extends BackbeatTask {
// destination bucket has to be versioning enabled.
VersioningRequired: true,
RequestUids: log.getSerializedUids(),
VersionId: sourceEntry.getEncodedVersionId(),
});
addContentLengthMiddleware(
putCommand,
@@ -548,6 +557,73 @@ class ReplicateObject extends BackbeatTask {
const writeStartTime = Date.now();
return this.backbeatDest.send(putCommand, { abortSignal: abortController.signal })
.then(data => {
const { ExistingMicroVersionId } = data;
switch (ExistingMicroVersionId) {
case undefined:
case null:
break; // VersionId did not match on putData
case '': // Existing object with no microVersionId (cloudserver version pre crr cascade)
log.info('cascade putData: data at destination, ' +
'old object without microVersionId, proceeding with putmetadata',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
default: {

When cascadeLoopDetected or cascadeDataComplete is returned as an error from a part, this error flows through the retry wrapper in _getAndPutPart. The BackbeatTask.retry() method checks err.retryable === undefined and then inspects err.code, err.name, and err.message to detect network errors. Since these cascade signals are plain objects ({}), none of those properties exist, so no mutation occurs today. But if a future change accidentally adds such a property to these sentinels, retry() would mutate the shared singleton (setting retryable = true) and corrupt all subsequent cascade checks. Consider using Object.freeze() on the sentinel objects at line 33-35 to prevent accidental mutation.

— Claude Code
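
A minimal sketch of the suggestion, assuming the sentinels stay plain objects compared by identity. `tagRetryable` is a hypothetical stand-in for any helper that tries to annotate an error object; it is not the actual `BackbeatTask` code.

```javascript
// Frozen sentinel: still usable for identity checks, but immutable.
const cascadeLoopDetected = Object.freeze({});

// Hypothetical helper that tries to mark an error as retryable.
function tagRetryable(err) {
    'use strict'; // in strict mode, writing to a frozen object throws
    try {
        err.retryable = true;
        return true;
    } catch (e) {
        return false; // TypeError: object is not extensible
    }
}

console.log(tagRetryable(cascadeLoopDetected));   // mutation is rejected
console.log('retryable' in cascadeLoopDetected);  // sentinel is unchanged
console.log(tagRetryable({}));                    // ordinary objects still work
```

In non-strict code the assignment would be silently ignored instead of throwing; either way the shared sentinel keeps its original shape.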


Trailing whitespace on this line (after the opening brace {). Minor, but some CI lint checks catch this.

— Claude Code

// microVersionId provided, check its value to detect loop or stale events
// to determine if we need to update the metadata
const destinationMicroVersionId = decode(ExistingMicroVersionId);
if (destinationMicroVersionId instanceof Error) {
log.error('failed to decode ExistingMicroVersionId from putData', {
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
error: destinationMicroVersionId.message,
});
return doneOnce(destinationMicroVersionId);
}
const sourceMicroVersionId = sourceEntry.getMicroVersionId();
if (!sourceMicroVersionId) {
log.info('cascade putData: data at destination, ' +
'source has no microVersionId, proceeding with putmetadata',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
}
const event = checkCrrCascadeEvent(sourceMicroVersionId, destinationMicroVersionId);
if (event === 'loop') {
log.info('cascade loop detected on putData: ' +
'destination already has this exact revision, skipping',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});

cascadeLoopDetected is passed as the err argument via doneOnce(cascadeLoopDetected). This sentinel is an empty object {}, so upstream retry logic in BackbeatTask._retry will see it as a truthy error and may retry the operation before _handleReplicationOutcome gets to check for it. Confirm that _getAndPutPart (the retry wrapper around _getAndPutPartOnce) won't retry on this sentinel — if it does, the loop detection is bypassed and the part is retried unnecessarily.

— Claude Code
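
One way to make the intent explicit is for the retry wrapper to treat the cascade signals as terminal. The sketch below is a simplified, hypothetical retry helper (`retryUnlessSignal` is not the `BackbeatTask` implementation, and it has no backoff); it only shows the short-circuit on sentinel errors.

```javascript
// Stand-ins for the PR's sentinels.
const cascadeLoopDetected = Object.freeze({});
const cascadeDataComplete = Object.freeze({});
const terminalSignals = new Set([cascadeLoopDetected, cascadeDataComplete]);

// Hypothetical retry helper: retries on ordinary errors, but returns
// immediately when the "error" is one of the cascade signals.
function retryUnlessSignal(attemptFn, maxAttempts, done) {
    let attempts = 0;
    const run = () => {
        attempts += 1;
        attemptFn((err, result) => {
            if (!err) {
                return done(null, result, attempts);
            }
            if (terminalSignals.has(err) || attempts >= maxAttempts) {
                return done(err, undefined, attempts);
            }
            return run();
        });
    };
    run();
}

// A part that reports a cascade loop is attempted exactly once.
retryUnlessSignal(cb => cb(cascadeLoopDetected), 5, (err, _res, attempts) => {
    console.log(err === cascadeLoopDetected, attempts);
});
```

With a guard like this, the signal reaches the outcome handler unchanged instead of consuming retry attempts first.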

return doneOnce(cascadeLoopDetected);
}
if (event === 'stale') {
log.info('cascade stale on putData: ' +
'destination already has a newer revision',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(cascadeDataComplete);
}
// proceed: source is newer, skip data write and update metadata
log.info('cascade putData: data already at destination, ' +
'proceeding with metadata update',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
}
}

// ExistingMicroVersionId is absent :
// data was freshly written at the returned Location.
partObj.setDataLocation(data.Location[0]);

// Set encryption parameters that were used to encrypt the
@@ -641,10 +717,20 @@ class ReplicateObject extends BackbeatTask {
// destination bucket has to be versioning enabled.
VersioningRequired: true,
RequestUids: log.getSerializedUids(),
MicroVersionId: entry.getMicroVersionId(),
});
const writeStartTime = Date.now();
return this.backbeatDest.send(command)
.then(data => {
if (data.MicroVersionIdExists) {
log.info('cascade loop detected on putMetadata: ' +
'microVersionId already at destination',
{
method: 'ReplicateObject._putMetadataOnce',
entry: entry.getLogInfo(),
});
return cbOnce(cascadeLoopDetected);
}
this._publishMetadataWriteMetrics(mdBlob, writeStartTime);
return cbOnce(null, data);
})
@@ -654,6 +740,15 @@ class ReplicateObject extends BackbeatTask {
if (err.ObjNotFound || err.name === 'ObjNotFound') {
return cbOnce(err);
}
if (err.$metadata?.httpStatusCode === 409) {

Any 409 from the destination is assumed to be a cascade-stale scenario and the replication is silently marked COMPLETED. If cloudserver ever returns 409 for a different reason, the object would never be replicated. Consider checking for a more specific signal (e.g. a response body field or custom error code) rather than relying solely on the HTTP status code.

— Claude Code
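
A sketch of what a narrower check could look like, assuming the destination attaches a distinguishing error name to the 409. The name `MicroVersionIdStale` is invented for this illustration; the actual signal, if any, would have to be defined by cloudserver. Only the `$metadata.httpStatusCode` and `name` fields follow the usual AWS SDK v3 error shape.

```javascript
// Made-up error name, used for illustration only.
const STALE_ERROR_NAME = 'MicroVersionIdStale';

// Treat the error as "cascade stale" only when both the status code
// and the (hypothetical) error name match.
function isCascadeStale(err) {
    return err?.$metadata?.httpStatusCode === 409 &&
        err.name === STALE_ERROR_NAME;
}

const staleErr = { name: STALE_ERROR_NAME, $metadata: { httpStatusCode: 409 } };
const otherConflict = { name: 'OperationAborted', $metadata: { httpStatusCode: 409 } };

console.log(isCascadeStale(staleErr));
console.log(isCascadeStale(otherConflict));
```

Any other 409 would then fall through to the existing error logging and retry handling instead of being marked COMPLETED.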

log.info('cascade stale on putMetadata: ' +
'destination has a newer revision, marking COMPLETED',
{
method: 'ReplicateObject._putMetadataOnce',
entry: entry.getLogInfo(),
});
return cbOnce(cascadeDataComplete);
}
log.error('an error occurred when putting metadata to S3',
{
method: 'ReplicateObject._putMetadataOnce',
@@ -889,13 +984,14 @@ class ReplicateObject extends BackbeatTask {
return this._getAndPutData(sourceEntry, destEntry, log,
next);
}
return next(null, []);
return next(null, [], false);
},
// update location, replication status and put metadata in
// target bucket
(destLocations, next) => {
(destLocations, allPartsAlreadyAtDest, next) => {
const localMdOnly = mdOnly || allPartsAlreadyAtDest;
destEntry.setLocation(destLocations);
this._putMetadata(destEntry, mdOnly, log, err => {
this._putMetadata(destEntry, localMdOnly, log, err => {
if (err) {
return this._deleteOrphans(
destEntry, destLocations, log, () => next(err));
@@ -915,20 +1011,30 @@ class ReplicateObject extends BackbeatTask {
next => this._getAndPutData(sourceEntry, destEntry, log, next),
// update location, replication status and put metadata in
// target bucket
(location, next) => {
(location, allPartsAlreadyAtDest, next) => {

In _processQueueEntryRetryFull, the second waterfall stage now passes allPartsAlreadyAtDest as the mdOnly argument to _putMetadata. This means a full retry where all parts happened to already exist at the destination will send ReplicationContent: 'METADATA' in the putMetadata call. Verify this is the intended behavior — it changes the semantics from "always full replication on retry" to "maybe metadata-only on retry if cascade detected all parts present." If a retry was triggered because metadata was stale but data was fine, this seems correct; if the retry was triggered for a data integrity reason, skipping the data write could mask the issue.

— Claude Code

destEntry.setLocation(location);
this._putMetadata(destEntry, false, log, next);
this._putMetadata(destEntry, allPartsAlreadyAtDest, log, next);
},
], err => this._handleReplicationOutcome(
err, sourceEntry, destEntry, kafkaEntry, log, done));
}

_handleReplicationOutcome(err, sourceEntry, destEntry, kafkaEntry,
log, done) {
if (!err) {
log.debug('replication succeeded for object, publishing ' +
'replication status as COMPLETED',
{ entry: sourceEntry.getLogInfo() });
if (!err || err === cascadeLoopDetected || err === cascadeDataComplete) {
if (err === cascadeLoopDetected) {
log.info('replication completed via cascade loop: ' +
'object already at destination with the same revision',
{ entry: sourceEntry.getLogInfo() });
} else if (err === cascadeDataComplete) {
log.info('replication completed: destination already holds ' +
'this version with an equal or newer revision',
{ entry: sourceEntry.getLogInfo() });
} else {
log.debug('replication succeeded for object, publishing ' +
'replication status as COMPLETED',
{ entry: sourceEntry.getLogInfo() });
}
this._publishReplicationStatus(
sourceEntry, 'COMPLETED', { kafkaEntry, log });
return done(null, { committable: false });
@@ -989,3 +1095,5 @@ class ReplicateObject extends BackbeatTask {
}

module.exports = ReplicateObject;
// Exported for tests only
module.exports._cascadeSignals = { cascadeLoopDetected, cascadeDataComplete, partAlreadyAtDest };
4 changes: 2 additions & 2 deletions package.json
@@ -51,10 +51,10 @@
"@aws-sdk/client-s3": "^3.921.0",
"@aws-sdk/client-sts": "^3.921.0",
"@aws-sdk/credential-providers": "^3.921.0",
"@scality/cloudserverclient": "^1.0.8",
"@scality/cloudserverclient": "file:./scality-cloudserverclient-v1.0.9.tgz",

Vendoring a .tgz into the repo adds a binary blob to git history that can never be garbage-collected. Prefer publishing @scality/cloudserverclient@1.0.9 to the registry (or using a git tag reference like the other deps) and pin to it here. If vendoring is intentional for CI reasons, it should be called out in the PR description.

— Claude Code


A binary .tgz is committed to the repo and referenced via file: in package.json. This permanently inflates the git history (binary blobs can't be diffed or garbage-collected), prevents code review of the dependency contents, and bypasses the normal package registry workflow. Consider publishing @scality/cloudserverclient@1.0.9 to the npm registry (or a private registry) and referencing it as a versioned dependency instead.

— Claude Code

"@smithy/node-http-handler": "^3.3.3",
"JSONStream": "^1.3.5",
"arsenal": "git+https://github.com/scality/arsenal#8.3.9",
"arsenal": "git+https://github.com/scality/Arsenal#2c429ab35a5ac82c3dafa5a0296a49a23a9c8a4a",

Arsenal dependency is pinned to a raw commit hash (2c429ab) instead of a release tag. All other git-based deps in this file use tags (e.g. breakbeat#v1.0.3, bucketclient#8.2.4). Pin to a tagged release once the arsenal PR is merged to keep the dependency auditable and consistent with the project convention.

— Claude Code


Arsenal is pinned to a raw commit hash (2c429ab...) instead of a semantic version tag. Per project conventions, git-based deps (arsenal, vaultclient, etc.) should pin to tags (e.g. #8.x.y). Commit hashes are opaque — it's unclear which features/fixes are included, and there's no semver contract. This also makes it harder for reviewers and operators to reason about what changed.

— Claude Code

"async": "^2.3.0",
"backo": "^1.1.0",
"breakbeat": "scality/breakbeat#v1.0.3",
Binary file added scality-cloudserverclient-v1.0.9.tgz
Binary file not shown.