From 4b52a8c91630bb9c1f89bf10db114e80a34285ce Mon Sep 17 00:00:00 2001 From: phpoh Date: Tue, 7 Apr 2026 16:36:32 +0800 Subject: [PATCH] fix(workflow): prevent concurrent map read/write panic during stream processing When processing streaming output chunks, the code directly used shared map references without deep copying. This caused "concurrent map iteration and map write" panics when other goroutines were still modifying the original maps during JSON marshaling. Changes: - Remove single-chunk shortcut in buildStreamEndEvent that returned shared map reference directly - Remove single-element shortcuts in ConcatStructuredCallbackOutputs and ConcatStructuredCallbackInputs that returned shared references - Always use ConcatMaps to create deep copies of map data, preventing concurrent access issues This fixes the panic reported in issue #2607. Co-Authored-By: Claude Opus 4.6 --- .../workflow/internal/execute/callback.go | 30 ++++++------- .../workflow/internal/nodes/callbacks.go | 43 ++++++------------- 2 files changed, 27 insertions(+), 46 deletions(-) diff --git a/backend/domain/workflow/internal/execute/callback.go b/backend/domain/workflow/internal/execute/callback.go index 6d00da9bbf..cec934075d 100644 --- a/backend/domain/workflow/internal/execute/callback.go +++ b/backend/domain/workflow/internal/execute/callback.go @@ -952,16 +952,14 @@ func buildStreamEndEvent(c *Context, mapChunks []map[string]any, } if len(mapChunks) > 0 { - var outputMap map[string]any - if len(mapChunks) == 1 { - outputMap = mapChunks[0] - } else { - m, err := nodes.ConcatMaps(reflect.ValueOf(mapChunks)) - if err != nil { - return nil, err - } - outputMap = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy of the map chunks. + // This prevents concurrent map read/write issues when the original map + // is still being modified by other goroutines. + m, err := nodes.ConcatMaps(reflect.ValueOf(mapChunks)) + if err != nil { + return nil, err } + outputMap := m.Interface().(map[string]any) e := &Event{ Type: NodeEndStreaming, @@ -975,14 +973,12 @@ func buildStreamEndEvent(c *Context, mapChunks []map[string]any, return e, nil } - var fullStructuredOutput *nodes.StructuredCallbackOutput - if len(structuredChunks) == 1 { - fullStructuredOutput = structuredChunks[0] - } else { - fullStructuredOutput, err = nodes.ConcatStructuredCallbackOutputs(structuredChunks) - if err != nil { - return nil, err - } + // Use ConcatStructuredCallbackOutputs to create a deep copy of the structured chunks. + // This prevents concurrent map read/write issues when the original output map + // is still being modified by other goroutines. + fullStructuredOutput, err := nodes.ConcatStructuredCallbackOutputs(structuredChunks) + if err != nil { + return nil, err } e := &Event{ diff --git a/backend/domain/workflow/internal/nodes/callbacks.go b/backend/domain/workflow/internal/nodes/callbacks.go index 24dec6a0ed..00d1cf65c9 100644 --- a/backend/domain/workflow/internal/nodes/callbacks.go +++ b/backend/domain/workflow/internal/nodes/callbacks.go @@ -47,10 +47,6 @@ func ConcatStructuredCallbackOutputs(outputs []*StructuredCallbackOutput) ( return nil, nil } - if len(outputs) == 1 { - return outputs[0], nil - } - var ( fullOutput map[string]any fullRawOutput *string @@ -109,25 +105,19 @@ func ConcatStructuredCallbackOutputs(outputs []*StructuredCallbackOutput) ( } if len(inputList) > 0 { - if len(inputList) == 1 { - input = inputList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(inputList)); err != nil { - return nil, err - } - input = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(inputList)); err != nil { + return nil, err } + input = m.Interface().(map[string]any) } if len(extraList) > 0 { - if len(extraList) == 1 { - extra = extraList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { - return nil, err - } - extra = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { + return nil, err } + extra = m.Interface().(map[string]any) } if len(answerList) > 0 { @@ -163,10 +153,6 @@ func ConcatStructuredCallbackInputs(inputs []*StructuredCallbackInput) ( return nil, nil } - if len(inputs) == 1 { - return inputs[0], nil - } - var ( extra map[string]any input map[string]any @@ -181,20 +167,19 @@ func ConcatStructuredCallbackInputs(inputs []*StructuredCallbackInput) ( } } + // Use ConcatMaps to create a deep copy, preventing concurrent access issues m, err := ConcatMaps(reflect.ValueOf(inputLists)) if err != nil { return nil, err } + input = m.Interface().(map[string]any) if len(extraList) > 0 { - if len(extraList) == 1 { - extra = extraList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { - return nil, err - } - extra = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { + return nil, err } + extra = m.Interface().(map[string]any) } return &StructuredCallbackInput{