Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions lib/app_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,55 @@ try {
consoleLogger.error(err);
throw err;
}

// IPC logging — register after `new Application` so that app constructors which throw synchronously
// (e.g. fixtures that trigger framework errors) do not pay the extra require/listener cost nor
// perturb the timing of events that were racing with master-side teardown before this change.
const { ipcLogger, formatIpcMessage, internalIpcLogEnabled } = require('./utils/ipc_logger');

// D. master -> app (recv): log every IPC message delivered to this worker via the cluster channel.
// Handle is present when master forwards a net.Socket (sticky-session mode).
// This listener is read-only; other `process.on('message')` listeners (framework, sticky handler,
// etc.) are unaffected.
process.on('message', (msg, handle) => {
const body = typeof msg === 'string' ? { action: msg } : msg;
ipcLogger.info(formatIpcMessage(`app#${process.pid}<-master`, body, handle));
});

// F. master -> app internal NODE_CLUSTER messages (newconn with fd, disconnect, suicide, ...).
// `internalMessage` is an undocumented but stable Node.js event.
// Opt-in via EGG_CLUSTER_IPC_LOG because `newconn` fires once per HTTP request.
if (internalIpcLogEnabled) {
process.on('internalMessage', (msg, handle) => {
if (!msg || msg.cmd !== 'NODE_CLUSTER') return;
const label = msg.act ? `cluster:${msg.act}` : `cluster:ack#${msg.ack != null ? msg.ack : '?'}`;
ipcLogger.info(formatIpcMessage(
`app#${process.pid}<-master`,
{ action: label, data: msg },
handle
));
});
}

const clusterConfig = app.config.cluster || /* istanbul ignore next */ {};
const listenConfig = clusterConfig.listen || /* istanbul ignore next */ {};
const httpsOptions = Object.assign({}, clusterConfig.https, options.https);
const port = options.port = options.port || listenConfig.port;
const protocol = (httpsOptions.key && httpsOptions.cert) ? 'https' : 'http';

process.send({
// C. app -> master (send): there is exactly one direct process.send() in the app worker — `realport`.
// All other master-side lifecycle info (app-start / app-exit) is derived from cluster events, not from
// explicit app-to-master sends.
const realportMessage = {
to: 'master',
action: 'realport',
data: {
port,
protocol,
},
});
};
ipcLogger.info(formatIpcMessage(`app#${process.pid}->master`, realportMessage));
process.send(realportMessage);

app.ready(startServer);

Expand Down
32 changes: 31 additions & 1 deletion lib/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const Manager = require('./utils/manager');
const parseOptions = require('./utils/options');
const Messenger = require('./utils/messenger');
const terminate = require('./utils/terminate');
const { ipcLogger, formatIpcMessage, internalIpcLogEnabled } = require('./utils/ipc_logger');

