Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,51 @@ var (
configFile string
)

// maxContainersPerChunk is the maximum number of containers per WebSocket message.
// Each container with full metadata (labels, ports, networks) can be 1-5KB of JSON.
// 15 containers ≈ 15-75KB which is well within typical proxy frame limits.
const maxContainersPerChunk = 15

// sendContainerList sends a list of Docker containers to the server, chunking if necessary
// to avoid exceeding WebSocket message size limits that can cause data loss through proxies.
// Each chunked batch includes a unique batchId so the server can distinguish interleaved sends
// (e.g., a manual fetch and a Docker event firing concurrently).
func sendContainerList(client *websocket.Client, containers []docker.Container) error {
if len(containers) <= maxContainersPerChunk {
// Small list — send in a single non-chunked message (backward compatible)
return client.SendMessage("newt/socket/containers", map[string]interface{}{
"containers": containers,
})
}

// Generate a unique batch ID so the server can handle concurrent chunk streams
batchId := generateChainId()

totalChunks := (len(containers) + maxContainersPerChunk - 1) / maxContainersPerChunk
logger.Info("Sending %d containers in %d chunks (batch=%s)", len(containers), totalChunks, batchId)

for i := 0; i < len(containers); i += maxContainersPerChunk {
end := i + maxContainersPerChunk
if end > len(containers) {
end = len(containers)
}
chunkIndex := i / maxContainersPerChunk

err := client.SendMessage("newt/socket/containers", map[string]interface{}{
"containers": containers[i:end],
"chunkIndex": chunkIndex,
"totalChunks": totalChunks,
"batchId": batchId,
})
if err != nil {
return fmt.Errorf("failed to send chunk %d/%d (batch=%s): %w", chunkIndex+1, totalChunks, batchId, err)
}
logger.Debug("Sent chunk %d/%d (%d containers, batch=%s)", chunkIndex+1, totalChunks, end-i, batchId)
}

return nil
}

// generateChainId generates a random chain ID for deduplicating round-trip messages.
func generateChainId() string {
b := make([]byte, 8)
Expand Down Expand Up @@ -1462,15 +1507,8 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
return
}

// Send container list back to server
err = client.SendMessage("newt/socket/containers", map[string]interface{}{
"containers": containers,
})
if err != nil {
logger.Error("Failed to send registration message: %v", err)
}

if err != nil {
// Send container list back to server (chunked for large lists)
if err := sendContainerList(client, containers); err != nil {
logger.Error("Failed to send Docker container list: %v", err)
} else {
logger.Debug("Docker container list sent, count: %d", len(containers))
Expand Down Expand Up @@ -1883,12 +1921,9 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
if dockerSocket != "" {
logger.Debug("Initializing Docker event monitoring")
dockerEventMonitor, err = docker.NewEventMonitor(dockerSocket, dockerEnforceNetworkValidationBool, func(containers []docker.Container) {
// Send updated container list via websocket when Docker events occur
// Send updated container list via websocket when Docker events occur (chunked for large lists)
logger.Debug("Docker event detected, sending updated container list (%d containers)", len(containers))
err := client.SendMessage("newt/socket/containers", map[string]interface{}{
"containers": containers,
})
if err != nil {
if err := sendContainerList(client, containers); err != nil {
logger.Error("Failed to send updated container list after Docker event: %v", err)
} else {
logger.Debug("Updated container list sent successfully")
Expand Down