-
-
Notifications
You must be signed in to change notification settings - Fork 35.8k
diagnostics_channel: add opt-in subscriber suppression via suppressedBy and suppressed() #63651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
6a9412f
7b20b8a
a786501
e4aea85
a5c4940
7e415c0
8e6d7cb
07e7e97
9468795
b8194af
66e0cc9
01d9cc0
f9f428a
ddf8179
909f0fc
e630f72
cd5c009
3fdcd8e
716a4eb
baa6ee1
5bc0fb8
1e854d2
f3fc5cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,7 +2,6 @@ | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const { | ||||||||||||||||||||||||||
| ArrayPrototypeAt, | ||||||||||||||||||||||||||
| ArrayPrototypeIndexOf, | ||||||||||||||||||||||||||
| ArrayPrototypePush, | ||||||||||||||||||||||||||
| ArrayPrototypePushApply, | ||||||||||||||||||||||||||
| ArrayPrototypeSlice, | ||||||||||||||||||||||||||
|
|
@@ -15,6 +14,7 @@ const { | |||||||||||||||||||||||||
| ReflectApply, | ||||||||||||||||||||||||||
| SafeFinalizationRegistry, | ||||||||||||||||||||||||||
| SafeMap, | ||||||||||||||||||||||||||
| SafeSet, | ||||||||||||||||||||||||||
| SymbolDispose, | ||||||||||||||||||||||||||
| SymbolHasInstance, | ||||||||||||||||||||||||||
| } = primordials; | ||||||||||||||||||||||||||
|
|
@@ -36,6 +36,32 @@ const { subscribers: subscriberCounts } = dc_binding; | |||||||||||||||||||||||||
| const { WeakReference } = require('internal/util'); | ||||||||||||||||||||||||||
| const { isPromise } = require('internal/util/types'); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Internal only: tracks a Set of active suppression keys for the current async | ||||||||||||||||||||||||||
| // context. Uses a simple stack-based approach to avoid bootstrap issues with | ||||||||||||||||||||||||||
| // async_hooks. This is a simplified implementation that works for typical usage. | ||||||||||||||||||||||||||
| let suppressionStorage = null; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| function getSuppressionsStorage() { | ||||||||||||||||||||||||||
| if (suppressionStorage === null) { | ||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||
| const { AsyncLocalStorage } = require('async_hooks'); | ||||||||||||||||||||||||||
| suppressionStorage = new AsyncLocalStorage(); | ||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||
| // If AsyncLocalStorage fails to initialize (rare), use a fallback | ||||||||||||||||||||||||||
| // that won't provide async context isolation but at least works | ||||||||||||||||||||||||||
| suppressionStorage = false; // Marker for "tried and failed" | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return suppressionStorage || undefined; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
DivyanshuX9 marked this conversation as resolved.
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| function withSuppressionsContext(set, fn, thisArg, args) { | ||||||||||||||||||||||||||
| const storage = getSuppressionsStorage(); | ||||||||||||||||||||||||||
| if (storage) { | ||||||||||||||||||||||||||
| return storage.run(set, () => ReflectApply(fn, thisArg, args)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return ReflectApply(fn, thisArg, args); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Can't delete when weakref count reaches 0 as it could increment again. | ||||||||||||||||||||||||||
| // Only GC can be used as a valid time to clean up the channels map. | ||||||||||||||||||||||||||
| class WeakRefMap extends SafeMap { | ||||||||||||||||||||||||||
|
|
@@ -93,9 +119,17 @@ class RunStoresScope { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Enter stores using withScope | ||||||||||||||||||||||||||
| if (activeChannel._stores) { | ||||||||||||||||||||||||||
| const storage = getSuppressionsStorage(); | ||||||||||||||||||||||||||
| const activeKeys = storage ? storage.getStore() : undefined; | ||||||||||||||||||||||||||
| for (const entry of activeChannel._stores.entries()) { | ||||||||||||||||||||||||||
| const store = entry[0]; | ||||||||||||||||||||||||||
| const transform = entry[1]; | ||||||||||||||||||||||||||
| const { transform, suppressedBy = null } = entry[1]; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Skip this bound store if it opted into suppression and its key | ||||||||||||||||||||||||||
| // is active in the current async context. | ||||||||||||||||||||||||||
| if (suppressedBy !== null && activeKeys?.has(suppressedBy)) { | ||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| let newContext = data; | ||||||||||||||||||||||||||
| if (transform) { | ||||||||||||||||||||||||||
|
|
@@ -127,16 +161,32 @@ class RunStoresScope { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // TODO(qard): should there be a C++ channel interface? | ||||||||||||||||||||||||||
| class ActiveChannel { | ||||||||||||||||||||||||||
| subscribe(subscription) { | ||||||||||||||||||||||||||
| subscribe(subscription, options = {}) { | ||||||||||||||||||||||||||
| validateFunction(subscription, 'subscription'); | ||||||||||||||||||||||||||
| const suppressedBy = options && options.suppressedBy !== undefined ? options.suppressedBy : null; | ||||||||||||||||||||||||||
| if (suppressedBy !== null) { | ||||||||||||||||||||||||||
| const t = typeof suppressedBy; | ||||||||||||||||||||||||||
| if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is duplicating the validation in many spots, we can likely unify it. In the other comments I also suggested to drop function as type and to do a faster validation. I wonder if we even want to allow symbols at all, if we want to allow objects. I personally would prefer a single data type over many. Other opinions?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @BridgeAR , my preference would be Symbol only. Reasons:
const kMyTracer = Symbol('my-tracer') reads cleanly and is Happy to go either way though what's your preference? and should we ask other member too?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@isaacs would you go with objects only? And may I ask for not wanting symbols? I did not check, I guess objects could be faster, but I am unsure and I am curious about your reasoning.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok so, think i should wait for further clarification before making any changes.... |
||||||||||||||||||||||||||
| throw new ERR_INVALID_ARG_TYPE('suppressedBy', ['object', 'symbol', 'function'], suppressedBy); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const handler = subscription; | ||||||||||||||||||||||||||
| this._subscribers = ArrayPrototypeSlice(this._subscribers); | ||||||||||||||||||||||||||
| ArrayPrototypePush(this._subscribers, subscription); | ||||||||||||||||||||||||||
| ArrayPrototypePush(this._subscribers, { handler, suppressedBy }); | ||||||||||||||||||||||||||
| channels.incRef(this.name); | ||||||||||||||||||||||||||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| unsubscribe(subscription) { | ||||||||||||||||||||||||||
| const index = ArrayPrototypeIndexOf(this._subscribers, subscription); | ||||||||||||||||||||||||||
| // Find subscriber entry by handler identity. | ||||||||||||||||||||||||||
| let index = -1; | ||||||||||||||||||||||||||
| for (let i = 0; i < (this._subscribers?.length || 0); i++) { | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| if (this._subscribers[i].handler === subscription) { | ||||||||||||||||||||||||||
| index = i; | ||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if (index === -1) return false; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const before = ArrayPrototypeSlice(this._subscribers, 0, index); | ||||||||||||||||||||||||||
|
|
@@ -151,13 +201,21 @@ class ActiveChannel { | |||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| bindStore(store, transform) { | ||||||||||||||||||||||||||
| bindStore(store, transform, options = {}) { | ||||||||||||||||||||||||||
| const suppressedBy = options && options.suppressedBy !== undefined ? options.suppressedBy : null; | ||||||||||||||||||||||||||
| if (suppressedBy !== null) { | ||||||||||||||||||||||||||
| const t = typeof suppressedBy; | ||||||||||||||||||||||||||
| if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { | ||||||||||||||||||||||||||
| throw new ERR_INVALID_ARG_TYPE('suppressedBy', ['object', 'symbol', 'function'], suppressedBy); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const replacing = this._stores.has(store); | ||||||||||||||||||||||||||
| if (!replacing) { | ||||||||||||||||||||||||||
| channels.incRef(this.name); | ||||||||||||||||||||||||||
| if (this._index !== undefined) subscriberCounts[this._index]++; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| this._stores.set(store, transform); | ||||||||||||||||||||||||||
| this._stores.set(store, { transform, suppressedBy }); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| unbindStore(store) { | ||||||||||||||||||||||||||
|
|
@@ -180,10 +238,15 @@ class ActiveChannel { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| publish(data) { | ||||||||||||||||||||||||||
| const subscribers = this._subscribers; | ||||||||||||||||||||||||||
| const storage = getSuppressionsStorage(); | ||||||||||||||||||||||||||
| const activeKeys = storage ? storage.getStore() : undefined; | ||||||||||||||||||||||||||
| for (let i = 0; i < (subscribers?.length || 0); i++) { | ||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||
| const onMessage = subscribers[i]; | ||||||||||||||||||||||||||
| onMessage(data, this.name); | ||||||||||||||||||||||||||
| const { handler, suppressedBy = null } = subscribers[i]; | ||||||||||||||||||||||||||
| if (suppressedBy !== null && activeKeys?.has(suppressedBy)) { | ||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| handler(data, this.name); | ||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||
| process.nextTick(() => { | ||||||||||||||||||||||||||
| triggerUncaughtException(err, false); | ||||||||||||||||||||||||||
|
|
@@ -221,18 +284,18 @@ class Channel { | |||||||||||||||||||||||||
| prototype === ActiveChannel.prototype; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| subscribe(subscription) { | ||||||||||||||||||||||||||
| subscribe(subscription, options) { | ||||||||||||||||||||||||||
| markActive(this); | ||||||||||||||||||||||||||
| this.subscribe(subscription); | ||||||||||||||||||||||||||
| this.subscribe(subscription, options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| unsubscribe() { | ||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| bindStore(store, transform) { | ||||||||||||||||||||||||||
| bindStore(store, transform, options) { | ||||||||||||||||||||||||||
| markActive(this); | ||||||||||||||||||||||||||
| this.bindStore(store, transform); | ||||||||||||||||||||||||||
| this.bindStore(store, transform, options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| unbindStore() { | ||||||||||||||||||||||||||
|
|
@@ -366,12 +429,12 @@ class BoundedChannel { | |||||||||||||||||||||||||
| this.end?.hasSubscribers; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| subscribe(handlers) { | ||||||||||||||||||||||||||
| subscribe(handlers, options) { | ||||||||||||||||||||||||||
| for (let i = 0; i < boundedEvents.length; ++i) { | ||||||||||||||||||||||||||
| const name = boundedEvents[i]; | ||||||||||||||||||||||||||
| if (!handlers[name]) continue; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| this[name]?.subscribe(handlers[name]); | ||||||||||||||||||||||||||
| this[name]?.subscribe(handlers[name], options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -458,26 +521,26 @@ class TracingChannel { | |||||||||||||||||||||||||
| this.error?.hasSubscribers; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| subscribe(handlers) { | ||||||||||||||||||||||||||
| subscribe(handlers, options) { | ||||||||||||||||||||||||||
| // Subscribe to call window (start/end) | ||||||||||||||||||||||||||
| if (handlers.start || handlers.end) { | ||||||||||||||||||||||||||
| this.#callWindow.subscribe({ | ||||||||||||||||||||||||||
| start: handlers.start, | ||||||||||||||||||||||||||
| end: handlers.end, | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
| }, options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Subscribe to continuation window (asyncStart/asyncEnd) | ||||||||||||||||||||||||||
| if (handlers.asyncStart || handlers.asyncEnd) { | ||||||||||||||||||||||||||
| this.#continuationWindow.subscribe({ | ||||||||||||||||||||||||||
| start: handlers.asyncStart, | ||||||||||||||||||||||||||
| end: handlers.asyncEnd, | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
| }, options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Subscribe to error channel | ||||||||||||||||||||||||||
| if (handlers.error) { | ||||||||||||||||||||||||||
| this.error.subscribe(handlers.error); | ||||||||||||||||||||||||||
| this.error.subscribe(handlers.error, options); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
|
|
@@ -633,10 +696,29 @@ function tracingChannel(nameOrChannels) { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| dc_binding.linkNativeChannel((name) => channel(name)); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| function suppressed(key, fn, thisArg, ...args) { | ||||||||||||||||||||||||||
| validateFunction(fn, 'fn'); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (key === null || key === undefined) { | ||||||||||||||||||||||||||
| throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| const t = typeof key; | ||||||||||||||||||||||||||
| if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') { | ||||||||||||||||||||||||||
| throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key); | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const storage = getSuppressionsStorage(); | ||||||||||||||||||||||||||
| const currentSet = storage ? storage.getStore() : undefined; | ||||||||||||||||||||||||||
| const next = currentSet ? new SafeSet(currentSet) : new SafeSet(); | ||||||||||||||||||||||||||
| next.add(key); | ||||||||||||||||||||||||||
| return withSuppressionsContext(next, fn, thisArg, args); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| module.exports = { | ||||||||||||||||||||||||||
| channel, | ||||||||||||||||||||||||||
| hasSubscribers, | ||||||||||||||||||||||||||
| subscribe, | ||||||||||||||||||||||||||
| suppressed, | ||||||||||||||||||||||||||
| tracingChannel, | ||||||||||||||||||||||||||
| unsubscribe, | ||||||||||||||||||||||||||
| boundedChannel, | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.