-
Notifications
You must be signed in to change notification settings - Fork 0
fix(session): dedupe compaction execution #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: qinglin-dev
Are you sure you want to change the base?
Changes from 9 commits
90e627d
8b423b2
44ed114
9862059
b0891fc
a96b544
29db465
fc39bc6
ed6b00e
bba7165
8269451
eb04273
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ import { RuntimeFlags } from "@/effect/runtime-flags" | |
| import { EventV2 } from "@opencode-ai/core/event" | ||
| import { EventV2Bridge } from "@/event-v2-bridge" | ||
| import { SessionEvent } from "@opencode-ai/core/session-event" | ||
| import { Database } from "@/storage/db" | ||
| import { MessageTable, PartTable } from "./session.sql" | ||
|
|
||
| const log = Log.create({ service: "session.compaction" }) | ||
|
|
||
|
|
@@ -122,6 +124,23 @@ function completedCompactions(messages: MessageV2.WithParts[]) { | |
| }) | ||
| } | ||
|
|
||
| function activeCompactionMarker(messages: MessageV2.WithParts[]) { | ||
| const completed = new Set<MessageID>() | ||
| for (const msg of messages) { | ||
| if (msg.info.role !== "assistant") continue | ||
| if (msg.info.agent !== "compaction") continue | ||
| if (!msg.info.summary || !msg.info.finish) continue | ||
| completed.add(msg.info.parentID) | ||
| } | ||
|
|
||
| return messages.find( | ||
| (msg) => | ||
| msg.info.role === "user" && | ||
| msg.parts.some((part) => part.type === "compaction") && | ||
| !completed.has(msg.info.id), | ||
| ) | ||
| } | ||
|
|
||
| function buildPrompt(input: { previousSummary?: string; context: string[] }) { | ||
| const anchor = input.previousSummary | ||
| ? [ | ||
|
|
@@ -354,31 +373,60 @@ export const layer = Layer.effect( | |
| if (!parent || parent.info.role !== "user") { | ||
| throw new Error(`Compaction parent must be a user message: ${input.parentID}`) | ||
| } | ||
| const existing = Array.from(MessageV2.stream(input.sessionID)).find( | ||
| (msg) => | ||
| msg.info.role === "assistant" && | ||
| msg.info.agent === "compaction" && | ||
| msg.info.summary && | ||
| msg.info.parentID === input.parentID && | ||
| !msg.info.error, | ||
| ) | ||
| let inputMessages = input.messages | ||
| if (existing?.info.role === "assistant") { | ||
| if (existing.info.finish) return "continue" | ||
| const interrupted: MessageV2.Assistant = { | ||
| ...existing.info, | ||
| finish: "error", | ||
| error: MessageV2.fromError(new DOMException("Compaction interrupted before completion", "AbortError"), { | ||
| providerID: existing.info.providerID, | ||
| aborted: true, | ||
| }), | ||
| time: { | ||
| ...existing.info.time, | ||
| completed: existing.info.time.completed ?? Date.now(), | ||
| }, | ||
| } | ||
| yield* session.updateMessage(interrupted) | ||
| inputMessages = input.messages.map((msg) => | ||
| msg.info.id === interrupted.id ? { ...msg, info: interrupted } : msg, | ||
| ) | ||
| } | ||
|
|
||
| const userMessage = parent.info | ||
| const compactionPart = parent.parts.find((part): part is MessageV2.CompactionPart => part.type === "compaction") | ||
|
|
||
| let messages = input.messages | ||
| let messages = inputMessages | ||
| let replay: | ||
| | { | ||
| info: MessageV2.User | ||
| parts: MessageV2.Part[] | ||
| } | ||
| | undefined | ||
| if (input.overflow) { | ||
| const idx = input.messages.findIndex((m) => m.info.id === input.parentID) | ||
| const idx = inputMessages.findIndex((m) => m.info.id === input.parentID) | ||
| for (let i = idx - 1; i >= 0; i--) { | ||
| const msg = input.messages[i] | ||
| const msg = inputMessages[i] | ||
| if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) { | ||
| replay = { info: msg.info, parts: msg.parts } | ||
| messages = input.messages.slice(0, i) | ||
| messages = inputMessages.slice(0, i) | ||
| break | ||
| } | ||
| } | ||
| const hasContent = | ||
| replay && messages.some((m) => m.info.role === "user" && !m.parts.some((p) => p.type === "compaction")) | ||
| if (!hasContent) { | ||
| replay = undefined | ||
| messages = input.messages | ||
| messages = inputMessages | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -590,22 +638,43 @@ export const layer = Layer.effect( | |
| auto: boolean | ||
| overflow?: boolean | ||
| }) { | ||
| const msg = yield* session.updateMessage({ | ||
| id: MessageID.ascending(), | ||
| role: "user", | ||
| model: input.model, | ||
| sessionID: input.sessionID, | ||
| agent: input.agent, | ||
| time: { created: Date.now() }, | ||
| }) | ||
| yield* session.updatePart({ | ||
| id: PartID.ascending(), | ||
| messageID: msg.id, | ||
| sessionID: msg.sessionID, | ||
| type: "compaction", | ||
| auto: input.auto, | ||
| overflow: input.overflow, | ||
| }) | ||
| const created = yield* Effect.sync(() => | ||
| Database.transaction( | ||
| (tx) => { | ||
| const messages = Array.from(MessageV2.stream(input.sessionID)).reverse() | ||
| if (activeCompactionMarker(messages)) return undefined | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If a session already contains an unfinished compaction marker that is older than the latest finished assistant (for example from a crash or from the previous duplicate-compaction behavior), Useful? React with 👍 / 👎. |
||
|
|
||
| const now = Date.now() | ||
| const msg: MessageV2.User = { | ||
| id: MessageID.ascending(), | ||
| role: "user", | ||
| model: input.model, | ||
| sessionID: input.sessionID, | ||
| agent: input.agent, | ||
| time: { created: now }, | ||
| } | ||
| const part: MessageV2.CompactionPart = { | ||
| id: PartID.ascending(), | ||
| messageID: msg.id, | ||
| sessionID: msg.sessionID, | ||
| type: "compaction", | ||
| auto: input.auto, | ||
| overflow: input.overflow, | ||
| } | ||
| const { id, sessionID, ...info } = msg | ||
| const { id: partID, messageID, sessionID: partSessionID, ...partData } = part | ||
| tx.insert(MessageTable).values({ id, session_id: sessionID, time_created: now, data: info }).run() | ||
| tx.insert(PartTable) | ||
| .values({ id: partID, message_id: messageID, session_id: partSessionID, time_created: now, data: partData }) | ||
| .run() | ||
|
Comment on lines
+699
to
+702
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This direct insert bypasses Useful? React with 👍 / 👎. |
||
| return { msg, part, time: now } | ||
| }, | ||
| { behavior: "immediate" }, | ||
| ), | ||
| ) | ||
| if (!created) return | ||
| yield* session.updateMessage(created.msg) | ||
| yield* session.updatePart(created.part) | ||
| if (flags.experimentalEventSystem) { | ||
| yield* events.publish(SessionEvent.Compaction.Started, { | ||
| sessionID: input.sessionID, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a compaction retry sees a finished summary but the previous run crashed after the assistant was finalized and before the marker bookkeeping below ran, this early return prevents the retry from setting
tail_start_idor creating the auto-continue follow-up. In that recovery window the compacted session can drop the preserved tail from future context, and automatic compaction can stop without resuming the user's request; handle the already-finished case by completing the missing marker/follow-up work before returning.Useful? React with 👍 / 👎.