diff --git a/backend/domain/workflow/internal/execute/callback.go b/backend/domain/workflow/internal/execute/callback.go index 6d00da9bbf..cec934075d 100644 --- a/backend/domain/workflow/internal/execute/callback.go +++ b/backend/domain/workflow/internal/execute/callback.go @@ -952,16 +952,14 @@ func buildStreamEndEvent(c *Context, mapChunks []map[string]any, } if len(mapChunks) > 0 { - var outputMap map[string]any - if len(mapChunks) == 1 { - outputMap = mapChunks[0] - } else { - m, err := nodes.ConcatMaps(reflect.ValueOf(mapChunks)) - if err != nil { - return nil, err - } - outputMap = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy of the map chunks. + // This prevents concurrent map read/write issues when the original map + // is still being modified by other goroutines. + m, err := nodes.ConcatMaps(reflect.ValueOf(mapChunks)) + if err != nil { + return nil, err } + outputMap := m.Interface().(map[string]any) e := &Event{ Type: NodeEndStreaming, @@ -975,14 +973,12 @@ func buildStreamEndEvent(c *Context, mapChunks []map[string]any, return e, nil } - var fullStructuredOutput *nodes.StructuredCallbackOutput - if len(structuredChunks) == 1 { - fullStructuredOutput = structuredChunks[0] - } else { - fullStructuredOutput, err = nodes.ConcatStructuredCallbackOutputs(structuredChunks) - if err != nil { - return nil, err - } + // Use ConcatStructuredCallbackOutputs to create a deep copy of the structured chunks. + // This prevents concurrent map read/write issues when the original output map + // is still being modified by other goroutines. + fullStructuredOutput, err := nodes.ConcatStructuredCallbackOutputs(structuredChunks) + if err != nil { + return nil, err } e := &Event{ diff --git a/backend/domain/workflow/internal/nodes/callbacks.go b/backend/domain/workflow/internal/nodes/callbacks.go index 24dec6a0ed..00d1cf65c9 100644 --- a/backend/domain/workflow/internal/nodes/callbacks.go +++ b/backend/domain/workflow/internal/nodes/callbacks.go @@ -47,10 +47,6 @@ func ConcatStructuredCallbackOutputs(outputs []*StructuredCallbackOutput) ( return nil, nil } - if len(outputs) == 1 { - return outputs[0], nil - } - var ( fullOutput map[string]any fullRawOutput *string @@ -109,25 +105,19 @@ func ConcatStructuredCallbackOutputs(outputs []*StructuredCallbackOutput) ( } if len(inputList) > 0 { - if len(inputList) == 1 { - input = inputList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(inputList)); err != nil { - return nil, err - } - input = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(inputList)); err != nil { + return nil, err } + input = m.Interface().(map[string]any) } if len(extraList) > 0 { - if len(extraList) == 1 { - extra = extraList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { - return nil, err - } - extra = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { + return nil, err } + extra = m.Interface().(map[string]any) } if len(answerList) > 0 { @@ -163,10 +153,6 @@ func ConcatStructuredCallbackInputs(inputs []*StructuredCallbackInput) ( return nil, nil } - if len(inputs) == 1 { - return inputs[0], nil - } - var ( extra map[string]any input map[string]any @@ -181,20 +167,19 @@ func ConcatStructuredCallbackInputs(inputs []*StructuredCallbackInput) ( } } + // Use ConcatMaps to create a deep copy, preventing concurrent access issues m, err := ConcatMaps(reflect.ValueOf(inputLists)) if err != nil { return nil, err } + input = m.Interface().(map[string]any) if len(extraList) > 0 { - if len(extraList) == 1 { - extra = extraList[0] - } else { - if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { - return nil, err - } - extra = m.Interface().(map[string]any) + // Use ConcatMaps to create a deep copy, preventing concurrent access issues + if m, err = ConcatMaps(reflect.ValueOf(extraList)); err != nil { + return nil, err } + extra = m.Interface().(map[string]any) } return &StructuredCallbackInput{