From f1ae17e3b1abe4c615d4f6f5f9c33674046c08aa Mon Sep 17 00:00:00 2001 From: jaydeep-pipaliya <71074587+jaydeep-pipaliya@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:02:41 +0530 Subject: [PATCH 1/2] fix: support chunked Docker container messages from newt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When newt sends large Docker container lists (>20 containers), the WebSocket messages can be dropped by intermediary proxies. This adds support for reassembling chunked container messages. Newt now sends large lists in multiple chunks with chunkIndex and totalChunks metadata. Pangolin accumulates chunks in cache and processes the complete list once all chunks arrive. Backward compatible — non-chunked messages from older newt versions are handled as before. Fixes #2117 --- server/routers/newt/handleSocketMessages.ts | 84 ++++++++++++++++++--- 1 file changed, 74 insertions(+), 10 deletions(-) diff --git a/server/routers/newt/handleSocketMessages.ts b/server/routers/newt/handleSocketMessages.ts index 383ab55418..afa7fab268 100644 --- a/server/routers/newt/handleSocketMessages.ts +++ b/server/routers/newt/handleSocketMessages.ts @@ -33,6 +33,39 @@ export const handleDockerStatusMessage: MessageHandler = async (context) => { return; }; +/** + * Get the cache key for storing in-progress chunked container data. + */ +function getChunkCacheKey(newtId: string): string { + return `${newtId}:dockerContainersChunks`; +} + +/** + * Process a complete container list (either received all at once or after reassembly). + */ +async function processContainerList( + newtId: string, + siteId: number | null, + containers: any[] +) { + logger.info( + `Docker containers for Newt ${newtId}: ${containers.length}` + ); + + if (containers.length > 0) { + await cache.set(`${newtId}:dockerContainers`, containers, 0); + } else { + logger.warn(`Newt ${newtId} does not have Docker containers`); + } + + if (!siteId) { + logger.warn("Newt has no site!"); + return; + } + + await applyNewtDockerBlueprint(siteId, newtId, containers); +} + export const handleDockerContainersMessage: MessageHandler = async ( context ) => { @@ -47,22 +80,53 @@ export const handleDockerContainersMessage: MessageHandler = async ( } logger.info(`Newt ID: ${newt.newtId}, Site ID: ${newt.siteId}`); - const { containers } = message.data; + const { containers, chunkIndex, totalChunks } = message.data; + // Non-chunked message (backward compatible with older newt versions) + if (totalChunks === undefined || totalChunks <= 1) { + await processContainerList( + newt.newtId, + newt.siteId, + containers || [] + ); + return; + } + + // Chunked message — accumulate chunks in cache then process when complete logger.info( - `Docker containers for Newt ${newt.newtId}: ${containers ? containers.length : 0}` + `Received chunk ${chunkIndex + 1}/${totalChunks} for Newt ${newt.newtId} (${containers?.length || 0} containers in this chunk)` ); - if (containers && containers.length > 0) { - await cache.set(`${newt.newtId}:dockerContainers`, containers, 0); - } else { - logger.warn(`Newt ${newt.newtId} does not have Docker containers`); + const chunkKey = getChunkCacheKey(newt.newtId); + const existing = + (await cache.get<{ receivedChunks: number; containers: any[] }>( + chunkKey + )) || { receivedChunks: 0, containers: [] }; + + if (chunkIndex === 0) { + // First chunk — reset accumulator + existing.receivedChunks = 0; + existing.containers = []; } - if (!newt.siteId) { - logger.warn("Newt has no site!"); - return; + if (containers && containers.length > 0) { + existing.containers.push(...containers); } + existing.receivedChunks++; - await applyNewtDockerBlueprint(newt.siteId, newt.newtId, containers); + if (existing.receivedChunks >= totalChunks) { + // All chunks received — process the complete list + logger.info( + `All ${totalChunks} chunks received for Newt ${newt.newtId}, total containers: ${existing.containers.length}` + ); + await cache.del(chunkKey); + await processContainerList( + newt.newtId, + newt.siteId, + existing.containers + ); + } else { + // Store partial data with a TTL so stale chunks don't accumulate forever + await cache.set(chunkKey, existing, 120); + } }; From 5f37315c5319d05ff53e8929b37629f596f02ba6 Mon Sep 17 00:00:00 2001 From: jaydeep-pipaliya <71074587+jaydeep-pipaliya@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:06:04 +0530 Subject: [PATCH 2/2] fix: robust chunk reassembly with batchId, validation, and typed accumulator - Added batchId to distinguish concurrent chunk streams from same newt - New batch automatically supersedes any in-progress batch (no corruption) - Validate chunkIndex/totalChunks/batchId before processing - Typed ChunkAccumulator interface for cache data - Guard against non-array containers field --- server/routers/newt/handleSocketMessages.ts | 81 +++++++++++++++------ 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/server/routers/newt/handleSocketMessages.ts b/server/routers/newt/handleSocketMessages.ts index afa7fab268..eabdf12138 100644 --- a/server/routers/newt/handleSocketMessages.ts +++ b/server/routers/newt/handleSocketMessages.ts @@ -33,8 +33,15 @@ export const handleDockerStatusMessage: MessageHandler = async (context) => { return; }; +interface ChunkAccumulator { + batchId: string; + totalChunks: number; + receivedChunks: number; + containers: any[]; +} + /** - * Get the cache key for storing in-progress chunked container data. + * Cache key for in-progress chunked container data, scoped by newt and batch. */ function getChunkCacheKey(newtId: string): string { return `${newtId}:dockerContainersChunks`; @@ -80,7 +87,7 @@ export const handleDockerContainersMessage: MessageHandler = async ( } logger.info(`Newt ID: ${newt.newtId}, Site ID: ${newt.siteId}`); - const { containers, chunkIndex, totalChunks } = message.data; + const { containers, chunkIndex, totalChunks, batchId } = message.data; // Non-chunked message (backward compatible with older newt versions) if (totalChunks === undefined || totalChunks <= 1) { @@ -92,41 +99,71 @@ export const handleDockerContainersMessage: MessageHandler = async ( return; } - // Chunked message — accumulate chunks in cache then process when complete + // Validate chunk metadata + if ( + typeof chunkIndex !== "number" || + typeof totalChunks !== "number" || + chunkIndex < 0 || + chunkIndex >= totalChunks || + totalChunks > 100 + ) { + logger.warn( + `Invalid chunk metadata from Newt ${newt.newtId}: chunkIndex=${chunkIndex}, totalChunks=${totalChunks}` + ); + return; + } + + if (!batchId || typeof batchId !== "string") { + logger.warn( + `Missing batchId in chunked message from Newt ${newt.newtId}` + ); + return; + } + logger.info( - `Received chunk ${chunkIndex + 1}/${totalChunks} for Newt ${newt.newtId} (${containers?.length || 0} containers in this chunk)` + `Received chunk ${chunkIndex + 1}/${totalChunks} for Newt ${newt.newtId} batch=${batchId} (${containers?.length || 0} containers)` ); const chunkKey = getChunkCacheKey(newt.newtId); - const existing = - (await cache.get<{ receivedChunks: number; containers: any[] }>( - chunkKey - )) || { receivedChunks: 0, containers: [] }; - - if (chunkIndex === 0) { - // First chunk — reset accumulator - existing.receivedChunks = 0; - existing.containers = []; + const existing = await cache.get(chunkKey); + + let accumulator: ChunkAccumulator; + + if (!existing || existing.batchId !== batchId) { + // New batch or different batch — start fresh + // This handles concurrent sends: the newer batch supersedes the old one + if (existing && existing.batchId !== batchId) { + logger.info( + `New batch ${batchId} supersedes in-progress batch ${existing.batchId} for Newt ${newt.newtId}` + ); + } + accumulator = { + batchId, + totalChunks, + receivedChunks: 0, + containers: [] + }; + } else { + accumulator = existing; } - if (containers && containers.length > 0) { - existing.containers.push(...containers); + if (containers && Array.isArray(containers)) { + accumulator.containers.push(...containers); } - existing.receivedChunks++; + accumulator.receivedChunks++; - if (existing.receivedChunks >= totalChunks) { - // All chunks received — process the complete list + if (accumulator.receivedChunks >= accumulator.totalChunks) { logger.info( - `All ${totalChunks} chunks received for Newt ${newt.newtId}, total containers: ${existing.containers.length}` + `All ${totalChunks} chunks received for Newt ${newt.newtId} batch=${batchId}, total containers: ${accumulator.containers.length}` ); await cache.del(chunkKey); await processContainerList( newt.newtId, newt.siteId, - existing.containers + accumulator.containers ); } else { - // Store partial data with a TTL so stale chunks don't accumulate forever - await cache.set(chunkKey, existing, 120); + // Store partial data with a TTL to prevent stale chunks from accumulating + await cache.set(chunkKey, accumulator, 120); } };