const PROTOCOL = Symbol('Master#protocol');
const REAL_PORT = Symbol('Master#real_port');
Expand Down Expand Up @@ -217,6 +218,13 @@ class Master extends EventEmitter {
connection.destroy();
} else {
const worker = this.stickyWorker(connection.remoteAddress);
// A'. master -> app sticky-session direct send (bypasses messenger.sendToAppWorker),
// carries a net.Socket handle — safe-printed by formatIpcMessage's replacer.
ipcLogger.info(formatIpcMessage(
`master->app#${worker.process.pid}`,
{ action: 'sticky-session:connection' },
connection
));
worker.send('sticky-session:connection', connection);
}
}).listen(this[REAL_PORT], cb);
Expand Down Expand Up @@ -319,7 +327,11 @@ class Master extends EventEmitter {
cluster.on('fork', worker => {
worker.disableRefork = true;
this.workerManager.setWorker(worker);
worker.on('message', msg => {
// B. master <- app (recv). Handle is present when master receives a net.Socket
// (e.g. forwarded sticky-session connection).
// Log AFTER forwarding to avoid adding log-serialization latency to the forward path
// (some tests rely on tight timing of app->master->agent round-trips).
worker.on('message', (msg, handle) => {
if (typeof msg === 'string') {
msg = {
action: msg,
Expand All @@ -328,7 +340,25 @@ class Master extends EventEmitter {
}
msg.from = 'app';
this.messenger.send(msg);
ipcLogger.info(formatIpcMessage(`master<-app#${worker.process.pid}`, msg, handle));
});

// E. cluster internal NODE_CLUSTER messages from worker to master:
// listening / online / queryServer / accepted (fd ack) / close / ...
// Must hook on `worker.process` (ChildProcess) — `cluster.Worker` doesn't forward `internalMessage`.
// `internalMessage` is undocumented but stable across Node.js versions.
// Opt-in via EGG_CLUSTER_IPC_LOG because this is verbose under load.
if (internalIpcLogEnabled) {
worker.process.on('internalMessage', (msg, handle) => {
if (!msg || msg.cmd !== 'NODE_CLUSTER') return;
const label = msg.act ? `cluster:${msg.act}` : `cluster:ack#${msg.ack != null ? msg.ack : '?'}`;
ipcLogger.info(formatIpcMessage(
`master<-app#${worker.process.pid}`,
{ action: label, data: msg },
handle
));
});
Comment on lines +351 to +360
}
this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',
worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers));

Expand Down
88 changes: 88 additions & 0 deletions lib/utils/ipc_logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict';

const net = require('net');
const { getLogger } = require('onelogger');

/**
* onelogger instance for Node.js cluster module IPC traffic between master and app workers.
* Users can override the underlying sink via `onelogger.setLogger()` / `setCustomLogger()`.
*/
const ipcLogger = getLogger('egg-cluster:ipc');

/**
* Whether internal-level cluster IPC logs (NODE_CLUSTER newconn/accepted/listening/...) are enabled.
* These are very verbose (one `cluster:newconn` per HTTP request), so they are opt-in
* via the `EGG_CLUSTER_IPC_LOG` environment variable (any truthy value enables).
*/
const internalIpcLogEnabled = !!process.env.EGG_CLUSTER_IPC_LOG;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The environment variable check !!process.env.EGG_CLUSTER_IPC_LOG will evaluate to true even if the variable is set to the string "0" or "false". In the context of environment variables, users often expect these values to disable a feature. It is safer to explicitly check for truthy values or exclude known falsy strings.

Suggested change
const internalIpcLogEnabled = !!process.env.EGG_CLUSTER_IPC_LOG;
const internalIpcLogEnabled = !!(process.env.EGG_CLUSTER_IPC_LOG && process.env.EGG_CLUSTER_IPC_LOG !== '0' && process.env.EGG_CLUSTER_IPC_LOG !== 'false');


const MAX_STRING_LEN = 200;
const MAX_TOTAL_LEN = 1024;

function describeHandle(handle) {
if (handle == null) return '';
if (handle instanceof net.Socket) {
const fd = handle._handle && handle._handle.fd;
return fd != null ? `<Socket fd=${fd}>` : '<Socket>';
}
if (handle instanceof net.Server) {
return '<Server>';
}
const ctor = handle.constructor && handle.constructor.name;
return ctor ? `<${ctor}>` : '<handle>';
}

function makeReplacer() {
const seen = new WeakSet();
return function replacer(_key, value) {
if (value && typeof value === 'object') {
if (seen.has(value)) return '<Circular>';
seen.add(value);
if (value instanceof net.Socket) return describeHandle(value);
if (value instanceof net.Server) return '<Server>';
if (Buffer.isBuffer(value)) return `<Buffer len=${value.length}>`;
}
if (typeof value === 'string' && value.length > MAX_STRING_LEN) {
return `${value.slice(0, MAX_STRING_LEN)}...(truncated)`;
}
return value;
};
}

function stringifyData(data) {
let out;
try {
out = JSON.stringify(data, makeReplacer());
} catch (err) {
return `<unserializable: ${err.message}>`;
}
if (out && out.length > MAX_TOTAL_LEN) {
out = `${out.slice(0, MAX_TOTAL_LEN)}...(truncated)`;
}
return out;
}
Comment on lines +52 to +63
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The stringifyData function eagerly performs JSON.stringify on the message data. Since this is called by formatIpcMessage before passing the result to ipcLogger.info, the stringification overhead is incurred for every IPC message even if the logger is configured to ignore INFO level messages. For the "always on" application layer logs (Points A, B, C, D), this could impact performance under high IPC load. Consider making these logs opt-in or using a lower log level like debug to minimize default overhead.


/**
* Format a single IPC message into a one-line log string.
* @param {string} direction e.g. 'master->app#12345' / 'app#12345<-master'
* @param {Object} msg the message body (supports cluster internal msgs via `action: 'cluster:<act>'`)
* @param {*} [handle] optional handle (net.Socket / net.Server / TCP) attached to the IPC message
* @return {string}
*/
function formatIpcMessage(direction, msg, handle) {
const action = (msg && msg.action) || '<unknown>';
let out = `[${direction}] action=${action}`;
if (msg && msg.data !== undefined) {
out += ` data=${stringifyData(msg.data)}`;
}
if (handle) {
out += ` +handle=${describeHandle(handle)}`;
}
return out;
Comment on lines +52 to +81
}

module.exports = {
ipcLogger,
internalIpcLogEnabled,
formatIpcMessage,
};
3 changes: 3 additions & 0 deletions lib/utils/messenger.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const cluster = require('cluster');
const sendmessage = require('sendmessage');
const debug = require('debug')('egg-cluster:messenger');
const { ipcLogger, formatIpcMessage } = require('./ipc_logger');


/**
Expand Down Expand Up @@ -162,6 +163,8 @@ class Messenger {
if (data.receiverPid && data.receiverPid !== String(worker.process.pid)) {
continue;
}
// A. master -> app (send)
ipcLogger.info(formatIpcMessage(`master->app#${worker.process.pid}`, data));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Logging inside this loop is particularly expensive because formatIpcMessage (and its internal JSON.stringify) is called for every worker in the cluster. If a message is broadcast to many workers, the same data object is stringified repeatedly. This can cause significant CPU spikes in the Master process. It is recommended to pre-calculate the data string once outside the loop or use a more efficient logging strategy for broadcasts.

sendmessage(worker, data);
}
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
"ps-tree": "^1.2.0",
"semver": "^5.6.0",
"sendmessage": "^1.1.0",
"utility": "^1.15.0"
"utility": "^1.15.0",
"onelogger": "^1.0.1"
},
"devDependencies": {
"address": "^1.0.3",
Expand Down