Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bin/queuePopulator.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
const tracing = require('../lib/tracing');
tracing.init();

const async = require('async');
const schedule = require('node-schedule');

Expand Down Expand Up @@ -98,6 +101,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
4 changes: 3 additions & 1 deletion extensions/gc/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
const tracing = require('../../lib/tracing');
tracing.init();

const { errors } = require('arsenal');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -101,6 +103,6 @@ initAndStart();
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
garbageCollector.close(() => {
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
4 changes: 3 additions & 1 deletion extensions/lifecycle/bucketProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -133,6 +135,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
bucketProcessor.close(() => {
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
40 changes: 39 additions & 1 deletion extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const {
startCircuitBreakerMetricsExport,
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');
const { context: otelContext, trace, SpanKind, ROOT_CONTEXT } =
require('@opentelemetry/api');
const { traceHeadersFromCurrentContext } =
require('../../../lib/tracing/kafkaTraceContext');
const { isEnabled, endSpan } = require('../../../lib/tracing');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
Expand Down Expand Up @@ -340,7 +345,8 @@ class LifecycleConductor {
}

_taskToMessage(task, taskVersion, log) {
return {
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = {
message: JSON.stringify({
action: 'processObjects',
contextInfo: {
Expand All @@ -355,6 +361,10 @@ class LifecycleConductor {
details: {},
}),
};
if (headers) {
kafkaEntry.headers = headers;
}
return kafkaEntry;
}

_getAccountIds(unknownCanonicalIds, log, cb) {
Expand Down Expand Up @@ -402,6 +412,34 @@ class LifecycleConductor {
}

processBuckets(cb) {
if (!isEnabled()) {
this._processBucketsInternal((err, res) => {
if (cb) {cb(err, res);}
});
return;
}
// Root INTERNAL trace per cron firing (no upstream parent); the
// in-process scan work (Mongo bucket listing) nests under it.
const tracer = trace.getTracer('backbeat');
const span = tracer.startSpan('lifecycle.conductor.scan', {
kind: SpanKind.INTERNAL,
}, ROOT_CONTEXT);
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
const ctx = trace.setSpan(ROOT_CONTEXT, span);
otelContext.with(ctx, () => {
try {
this._processBucketsInternal((err, res) => {
endSpan(span, err);
if (cb) {cb(err, res);}
});
} catch (err) {
// sync throw: end span (don't leak), then rethrow
endSpan(span, err);
throw err;
}
});
Comment thread
delthas marked this conversation as resolved.
}
Comment thread
delthas marked this conversation as resolved.

_processBucketsInternal(cb) {
const log = this.logger.newRequestLogger();
const start = new Date();
let nBucketsQueued = 0;
Expand Down
4 changes: 3 additions & 1 deletion extensions/lifecycle/conductor/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -89,6 +91,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
lcConductor.stop(() => {
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
4 changes: 3 additions & 1 deletion extensions/lifecycle/objectProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -120,6 +122,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
objectProcessor.close(() => {
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
15 changes: 13 additions & 2 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const ReplicationAPI = require('../../replication/ReplicationAPI');
const { LifecycleMetrics, LIFECYCLE_MARKER_METRICS_LOCATION } = require('../LifecycleMetrics');
const locationsConfig = require('../../../conf/locationConfig.json') || {};
const { rulesSupportTransition } = require('../util/rules');
const { traceHeadersFromCurrentContext } = require('../../../lib/tracing/kafkaTraceContext');
const { decode } = versioning.VersionID;

const errorTransitionInProgress = errors.InternalError.
Expand Down Expand Up @@ -121,7 +122,12 @@ class LifecycleTask extends BackbeatTask {
* @return {undefined}
*/
_sendBucketEntry(entry, cb) {
const entries = [{ message: JSON.stringify(entry) }];
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: JSON.stringify(entry) };
if (headers) {
kafkaEntry.headers = headers;
}
const entries = [kafkaEntry];
this.producer.sendToTopic(this.bucketTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'BucketTopic', 'bucket', err, 1);
return cb(err);
Expand Down Expand Up @@ -183,7 +189,12 @@ class LifecycleTask extends BackbeatTask {
location,
Date.now() - entry.getAttribute('transitionTime'));

const entries = [{ message: entry.toKafkaMessage() }];
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: entry.toKafkaMessage() };
if (headers) {
kafkaEntry.headers = headers;
}
const entries = [kafkaEntry];
this.producer.sendToTopic(this.objectTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'ObjectTopic', 'bucket', err, 1);
return cb(err);
Expand Down
6 changes: 5 additions & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const messageUtil = require('./utils/message');
const notifConstants = require('./constants');
const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class NotificationQueuePopulator extends QueuePopulatorExtension {
/**
Expand Down Expand Up @@ -290,13 +291,16 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
eventTime: message.dateTime,
matchingConfig,
});
const traceHeaders = traceHeadersFromEntry(value);
this.publish(topic,
// keeping all messages for same object
// in the same partition to keep the order.
// here we use the object name and not the
// "_id" which also includes the versionId
`${bucket}/${message.key}`,
JSON.stringify(message));
JSON.stringify(message),
undefined,
traceHeaders);
// keep track of internal topics we have pushed to
pushedToTopic[topic] = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,17 @@ class KafkaNotificationDestination extends NotificationDestination {
*/
send(messages, done) {
const starTime = Date.now();
this._notificationProducer.send(messages, error => {
// Trust boundary: strip trace headers before producing to the customer Kafka.
// Delete on a shallow copy, never on `m`, so the caller's messages aren't mutated.
const safeMessages = messages.map(m => {
if (m && m.headers) {
const rest = { ...m };
delete rest.headers;
return rest;
}
return m;
});
this._notificationProducer.send(safeMessages, error => {
if (error) {
const { host, topic } = this._destinationConfig;
this._log.error('error in message delivery to external Kafka destination', {
Expand Down
5 changes: 4 additions & 1 deletion extensions/notification/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const assert = require('assert');
const { errors } = require('arsenal');
const async = require('async');
Expand Down Expand Up @@ -108,7 +111,7 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});

5 changes: 5 additions & 0 deletions extensions/replication/ReplicationAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const locations = require('../../conf/locationConfig.json') || {};

const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');
const { traceHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');

let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
Expand Down Expand Up @@ -78,6 +79,10 @@ class ReplicationAPI {
key: `${bucket}/${key}`,
message: action.toKafkaMessage(),
};
const traceHeaders = traceHeadersFromCurrentContext();
if (traceHeaders) {
kafkaEntry.headers = traceHeaders;
}
let topic = dataMoverTopic;
const toLocation = action.getAttribute('toLocation');
const locationConfig = locations[toLocation];
Expand Down
7 changes: 6 additions & 1 deletion extensions/replication/ReplicationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const QueuePopulatorExtension =
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const locationsConfig = require('../../conf/locationConfig.json') || {};
const safeJsonParse = require('../../lib/util/safeJsonParse');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class ReplicationQueuePopulator extends QueuePopulatorExtension {
constructor(params) {
Expand Down Expand Up @@ -112,11 +113,15 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;

const traceHeaders = traceHeadersFromEntry(value);

this.log.trace('publishing object replication entry',
{ entry: queueEntry.getLogInfo() });
this.publish(this.repConfig.topic,
`${queueEntry.getBucket()}/${queueEntry.getObjectKey()}`,
JSON.stringify(publishedEntry));
JSON.stringify(publishedEntry),
undefined,
traceHeaders);
}

/**
Expand Down
5 changes: 4 additions & 1 deletion extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -340,6 +343,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
tracing.close().finally(() => process.exit(0));
});
});
3 changes: 3 additions & 0 deletions extensions/replication/replicationStatusProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
const tracing = require('../../../lib/tracing');
tracing.init();

const werelogs = require('werelogs');

Expand Down Expand Up @@ -70,5 +72,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
tracing.close().finally(() => process.exit(0));
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
});
});
Comment thread
delthas marked this conversation as resolved.
29 changes: 28 additions & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const {
}
} = require('./constants');

Comment thread
delthas marked this conversation as resolved.
const { context: otelContext } = require('@opentelemetry/api');
Comment thread
delthas marked this conversation as resolved.
const { isEnabled, endSpan } = require('./tracing');
const { startLinkedSpanFromKafkaEntry } = require('./tracing/kafkaTraceContext');

const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

Expand Down Expand Up @@ -530,7 +534,30 @@ class BackbeatConsumer extends EventEmitter {
const { topic, partition } = entry;
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);

this._queueProcessor(entry, (err, completionArgs) => done(err, completionArgs, finishProcessingTask));
if (!isEnabled()) {
this._queueProcessor(entry, (err, completionArgs) =>
done(err, completionArgs, finishProcessingTask));
return;
}

Comment thread
delthas marked this conversation as resolved.
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, `${topic}.process`);
span.setAttribute('messaging.system', 'kafka');
span.setAttribute('messaging.destination.name', topic);
span.setAttribute('messaging.destination.partition.id', `${partition}`);
span.setAttribute('messaging.consumer.group.name', this._groupId);

Comment thread
delthas marked this conversation as resolved.
otelContext.with(ctx, () => {
try {
this._queueProcessor(entry, (err, completionArgs) => {
endSpan(span, err);
done(err, completionArgs, finishProcessingTask);
});
} catch (err) {
// sync throw before the callback fired: end span (don't leak), then rethrow
endSpan(span, err);
throw err;
}
});
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
}

/**
Expand Down
3 changes: 2 additions & 1 deletion lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class BackbeatProducer extends EventEmitter {
Buffer.from(item.message), // value
item.key, // key (for keyed partitioning)
Date.now(), // timestamp
sendCtx // opaque
sendCtx, // opaque
item.headers || undefined // Kafka message headers
);
});
} catch (err) {
Expand Down
5 changes: 4 additions & 1 deletion lib/queuePopulator/QueuePopulatorExtension.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class QueuePopulatorExtension {
* @param {Object} [optEntriesToPublish] - optional batch
* @return {undefined}
*/
publish(topic, key, message, optEntriesToPublish) {
publish(topic, key, message, optEntriesToPublish, headers) {
let __batch;
if (optEntriesToPublish) {
__batch = optEntriesToPublish;
Expand All @@ -81,6 +81,9 @@ class QueuePopulatorExtension {
'synchronously from the filter() method.');

const kafkaEntry = { key: encodeURIComponent(key), message };
if (headers) {
kafkaEntry.headers = headers;
}
this.log.trace('queueing kafka entry to topic',
{ key: kafkaEntry.key, topic });
if (__batch[topic] === undefined) {
Expand Down
Loading
Loading