diff --git a/backend/application/workflow/chatflow.go b/backend/application/workflow/chatflow.go index ce308b164f..f33eccd752 100644 --- a/backend/application/workflow/chatflow.go +++ b/backend/application/workflow/chatflow.go @@ -52,7 +52,6 @@ import ( "github.com/coze-dev/coze-studio/backend/pkg/logs" "github.com/coze-dev/coze-studio/backend/pkg/safego" "github.com/coze-dev/coze-studio/backend/pkg/sonic" - "github.com/coze-dev/coze-studio/backend/pkg/taskgroup" "github.com/coze-dev/coze-studio/backend/types/consts" "github.com/coze-dev/coze-studio/backend/types/errno" ) @@ -642,6 +641,22 @@ func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workfl sectionID = sID } + // 先处理历史消息(additional_messages 中除最后一条外的所有消息) + // 确保历史消息的 CreatedAt 早于当前用户消息,使 prefetchChatHistory 的跳过逻辑正确工作 + messageClient := crossmessage.DefaultSVC() + historyMessages, err := makeChatFlowHistoryMessages(ctx, bizID, conversationID, userID, sectionID, connectorID, messages[:len(req.GetAdditionalMessages())-1]) + if err != nil { + return nil, err + } + + if len(historyMessages) > 0 { + _, err := messageClient.BatchCreate(ctx, historyMessages) + if err != nil { + return nil, err + } + } + + // 再创建当前轮次的 runRecord 和用户消息 runRecord, err := crossagentrun.DefaultSVC().Create(ctx, &agententity.AgentRunMeta{ AgentID: bizID, ConversationID: conversationID, @@ -660,7 +675,6 @@ func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workfl return nil, err } - messageClient := crossmessage.DefaultSVC() _, err = messageClient.Create(ctx, userMessage) if err != nil { return nil, err @@ -743,28 +757,6 @@ func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workfl Cancellable: isDebug, } - historyMessages, err := makeChatFlowHistoryMessages(ctx, bizID, conversationID, userID, sectionID, connectorID, messages[:len(req.GetAdditionalMessages())-1]) - if err != nil { - return nil, err - } - - if len(historyMessages) > 0 { - g := taskgroup.NewTaskGroup(ctx, len(historyMessages)) - for _, hm := range historyMessages { - hMsg := hm - g.Go(func() error { - _, err := messageClient.Create(ctx, hMsg) - if err != nil { - return err - } - return nil - }) - } - err = g.Wait() - if err != nil { - logs.CtxWarnf(ctx, "create history message failed, err=%v", err) - } - } parameters[vo.UserInputKey], err = w.makeChatFlowUserInput(ctx, lastUserMessage) if err != nil { return nil, err @@ -1226,8 +1218,9 @@ func makeChatFlowHistoryMessages(ctx context.Context, bizID, conversationID, use ) historyMessages := make([]*message.Message, 0, len(messages)) + baseTimestamp := time.Now().UnixMilli() - int64(len(messages)*1000) - for _, msg := range messages { + for i, msg := range messages { if msg.Role == userRole { runRecord, err = crossagentrun.DefaultSVC().Create(ctx, &agententity.AgentRunMeta{ AgentID: bizID, @@ -1253,6 +1246,7 @@ func makeChatFlowHistoryMessages(ctx context.Context, bizID, conversationID, use return nil, err } + m.CreatedAt = baseTimestamp + int64(i*100) historyMessages = append(historyMessages, m) } @@ -1359,7 +1353,7 @@ func toConversationMessage(ctx context.Context, bizID, cid, userID, roundID, sec } if msg.ContentType == "text" { return &message.Message{ - Role: schema.User, + Role: schema.RoleType(msg.Role), ConversationID: cid, AgentID: bizID, RunID: roundID, @@ -1378,7 +1372,7 @@ func toConversationMessage(ctx context.Context, bizID, cid, userID, roundID, sec } m := &message.Message{ - Role: schema.User, + Role: schema.RoleType(msg.Role), MessageType: messageType, ConversationID: cid, AgentID: bizID,