136 changes: 122 additions & 14 deletions extensions/replication/tasks/ReplicateObject.js
Contributor Author


This PR can already be checked, but it should really be reviewed after all the other cascade PRs, since changes in those PRs would also require changes here.

@@ -1,9 +1,9 @@
const async = require('async');
const { S3Client, GetBucketReplicationCommand, GetObjectCommand } = require('@aws-sdk/client-s3');

const errors = require('arsenal').errors;
const jsutil = require('arsenal').jsutil;
const ObjectMDLocation = require('arsenal').models.ObjectMDLocation;
const { errors, jsutil, models, versioning } = require('arsenal');
const ObjectMDLocation = models.ObjectMDLocation;
const { decode, checkCrrCascadeEvent } = versioning.VersionID;

Arsenal is pinned to 8.3.9, but checkCrrCascadeEvent and decode are imported from versioning.VersionID. Neither checkCrrCascadeEvent nor the getMicroVersionId() method (called on source/dest entries throughout this PR) appears to exist in arsenal 8.3.9 or in backbeat's own models. The arsenal dependency likely needs a version bump for this PR to work.

— Claude Code
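
A dependency mismatch like the one described above could be surfaced at load time rather than at the first call. The following is a minimal sketch, assuming a hypothetical helper named `assertFunctions` (it is not part of arsenal or backbeat). The `oldVersionID` object is only a stand-in for illustration and does not reflect the actual contents of any arsenal release.

```javascript
// Hypothetical helper (not part of arsenal or backbeat): fail fast when a
// dependency does not expose the functions this module relies on.
function assertFunctions(moduleName, exportsObj, names) {
    const missing = names.filter(name => typeof exportsObj[name] !== 'function');
    if (missing.length > 0) {
        throw new Error(
            `${moduleName} is missing required exports: ${missing.join(', ')}`);
    }
}

// Illustrative stand-in for a versioning.VersionID that only exposes decode().
const oldVersionID = { decode: str => str };

let message = null;
try {
    assertFunctions('arsenal.versioning.VersionID', oldVersionID,
        ['decode', 'checkCrrCascadeEvent']);
} catch (err) {
    message = err.message;
}
console.log(message);
```

With such a guard, a missing export fails at require time with an explicit message instead of a `TypeError` deep inside the replication path.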


const ClientManager = require('../../../lib/clients/ClientManager');
const BackbeatMetadataProxy = require('../../../lib/BackbeatMetadataProxy');
@@ -30,6 +30,9 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
const { authTypeAssumeRole } = require('../../../lib/constants');

const errorAlreadyCompleted = {};
const cascadeLoopDetected = {};
const cascadeDataComplete = {};
const partAlreadyAtDest = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
@@ -422,11 +425,16 @@ class ReplicateObject extends BackbeatTask {
const mpuConcLimit = this.repConfig.queueProcessor.mpuPartsConcurrency;
return mapLimitWaitPendingIfError(locations, mpuConcLimit, (part, done) => {
this._getAndPutPart(sourceEntry, destEntry, part, log, done);
}, (err, destLocations) => {
}, (err, partResults) => {
const allPartsAlreadyAtDest = !err &&
partResults.length > 0 &&
partResults.every(result => result === partAlreadyAtDest);
const destLocations = allPartsAlreadyAtDest ? [] :
(partResults || []).filter(result => result && result !== partAlreadyAtDest);
if (err) {
return this._deleteOrphans(destEntry, destLocations, log, () => cb(err));
}
return cb(null, destLocations);
return cb(null, destLocations, allPartsAlreadyAtDest);
});
}

@@ -540,6 +548,7 @@ class ReplicateObject extends BackbeatTask {
// destination bucket has to be versioning enabled.
VersioningRequired: true,
RequestUids: log.getSerializedUids(),
VersionId: sourceEntry.getEncodedVersionId(),
});
addContentLengthMiddleware(
putCommand,
@@ -548,6 +557,73 @@ class ReplicateObject extends BackbeatTask {
const writeStartTime = Date.now();
return this.backbeatDest.send(putCommand, { abortSignal: abortController.signal })
.then(data => {
const { ExistingMicroVersionId } = data;
switch (ExistingMicroVersionId) {
case undefined:
case null:
break; // VersionId did not match on putData
case '': // Existing object with no microVersionId (cloudserver version pre crr cascade)
log.info('cascade putData: data at destination, ' +
'old object without microVersionId, proceeding with putmetadata',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
default: {

When cascadeLoopDetected or cascadeDataComplete is returned as an error from a part, this error flows through the retry wrapper in _getAndPutPart. The BackbeatTask.retry() method checks err.retryable === undefined and then inspects err.code, err.name, and err.message to detect network errors. Since these cascade signals are plain objects ({}), none of those properties exist, so no mutation occurs today. But if a future change accidentally adds such a property to these sentinels, retry() would mutate the shared singleton (setting retryable = true) and corrupt all subsequent cascade checks. Consider using Object.freeze() on the sentinel objects at lines 33-35 to prevent accidental mutation.

— Claude Code
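
The suggestion above can be sketched as follows. This is an illustration only: `tagRetryable` is a hypothetical stand-in for a wrapper that tags errors, not the actual `BackbeatTask.retry()` implementation.

```javascript
// Sentinel objects compared by identity; freezing makes them immutable.
const cascadeLoopDetected = Object.freeze({});
const cascadeDataComplete = Object.freeze({});
const partAlreadyAtDest = Object.freeze({});

// Hypothetical stand-in for a retry wrapper that tags errors as retryable.
// This is NOT the actual BackbeatTask.retry() implementation.
function tagRetryable(err) {
    try {
        err.retryable = true;
    } catch (e) {
        // In strict mode, writing to a frozen object throws a TypeError;
        // in sloppy mode the write is silently ignored.
    }
    return err;
}

const tagged = tagRetryable(cascadeLoopDetected);
console.log(tagged === cascadeLoopDetected); // identity is preserved
console.log(Object.isFrozen(cascadeLoopDetected));
console.log(cascadeLoopDetected.retryable); // the write did not take effect
```

Because the sentinels are only ever compared with `===`, freezing them does not change any of the identity checks in the diff.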

// microVersionId provided, check its value to detect loop or stale events
// to determine if we need to update the metadata
const destinationMicroVersionId = decode(ExistingMicroVersionId);
if (destinationMicroVersionId instanceof Error) {
log.error('failed to decode ExistingMicroVersionId from putData', {
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
error: destinationMicroVersionId.message,
});
return doneOnce(destinationMicroVersionId);
}
const sourceMicroVersionId = sourceEntry.getMicroVersionId();
if (!sourceMicroVersionId) {
log.info('cascade putData: data at destination, ' +
'source has no microVersionId, proceeding with putmetadata',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
}
const event = checkCrrCascadeEvent(sourceMicroVersionId, destinationMicroVersionId);
if (event === 'loop') {
log.info('cascade loop detected on putData: ' +
'destination already has this exact revision, skipping',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(cascadeLoopDetected);
}
if (event === 'stale') {
log.info('cascade stale on putData: ' +
'destination already has a newer revision',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(cascadeDataComplete);
}
// proceed: source is newer, skip data write and update metadata
log.info('cascade putData: data already at destination, ' +
'proceeding with metadata update',
{
method: 'ReplicateObject._getAndPutPartOnce',
entry: destEntry.getLogInfo(),
});
return doneOnce(null, partAlreadyAtDest);
}
}

// ExistingMicroVersionId is absent:
// data was freshly written at the returned Location.
partObj.setDataLocation(data.Location[0]);

// Set encryption parameters that were used to encrypt the
@@ -641,10 +717,20 @@ class ReplicateObject extends BackbeatTask {
// destination bucket has to be versioning enabled.
VersioningRequired: true,
RequestUids: log.getSerializedUids(),
MicroVersionId: entry.getMicroVersionId(),
});
const writeStartTime = Date.now();
return this.backbeatDest.send(command)
.then(data => {
if (data.MicroVersionIdExists) {
log.info('cascade loop detected on putMetadata: ' +
'microVersionId already at destination',
{
method: 'ReplicateObject._putMetadataOnce',
entry: entry.getLogInfo(),
});
return cbOnce(cascadeLoopDetected);
}
this._publishMetadataWriteMetrics(mdBlob, writeStartTime);
return cbOnce(null, data);
})
@@ -654,6 +740,15 @@ class ReplicateObject extends BackbeatTask {
if (err.ObjNotFound || err.name === 'ObjNotFound') {
return cbOnce(err);
}
if (err.$metadata?.httpStatusCode === 409) {

Any 409 from the destination is assumed to be a cascade-stale scenario and the replication is silently marked COMPLETED. If cloudserver ever returns 409 for a different reason, the object would never be replicated. Consider checking for a more specific signal (e.g. a response body field or custom error code) rather than relying solely on the HTTP status code.

— Claude Code
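
A narrower check along the lines suggested above might look like the sketch below. The error code `CascadeStaleRevision` and the helper `isCascadeStale` are hypothetical; the actual field name and value would have to be agreed with the destination API and are not taken from cloudserver.

```javascript
// Hypothetical error code: an assumption for illustration, not an existing
// cloudserver error name.
const CASCADE_STALE_CODE = 'CascadeStaleRevision';

// Treat an error as "cascade stale" only when both the HTTP status and the
// error name match, instead of relying on the 409 status alone.
function isCascadeStale(err) {
    const status = err && err.$metadata && err.$metadata.httpStatusCode;
    return status === 409 && err.name === CASCADE_STALE_CODE;
}

const staleErr = { name: CASCADE_STALE_CODE, $metadata: { httpStatusCode: 409 } };
const otherConflict = { name: 'OperationAborted', $metadata: { httpStatusCode: 409 } };

console.log(isCascadeStale(staleErr));
console.log(isCascadeStale(otherConflict));
```

With this shape, an unrelated 409 falls through to the generic error path and is logged rather than being marked COMPLETED.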

log.info('cascade stale on putMetadata: ' +
'destination has a newer revision, marking COMPLETED',
{
method: 'ReplicateObject._putMetadataOnce',
entry: entry.getLogInfo(),
});
return cbOnce(cascadeDataComplete);
}
log.error('an error occurred when putting metadata to S3',
{
method: 'ReplicateObject._putMetadataOnce',
@@ -889,13 +984,14 @@ class ReplicateObject extends BackbeatTask {
return this._getAndPutData(sourceEntry, destEntry, log,
next);
}
return next(null, []);
return next(null, [], false);
},
// update location, replication status and put metadata in
// target bucket
(destLocations, next) => {
(destLocations, allPartsAlreadyAtDest, next) => {
const localMdOnly = mdOnly || allPartsAlreadyAtDest;
destEntry.setLocation(destLocations);
this._putMetadata(destEntry, mdOnly, log, err => {
this._putMetadata(destEntry, localMdOnly, log, err => {
if (err) {
return this._deleteOrphans(
destEntry, destLocations, log, () => next(err));
@@ -915,20 +1011,30 @@ class ReplicateObject extends BackbeatTask {
next => this._getAndPutData(sourceEntry, destEntry, log, next),
// update location, replication status and put metadata in
// target bucket
(location, next) => {
(location, allPartsAlreadyAtDest, next) => {
destEntry.setLocation(location);
this._putMetadata(destEntry, false, log, next);
this._putMetadata(destEntry, allPartsAlreadyAtDest, log, next);
},
], err => this._handleReplicationOutcome(
err, sourceEntry, destEntry, kafkaEntry, log, done));
}

_handleReplicationOutcome(err, sourceEntry, destEntry, kafkaEntry,
log, done) {
if (!err) {
log.debug('replication succeeded for object, publishing ' +
'replication status as COMPLETED',
{ entry: sourceEntry.getLogInfo() });
if (!err || err === cascadeLoopDetected || err === cascadeDataComplete) {
if (err === cascadeLoopDetected) {
log.info('replication completed via cascade loop: ' +
'object already at destination with the same revision',
{ entry: sourceEntry.getLogInfo() });
} else if (err === cascadeDataComplete) {
log.info('replication completed: destination already holds ' +
'this version with an equal or newer revision',
{ entry: sourceEntry.getLogInfo() });
} else {
log.debug('replication succeeded for object, publishing ' +
'replication status as COMPLETED',
{ entry: sourceEntry.getLogInfo() });
}
this._publishReplicationStatus(
sourceEntry, 'COMPLETED', { kafkaEntry, log });
return done(null, { committable: false });
@@ -989,3 +1095,5 @@ class ReplicateObject extends BackbeatTask {
}

module.exports = ReplicateObject;
// Exported for tests only
module.exports._cascadeSignals = { cascadeLoopDetected, cascadeDataComplete, partAlreadyAtDest };
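
The part-result handling added in the `-422,11 +425,16` hunk can be exercised in isolation. The sketch below copies that logic into a hypothetical standalone function, `summarizePartResults`, which does not exist in the PR; the location objects are placeholders.

```javascript
// Sentinel returned by a part whose data is already at the destination.
const partAlreadyAtDest = {};

// Hypothetical extraction of the callback logic from the diff, for
// illustration only.
function summarizePartResults(err, partResults) {
    const allPartsAlreadyAtDest = !err &&
        partResults.length > 0 &&
        partResults.every(result => result === partAlreadyAtDest);
    const destLocations = allPartsAlreadyAtDest ? [] :
        (partResults || []).filter(result => result && result !== partAlreadyAtDest);
    return { destLocations, allPartsAlreadyAtDest };
}

// Placeholder location objects (shape is an assumption).
const locA = { key: 'a' };
const locB = { key: 'b' };

const allSkipped = summarizePartResults(null, [partAlreadyAtDest, partAlreadyAtDest]);
const mixed = summarizePartResults(null, [locA, partAlreadyAtDest, locB]);
const failed = summarizePartResults(new Error('part failed'), [locA, undefined]);

console.log(allSkipped.allPartsAlreadyAtDest, allSkipped.destLocations.length);
console.log(mixed.allPartsAlreadyAtDest, mixed.destLocations.length);
console.log(failed.allPartsAlreadyAtDest, failed.destLocations.length);
```

In the mixed case only freshly written locations are kept, so orphan cleanup never targets data that was already present at the destination.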
2 changes: 1 addition & 1 deletion package.json
@@ -51,7 +51,7 @@
"@aws-sdk/client-s3": "^3.921.0",
"@aws-sdk/client-sts": "^3.921.0",
"@aws-sdk/credential-providers": "^3.921.0",
"@scality/cloudserverclient": "^1.0.8",
"@scality/cloudserverclient": "file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz",

@scality/cloudserverclient points to a local file path (file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz). This will break CI and other developers' builds. Must be changed to a proper registry version or git-pinned tag before merge.

Suggested change
"@scality/cloudserverclient": "file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz",
"@scality/cloudserverclient": "^1.0.9",

— Claude Code


@scality/cloudserverclient is pinned to a local file path (file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz). This will break CI and other developers' builds. Before merging, this needs to be changed to a proper npm version (e.g., ^1.0.9) or a git tag reference consistent with the other git-based deps in this project.

Suggested change
"@scality/cloudserverclient": "file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz",
"@scality/cloudserverclient": "^1.0.9",

— Claude Code
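
A pre-merge check for this kind of leftover could be sketched as below. `findLocalFileDeps` is a hypothetical helper, not an existing script in this repository; it only looks for the `file:` prefix used in the changed line.

```javascript
// Hypothetical helper: list dependencies whose version spec is a local path.
function findLocalFileDeps(pkg) {
    const sections = ['dependencies', 'devDependencies', 'optionalDependencies'];
    const offenders = [];
    for (const section of sections) {
        for (const [name, spec] of Object.entries(pkg[section] || {})) {
            if (typeof spec === 'string' && spec.startsWith('file:')) {
                offenders.push(name);
            }
        }
    }
    return offenders;
}

// Dependency entries taken from the diff above.
const pkg = {
    dependencies: {
        '@scality/cloudserverclient':
            'file:../cloudserverclient/scality-cloudserverclient-v1.0.9.tgz',
        'arsenal': 'git+https://github.com/scality/arsenal#8.3.9',
    },
};

const offenders = findLocalFileDeps(pkg);
console.log(offenders);
```

Run against the parsed `package.json` in CI, a non-empty result would be a reason to fail the build.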

"@smithy/node-http-handler": "^3.3.3",
"JSONStream": "^1.3.5",
"arsenal": "git+https://github.com/scality/arsenal#8.3.9",