diff --git a/.changeset/fork-at-message-pin-bumps.md b/.changeset/fork-at-message-pin-bumps.md new file mode 100644 index 0000000000..558dab9efa --- /dev/null +++ b/.changeset/fork-at-message-pin-bumps.md @@ -0,0 +1,7 @@ +--- +'@electric-ax/agents': patch +'@electric-ax/agents-server-conformance-tests': patch +'electric-ax': patch +--- + +Bump `@durable-streams/{client,server,state}` pins in step with `@electric-ax/agents-runtime` and `@electric-ax/agents-server` to pick up the fork-at-pointer (`Stream-Fork-Offset` + `Stream-Fork-Sub-Offset`) wire protocol that the new fork-at-message UX depends on. No other code changes in these packages. diff --git a/.changeset/fork-at-message-runtime.md b/.changeset/fork-at-message-runtime.md new file mode 100644 index 0000000000..66a66ec3d6 --- /dev/null +++ b/.changeset/fork-at-message-runtime.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-runtime': patch +--- + +Add `EventPointer { offset, subOffset }` for addressing single events on a durable stream. Widen `__electricRowOffsets` side-tables on `EntityStreamDB` collections from `Map` to `Map`, with pointers minted along log-entry boundaries (grouped by each item's `headers.offset`) so they round-trip cleanly through `Stream-Fork-Sub-Offset` regardless of how a live read is chunked. diff --git a/.changeset/fork-at-message-server-ui.md b/.changeset/fork-at-message-server-ui.md new file mode 100644 index 0000000000..45525ba76f --- /dev/null +++ b/.changeset/fork-at-message-server-ui.md @@ -0,0 +1,6 @@ +--- +'@electric-ax/agents-server': patch +'@electric-ax/agents-server-ui': patch +--- + +Fork at an earlier message instead of only at HEAD. `POST /_electric/entities///fork` accepts an optional `fork_pointer: { offset, sub_offset }` (snake_case wire) that truncates the new entity's `main` stream up to and including the chosen event; `error` and shared-state streams still clone at HEAD; the root's manifest is filtered so descendants spawned after the pointer are dropped from the fork along with their subtrees. Pointer-forks skip the all-subtree-idle wait on the root (the historical read can't be torn by concurrent writes past the pointer), so the affordance works during the post-run keep-alive window. UI: hover-revealed "Fork from here" button on user-message bubbles in `ChatView`, anchored to the latest preceding completed `runs` row; suppressed on the first message and while a run is in flight. diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index 35ba9131fa..c81c28b3fd 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -2,6 +2,7 @@ import { queryOnce } from '@durable-streams/state' import { assembleContext } from './context-assembly' import { createContextEntriesApi } from './context-entries' import { entityStateSchema } from './entity-schema' +import { formatPointerOrderToken } from './event-pointer' import { createOutboundBridge, loadOutboundIdSeed } from './outbound-bridge' import { createPiAgentAdapter } from './pi-adapter' import { @@ -437,9 +438,13 @@ export function createHandlerContext( function readContextHistoryOffset(row: { key: string }): string | undefined { const contextInserted = config.db.collections.contextInserted - const rowOffset = contextInserted.__electricRowOffsets?.get(row.key) - if (typeof rowOffset === `string`) { - return rowOffset + const pointer = contextInserted.__electricRowOffsets?.get(row.key) + if (pointer) { + // Format the pointer as a stable, sortable string. Matches the + // `_timeline_order` produced by `entity-stream-db` so that + // `loadContextHistory(id, offset)` can round-trip lookups + // against the same row. + return formatPointerOrderToken(pointer) } const seq = Reflect.get(row, `_seq`) diff --git a/packages/agents-runtime/src/entity-stream-db.ts b/packages/agents-runtime/src/entity-stream-db.ts index f737181cee..6370144414 100644 --- a/packages/agents-runtime/src/entity-stream-db.ts +++ b/packages/agents-runtime/src/entity-stream-db.ts @@ -8,6 +8,7 @@ import { isControlEvent, } from '@durable-streams/state' import { builtInCollections, passthrough } from './entity-schema' +import { formatPointerOrderToken, type EventPointer } from './event-pointer' import type { ActionDefinition, ChangeEvent, @@ -45,7 +46,7 @@ interface EntityWriteUtils { type EntityCollectionMeta = { __electricSourceDb?: EntityStreamDBWithActions __electricSourceId?: string - __electricRowOffsets?: Map + __electricRowOffsets?: Map __electricTimelineOrders?: Map } @@ -99,14 +100,6 @@ type EntityStreamDBOptions = { ) const WRITE_TXID_TIMEOUT_MS = 20_000 -const TIMELINE_OFFSET_WIDTH = 24 -const TIMELINE_BATCH_INDEX_WIDTH = 8 - -function formatStreamTimelineOrder(offset: string, index: number): string { - return `stream:${offset.padStart(TIMELINE_OFFSET_WIDTH, `0`)}:${index - .toString() - .padStart(TIMELINE_BATCH_INDEX_WIDTH, `0`)}` -} /** * Create a StreamDB connected to a Electric Agents entity stream. @@ -145,7 +138,10 @@ export function createEntityStreamDB( ...streamCustomState, } const collectionNameByEventType = new Map() - const rowOffsetsByCollection = new Map>() + const rowOffsetsByCollection = new Map< + string, + Map + >() const timelineOrdersByCollection = new Map< string, Map @@ -155,6 +151,12 @@ export function createEntityStreamDB( rowOffsetsByCollection.set(name, new Map()) timelineOrdersByCollection.set(name, new Map()) } + + // Tracks the END offset of the previous batch — i.e. the START + // offset of the next batch's items, which is the anchor we pair + // with each item's sub-offset to form an `EventPointer`. `null` + // before any batch has arrived (anchor at stream start). + let previousBatchOffset: string | null = null // Build a reverse map from TanStack DB collection id to schema key const collIdToSchemaKey: Record = {} for (const name of Object.keys(mergedCollections)) { @@ -285,7 +287,21 @@ export function createEntityStreamDB( onBeforeBatch: (batch) => { opts?.onBeforeBatch?.(batch) replayBatchOffset.current = batch.offset - batch.items.forEach((item, itemIndex) => { + // `Stream-Fork-Sub-Offset` addresses items WITHIN A SINGLE LOG + // ENTRY (the first log entry past the anchor), not items globally + // past the anchor. To mint server-compatible pointers we group + // items in the batch by their `headers.offset` (= the end offset + // of the log entry that produced them, stable at write time). + // Each contiguous group of items sharing an `headers.offset` is + // one log entry; within it sub-offsets are 1..K. The anchor + // offset for that group is the END offset of the PRECEDING log + // entry — either the previous distinct `headers.offset` we saw + // in this batch, or `previousBatchOffset` for the first group in + // a fresh batch. + let currentEntryOffset: string | null = null + let priorEntryOffset: string | null = previousBatchOffset + let positionInEntry = 0 + batch.items.forEach((item) => { if (isControlEvent(item)) { if (item.headers.control === `reset`) { for (const offsets of rowOffsetsByCollection.values()) { @@ -294,14 +310,34 @@ export function createEntityStreamDB( for (const orders of timelineOrdersByCollection.values()) { orders.clear() } + previousBatchOffset = null + currentEntryOffset = null + priorEntryOffset = null + positionInEntry = 0 } return } if (!isChangeEvent(item)) return const collectionName = collectionNameByEventType.get(item.type) if (!collectionName) return - const offset = item.headers.offset ?? batch.offset - rowOffsetsByCollection.get(collectionName)?.set(item.key, offset) + + const itemEntryOffset = + typeof (item.headers as Record).offset === `string` + ? ((item.headers as Record).offset as string) + : null + if (itemEntryOffset !== currentEntryOffset) { + // Boundary into a new log entry. + priorEntryOffset = currentEntryOffset ?? previousBatchOffset + currentEntryOffset = itemEntryOffset + positionInEntry = 0 + } + positionInEntry++ + + const pointer: EventPointer = { + offset: priorEntryOffset, + subOffset: positionInEntry, + } + rowOffsetsByCollection.get(collectionName)?.set(item.key, pointer) if (item.headers.operation === `delete`) return if (typeof item.value !== `object` || item.value === null) return @@ -310,11 +346,16 @@ export function createEntityStreamDB( if (!orders) return let order = orders.get(item.key) if (!order) { - order = formatStreamTimelineOrder(offset, itemIndex) + order = formatPointerOrderToken(pointer) orders.set(item.key, order) } ;(item.value as Record)._timeline_order = order }) + // After processing the batch, advance the anchor for next time. + // `batch.offset` is the `Stream-Next-Offset` for this batch — + // i.e. the cursor that the NEXT batch's items will be addressed + // relative to. + previousBatchOffset = batch.offset }, onBatch: (batch) => { opts?.onBatch?.(batch) @@ -669,11 +710,17 @@ export function createEntityStreamDB( event.value !== null ) { const orders = timelineOrdersByCollection.get(collectionName) + // applyEvent stages an in-process event (not delivered through + // a wire batch). It carries a single event, so the pointer's + // sub-offset is always 1 ("the one item past this anchor"). + // If no real offset is available, synthesize a monotonically- + // increasing `local:...` token so successive applyEvent calls + // still sort in invocation order. const offset = event.headers.offset ?? `local:${Date.now().toString().padStart(13, `0`)}` - const order = - orders?.get(event.key) ?? formatStreamTimelineOrder(offset, 0) + const pointer: EventPointer = { offset, subOffset: 1 } + const order = orders?.get(event.key) ?? formatPointerOrderToken(pointer) orders?.set(event.key, order) ;(event.value as Record)._timeline_order = order } diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index e4c97fc5fc..72bec0e6f5 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -18,6 +18,7 @@ import type { QueryBuilder, } from '@tanstack/db' import type { EntityStreamDB } from './entity-stream-db' +import { formatPointerOrderToken, type EventPointer } from './event-pointer' import type { ChildStatusEntry, MessageReceived, Signal } from './entity-schema' import type { ManifestEntry, Wake, WakeMessage } from './types' @@ -500,7 +501,7 @@ function readRequiredOrderToken( collection: { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map }, row: TRow, index: number @@ -510,9 +511,9 @@ function readRequiredOrderToken( return timelineOrder } - const offset = collection.__electricRowOffsets?.get(row.key) - if (offset) { - return `offset:${offset}` + const pointer = collection.__electricRowOffsets?.get(row.key) + if (pointer) { + return formatPointerOrderToken(pointer) } const inlineSeq = readInlineSeq(row) @@ -526,7 +527,7 @@ function readRequiredOrderToken( function readOptionalOrderToken( collection: { toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map }, row: TRow ): string | undefined { @@ -535,9 +536,9 @@ function readOptionalOrderToken( return timelineOrder } - const offset = collection.__electricRowOffsets?.get(row.key) - if (offset) { - return `offset:${offset}` + const pointer = collection.__electricRowOffsets?.get(row.key) + if (pointer) { + return formatPointerOrderToken(pointer) } const inlineSeq = readInlineSeq(row) @@ -547,7 +548,7 @@ function readOptionalOrderToken( function withOrderToken(collection: { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map }): Array> { return collection.toArray.map((row, index) => ({ ...row, @@ -559,7 +560,7 @@ function withOptionalOrderToken< TRow extends { key: string | number }, >(collection: { toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map }): Array { return collection.toArray.map((row) => { const orderToken = readOptionalOrderToken(collection, row) @@ -574,14 +575,14 @@ function getOrderableCollection( | { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map } | undefined, id: string ): { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map } { if (!collection) { throw new Error( @@ -617,9 +618,12 @@ function withoutOrderToken( } function orderTokenToHistoryOffset(orderToken: string): string { - return orderToken.startsWith(`offset:`) - ? orderToken.slice(`offset:`.length) - : orderToken + // The order token is already a stable, sortable string representation + // of an `EventPointer` (or a `_seq` / `pending` fallback). Round-trip + // semantics are maintained as long as every callsite that produces + // a historyOffset goes through the same formatter — see + // `readContextHistoryOffset` in `context-factory.ts`. + return orderToken } function withOrderFromOrderIndex( @@ -973,7 +977,7 @@ export function buildEntityTimelineData( | { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map } | undefined, `contextInserted` @@ -985,7 +989,7 @@ export function buildEntityTimelineData( | { id?: string toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map } | undefined, `contextRemoved` diff --git a/packages/agents-runtime/src/event-pointer.ts b/packages/agents-runtime/src/event-pointer.ts new file mode 100644 index 0000000000..256ba9a053 --- /dev/null +++ b/packages/agents-runtime/src/event-pointer.ts @@ -0,0 +1,118 @@ +/** + * `EventPointer` addresses a single event on a Durable Stream. + * + * It is the pair the agents-server uses on the wire when forking: it + * maps directly to the `Stream-Fork-Offset` + `Stream-Fork-Sub-Offset` + * headers in the Durable Streams protocol. + * + * Semantics: + * + * - `offset` is the anchor offset on the source stream. `null` means + * "anchor at stream start" (which translates on the wire to + * omitting the `Stream-Fork-Offset` header). + * - `subOffset` is the count of JSON messages past the anchor to + * include in the fork. So sub-offset `N` includes items + * `[0, 1, ... N-1]` of the chunk that starts at `offset`. + * + * A pointer addresses inclusively-through-and-stops-at-item-(subOffset-1). + * For "fork up to and including item at batch-index `j`," set + * `subOffset = j + 1`. + * + * Why this shape and not the wire's opaque offset alone: + * the protocol ships fork-side sub-offsets but does NOT add read-side + * sub-offsets — the wire never tells the client the sub-offset of a + * delivered item. We compute them locally by counting positions within + * each `JsonBatch.items` and pairing them with the batch's start + * offset (= the previous batch's `Stream-Next-Offset`, or `null` for + * the first batch of a fresh read). + * + * Limitation: local counting is correct when reads start from the + * beginning of the stream. If a future read-side feature ever lets us + * resume from an arbitrary mid-chunk cursor, our local counter would + * miscount items that the server already skipped past. When the + * protocol grows wire-provided values for that case, the local + * counter goes away. + */ +export interface EventPointer { + /** Anchor offset on the source stream, or `null` for stream-start. */ + offset: string | null + /** + * Count of JSON messages past `offset` to include — sub-offset N + * includes items `[0 .. N-1]`. For "include item at index j," set + * `subOffset = j + 1`. + */ + subOffset: number +} + +/** + * A pair of widths chosen to keep the order-token format stable. + * + * `OFFSET_WIDTH` is the legacy pad width used by `_timeline_order`. + * Bumping it would invalidate every persisted timeline ordering — so + * if you ever need to widen it, you also need a migration path. + */ +const OFFSET_WIDTH = 24 +const SUB_OFFSET_WIDTH = 8 + +/** + * The token prefix that signals "this row was sourced from a stream + * event" (as opposed to a pending optimistic row, a `_seq`-based + * fallback, etc.). Kept identical to the previous single-offset + * format so existing `like(_timeline_order, 'stream:...')` queries keep + * matching after the upgrade. + */ +export const STREAM_TOKEN_PREFIX = `stream:` + +/** + * Format an `EventPointer` as a lexicographically-sortable token for + * use in `_timeline_order` / `__electricRowOffsets`-derived ordering. + * + * The format is: + * + * `stream::` + * + * - Empty / null offsets zero-pad to all-zeros, which sorts before any + * non-empty offset — correct, since the first batch of a fresh read + * has `offset === null` and its items must sort before later batches. + * - Higher offset → sorts later. Within the same offset, higher + * sub-offset → sorts later. Net: strict monotonic order matching + * the stream's order. + */ +export function formatPointerOrderToken(pointer: EventPointer): string { + const paddedOffset = (pointer.offset ?? ``).padStart(OFFSET_WIDTH, `0`) + const paddedSubOffset = pointer.subOffset + .toString() + .padStart(SUB_OFFSET_WIDTH, `0`) + return `${STREAM_TOKEN_PREFIX}${paddedOffset}:${paddedSubOffset}` +} + +/** + * Compare two pointers in stream order. + * + * - returns `< 0` if `left` precedes `right` + * - returns `> 0` if `left` follows `right` + * - returns `0` if they address the same event + * + * Compares lexicographically on the order-token form so the relation + * agrees with `_timeline_order`'s string sort. + */ +export function comparePointers( + left: EventPointer, + right: EventPointer +): number { + const leftToken = formatPointerOrderToken(left) + const rightToken = formatPointerOrderToken(right) + if (leftToken < rightToken) return -1 + if (leftToken > rightToken) return 1 + return 0 +} + +/** + * The stream-start pointer — zero items past a null anchor. Used as + * the initial accumulator when iterating a fresh stream from the + * beginning before any batches have arrived. + */ +export const STREAM_START_POINTER: EventPointer = { + offset: null, + subOffset: 0, +} diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index a3b6ba67e7..84e57a3fe9 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -334,3 +334,11 @@ export type { export { registerToolProvider, unregisterToolProvider } from './tool-providers' export type { ToolProviderEntry } from './tool-providers' + +export { + comparePointers, + formatPointerOrderToken, + STREAM_START_POINTER, + STREAM_TOKEN_PREFIX, +} from './event-pointer' +export type { EventPointer } from './event-pointer' diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index 6afcb9d0c5..7569725f98 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -5,6 +5,7 @@ import { queryOnce, } from '@durable-streams/state' import { createWakeSession } from './wake-session' +import { comparePointers, type EventPointer } from './event-pointer' import { manifestChildKey, manifestEffectKey, @@ -636,20 +637,20 @@ export function createSetupContext( type OffsetAwareCollection = { toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map } function sortRowsByCollectionOrder( collection: OffsetAwareCollection ): Array { return [...collection.toArray].sort((left, right) => { - const leftOffset = collection.__electricRowOffsets?.get(left.key) - const rightOffset = collection.__electricRowOffsets?.get(right.key) - if (leftOffset && rightOffset) { - return leftOffset.localeCompare(rightOffset) + const leftPointer = collection.__electricRowOffsets?.get(left.key) + const rightPointer = collection.__electricRowOffsets?.get(right.key) + if (leftPointer && rightPointer) { + return comparePointers(leftPointer, rightPointer) } - if (leftOffset) return -1 - if (rightOffset) return 1 + if (leftPointer) return -1 + if (rightPointer) return 1 const leftSeq = Reflect.get(left, `_seq`) const rightSeq = Reflect.get(right, `_seq`) diff --git a/packages/agents-runtime/test/entity-timeline.test.ts b/packages/agents-runtime/test/entity-timeline.test.ts index d44209c5de..f59d13cea9 100644 --- a/packages/agents-runtime/test/entity-timeline.test.ts +++ b/packages/agents-runtime/test/entity-timeline.test.ts @@ -15,6 +15,8 @@ import { buildSections, buildTimelineEntries, } from '../src/use-chat' +import { formatPointerOrderToken } from '../src/event-pointer' +import type { EventPointer } from '../src/event-pointer' import type { EntityTimelineContentItem, EntityTimelineData, @@ -27,8 +29,11 @@ function order(index: number): string { return index.toString().padStart(20, `0`) } -function offset(index: number): string { - return `0000000000000000_${index.toString().padStart(16, `0`)}` +function offset(index: number): EventPointer { + return { + offset: `0000000000000000_${index.toString().padStart(16, `0`)}`, + subOffset: 1, + } } describe(`compareTimelineOrders`, () => { @@ -1517,11 +1522,11 @@ describe(`entity includes query`, () => { string, unknown > & { key: string | number }, - >(id: string, takeOffset: () => string) { + >(id: string, takeOffset: () => EventPointer) { let syncBegin: () => void let syncWrite: (msg: { type: string; value: T }) => void let syncCommit: () => void - const rowOffsets = new Map() + const rowOffsets = new Map() const collection = createCollection({ id, @@ -1539,7 +1544,7 @@ describe(`entity includes query`, () => { gcTime: 0, }) const collectionWithOffsets = collection as typeof collection & { - __electricRowOffsets?: Map + __electricRowOffsets?: Map } collectionWithOffsets.__electricRowOffsets = rowOffsets @@ -1911,15 +1916,11 @@ describe(`entity includes query`, () => { collections: { runs: { toArray: [{ key: `run-1`, status: `completed` }], - __electricRowOffsets: new Map([ - [`run-1`, `0000000000000000_0000000000000002`], - ]), + __electricRowOffsets: new Map([[`run-1`, offset(2)]]), }, texts: { toArray: [{ key: `text-1`, run_id: `run-1`, status: `completed` }], - __electricRowOffsets: new Map([ - [`text-1`, `0000000000000000_0000000000000003`], - ]), + __electricRowOffsets: new Map([[`text-1`, offset(3)]]), }, textDeltas: { toArray: [ @@ -1930,9 +1931,7 @@ describe(`entity includes query`, () => { delta: `hello from Rome`, }, ], - __electricRowOffsets: new Map([ - [`delta-1`, `0000000000000000_0000000000000004`], - ]), + __electricRowOffsets: new Map([[`delta-1`, offset(4)]]), }, toolCalls: { toArray: [], __electricRowOffsets: new Map() }, steps: { toArray: [], __electricRowOffsets: new Map() }, @@ -1946,9 +1945,7 @@ describe(`entity includes query`, () => { timestamp: `2026-03-31T10:00:00.000Z`, }, ], - __electricRowOffsets: new Map([ - [`msg-1`, `0000000000000000_0000000000000001`], - ]), + __electricRowOffsets: new Map([[`msg-1`, offset(1)]]), }, wakes: { toArray: [], __electricRowOffsets: new Map() }, signals: { toArray: [], __electricRowOffsets: new Map() }, @@ -2012,7 +2009,7 @@ describe(`entity includes query`, () => { { key: `context:search:a:1`, order: order(1), - historyOffset: offset(1), + historyOffset: formatPointerOrderToken(offset(1)), id: `search:a`, name: `search_results`, attrs: { query: `a` }, @@ -2024,7 +2021,7 @@ describe(`entity includes query`, () => { { key: `context:search:a:removed:2`, order: order(2), - historyOffset: offset(2), + historyOffset: formatPointerOrderToken(offset(2)), id: `search:a`, name: `search_results`, timestamp: `2026-04-13T00:01:00.000Z`, diff --git a/packages/agents-runtime/test/event-pointer.test.ts b/packages/agents-runtime/test/event-pointer.test.ts new file mode 100644 index 0000000000..929b70cc1d --- /dev/null +++ b/packages/agents-runtime/test/event-pointer.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it } from 'vitest' +import { + STREAM_START_POINTER, + STREAM_TOKEN_PREFIX, + comparePointers, + formatPointerOrderToken, + type EventPointer, +} from '../src/event-pointer' + +describe(`formatPointerOrderToken`, () => { + it(`prefixes every token with the stream marker`, () => { + const token = formatPointerOrderToken({ offset: `abc`, subOffset: 1 }) + expect(token.startsWith(STREAM_TOKEN_PREFIX)).toBe(true) + }) + + it(`zero-pads offset and sub-offset so lexicographic sort matches stream order`, () => { + const earlier = formatPointerOrderToken({ offset: `5`, subOffset: 1 }) + const later = formatPointerOrderToken({ offset: `10`, subOffset: 1 }) + // String "5" > "10" naively, but zero-padding fixes that: + expect(earlier < later).toBe(true) + }) + + it(`treats null offset (stream start) as sorting before any concrete offset`, () => { + const start = formatPointerOrderToken({ offset: null, subOffset: 1 }) + const concrete = formatPointerOrderToken({ offset: `1`, subOffset: 1 }) + expect(start < concrete).toBe(true) + }) + + it(`breaks ties by sub-offset`, () => { + const earlier = formatPointerOrderToken({ offset: `abc`, subOffset: 1 }) + const later = formatPointerOrderToken({ offset: `abc`, subOffset: 2 }) + expect(earlier < later).toBe(true) + }) + + it(`is monotonic across a realistic batch sequence`, () => { + const batches: Array<{ offset: string | null; items: number }> = [ + { offset: null, items: 3 }, // first batch (no prior anchor) + { offset: `aaa`, items: 2 }, // second batch anchored at aaa + { offset: `bbb`, items: 4 }, // third batch + ] + const tokens: Array = [] + for (const batch of batches) { + for (let j = 0; j < batch.items; j += 1) { + tokens.push( + formatPointerOrderToken({ + offset: batch.offset, + subOffset: j + 1, + }) + ) + } + } + const sorted = [...tokens].sort() + expect(sorted).toEqual(tokens) + }) +}) + +describe(`comparePointers`, () => { + const cases: Array<{ + name: string + left: EventPointer + right: EventPointer + expected: -1 | 0 | 1 + }> = [ + { + name: `null offset precedes concrete offset`, + left: { offset: null, subOffset: 5 }, + right: { offset: `1`, subOffset: 1 }, + expected: -1, + }, + { + name: `smaller (zero-padded) offset precedes larger`, + left: { offset: `5`, subOffset: 99 }, + right: { offset: `10`, subOffset: 1 }, + expected: -1, + }, + { + name: `same offset, smaller sub-offset precedes larger`, + left: { offset: `abc`, subOffset: 1 }, + right: { offset: `abc`, subOffset: 2 }, + expected: -1, + }, + { + name: `equal pointers compare zero`, + left: { offset: `abc`, subOffset: 3 }, + right: { offset: `abc`, subOffset: 3 }, + expected: 0, + }, + ] + for (const { name, left, right, expected } of cases) { + it(name, () => { + expect(Math.sign(comparePointers(left, right))).toBe(expected) + // Reverse argument order should flip the sign — except for the + // equal case (use `?? 0` to avoid the `-0` Object.is footgun in + // Vitest's `toBe`). + const reverseExpected = expected === 0 ? 0 : -expected + expect(Math.sign(comparePointers(right, left))).toBe(reverseExpected) + }) + } +}) + +describe(`STREAM_START_POINTER`, () => { + it(`addresses zero items past the stream-start anchor`, () => { + expect(STREAM_START_POINTER).toEqual({ offset: null, subOffset: 0 }) + }) + + it(`sorts strictly before every other pointer`, () => { + const anywhere: EventPointer = { offset: null, subOffset: 1 } + expect(comparePointers(STREAM_START_POINTER, anywhere)).toBeLessThan(0) + }) +}) diff --git a/packages/agents-runtime/test/helpers/context-test-helpers.ts b/packages/agents-runtime/test/helpers/context-test-helpers.ts index 11b50ea9fe..7cd1366bce 100644 --- a/packages/agents-runtime/test/helpers/context-test-helpers.ts +++ b/packages/agents-runtime/test/helpers/context-test-helpers.ts @@ -5,6 +5,7 @@ import { ENTITY_COLLECTIONS, builtInCollections } from '../../src/entity-schema' import { timelineToMessages } from '../../src/timeline-context' import { createLocalOnlyTestCollection } from './local-only' import type { ChangeEvent } from '@durable-streams/state' +import type { EventPointer } from '../../src/event-pointer' import type { ContextEntry, HandlerContext, @@ -40,8 +41,11 @@ type FixtureEvent = { value?: Record } -function offset(at: number): string { - return `0000000000000000_${at.toString().padStart(16, `0`)}` +function offset(at: number): EventPointer { + return { + offset: `0000000000000000_${at.toString().padStart(16, `0`)}`, + subOffset: 1, + } } function rowForFixture(item: FixtureEvent): { @@ -150,7 +154,7 @@ function rowForFixture(item: FixtureEvent): { export function buildStreamFixture(items: Array) { const rowsByCollection = new Map>>() - const offsetsByCollection = new Map>() + const offsetsByCollection = new Map>() for (const [, name] of Object.entries(ENTITY_COLLECTIONS)) { rowsByCollection.set(name, []) diff --git a/packages/agents-runtime/test/runtime-dsl.test.ts b/packages/agents-runtime/test/runtime-dsl.test.ts index fba3882df2..3e5167ce81 100644 --- a/packages/agents-runtime/test/runtime-dsl.test.ts +++ b/packages/agents-runtime/test/runtime-dsl.test.ts @@ -2,6 +2,7 @@ import { afterAll, afterEach, beforeAll, describe, expect, it } from 'vitest' import { queryOnce } from '@durable-streams/state' import { z } from 'zod' import { entity, manifestSourceKey } from '../src/index' +import { comparePointers, type EventPointer } from '../src/event-pointer' import { db } from '../src/observation-sources' import { runtimeTest } from './runtime-dsl' import type { RuntimeHistorySummaryEntry } from './runtime-dsl' @@ -307,16 +308,16 @@ function sortRowsByCollectionOrder< TRow extends { key: string | number }, >(collection: { toArray: Array - __electricRowOffsets?: Map + __electricRowOffsets?: Map }): Array { return [...collection.toArray].sort((left, right) => { - const leftOffset = collection.__electricRowOffsets?.get(left.key) - const rightOffset = collection.__electricRowOffsets?.get(right.key) - if (leftOffset && rightOffset) { - return leftOffset.localeCompare(rightOffset) + const leftPointer = collection.__electricRowOffsets?.get(left.key) + const rightPointer = collection.__electricRowOffsets?.get(right.key) + if (leftPointer && rightPointer) { + return comparePointers(leftPointer, rightPointer) } - if (leftOffset) return -1 - if (rightOffset) return 1 + if (leftPointer) return -1 + if (rightPointer) return 1 const leftSeq = Reflect.get(left, `_seq`) const rightSeq = Reflect.get(right, `_seq`) diff --git a/packages/agents-runtime/test/timeline-context.test.ts b/packages/agents-runtime/test/timeline-context.test.ts index 3400b95e16..3fe19e9ee4 100644 --- a/packages/agents-runtime/test/timeline-context.test.ts +++ b/packages/agents-runtime/test/timeline-context.test.ts @@ -10,13 +10,17 @@ import type { IncludesSignal, IncludesWakeMessage, } from '../src/entity-timeline' +import type { EventPointer } from '../src/event-pointer' function order(index: number): string { return index.toString().padStart(20, `0`) } -function offset(index: number): string { - return `0000000000000000_${index.toString().padStart(16, `0`)}` +function offset(index: number): EventPointer { + return { + offset: `0000000000000000_${index.toString().padStart(16, `0`)}`, + subOffset: 1, + } } describe(`timeline context`, () => { diff --git a/packages/agents-runtime/test/use-context-default-projection.test.ts b/packages/agents-runtime/test/use-context-default-projection.test.ts index 132c36e0a6..1ac8e95d70 100644 --- a/packages/agents-runtime/test/use-context-default-projection.test.ts +++ b/packages/agents-runtime/test/use-context-default-projection.test.ts @@ -64,11 +64,16 @@ describe(`timelineMessages default projection`, () => { ]) const messages = timelineMessages(db) + // Order token format: `stream::` — + // see `formatPointerOrderToken` in `src/event-pointer.ts`. Both + // the `superseded_at_offset` attribute and the `load(...)` arg + // round-trip through the same formatter, so the LLM sees the same + // string in both places. expect(messages[0]!.content).toContain( - `superseded_at_offset="0000000000000000_0000000000000001"` + `superseded_at_offset="stream:0000000000000000_0000000000000001:00000001"` ) expect(messages[0]!.content).toContain( - `load="load_context_history('search:a', '0000000000000000_0000000000000001')"` + `load="load_context_history('search:a', 'stream:0000000000000000_0000000000000001:00000001')"` ) expect(messages[0]!.content).toMatch(/\/>$/) expect(messages[1]!.content).toContain(`>new`) diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 3f9803c14d..78752be0b9 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -790,6 +790,7 @@ const TimelineRow = memo(function TimelineRow({ stopUserMessageKey, stopPending, onStopGeneration, + onForkFromHere, onRunSearchTextChange, }: { row: RenderTimelineRow @@ -805,6 +806,9 @@ const TimelineRow = memo(function TimelineRow({ stopUserMessageKey: string | null stopPending: boolean onStopGeneration?: () => void + /** When set on a user-message row, enables the "Fork from here" hover + * button. Caller pre-resolved the pointer; we just invoke. */ + onForkFromHere?: () => void onRunSearchTextChange: (rowKey: string, text: string) => void }): React.ReactElement { if (row.inbox) { @@ -824,6 +828,7 @@ const TimelineRow = memo(function TimelineRow({ } stopPending={stopPending} onStop={onStopGeneration} + onForkFromHere={onForkFromHere} /> ) } @@ -884,6 +889,7 @@ export function EntityTimeline({ scrollToBottomSignal = 0, stopPending = false, onStopGeneration, + forkFromHereByInboxKey, }: { rows: Array loading: boolean @@ -897,6 +903,13 @@ export function EntityTimeline({ scrollToBottomSignal?: number stopPending?: boolean onStopGeneration?: () => void + /** + * Per-inbox-row click handlers for the "Fork from here" hover button. + * The map is keyed by the row's `$key`; rows not in the map (or when + * the prop is omitted) get no fork affordance. The caller resolves + * the fork pointer and runs the fork → navigate flow. + */ + forkFromHereByInboxKey?: Map void> }): React.ReactElement { const { entitiesCollection } = useElectricAgents() const referencedEntityUrlKey = useMemo( @@ -1545,6 +1558,7 @@ export function EntityTimeline({ stopUserMessageKey={stopUserMessageKey} stopPending={stopPending} onStopGeneration={onStopGeneration} + onForkFromHere={forkFromHereByInboxKey?.get(rowKey)} onRunSearchTextChange={updateRunSearchText} /> diff --git a/packages/agents-server-ui/src/components/UserMessage.module.css b/packages/agents-server-ui/src/components/UserMessage.module.css index 1ccb1d72e5..9dddb40010 100644 --- a/packages/agents-server-ui/src/components/UserMessage.module.css +++ b/packages/agents-server-ui/src/components/UserMessage.module.css @@ -25,6 +25,46 @@ padding-right: 44px; } +/* "Fork from here" hover-revealed button. Sits in the same top-right + * slot as `.stopButton`, but they're mutually exclusive: while a run is + * generating we show stop; otherwise (eligible messages) we show fork. + * Opacity-toggled rather than display-toggled so the layout doesn't + * shift on hover. Always visible to keyboard users via :focus-within. */ +.forkButton { + all: unset; + position: absolute; + top: 8px; + right: 8px; + display: inline-flex; + align-items: center; + justify-content: center; + width: 24px; + height: 24px; + border-radius: var(--ds-radius-full); + background: var(--ds-gray-a3); + color: var(--ds-gray-12); + cursor: pointer; + opacity: 0; + transition: + opacity 0.12s ease, + background 0.12s ease; +} + +.bubble:hover .forkButton, +.bubble:focus-within .forkButton, +.forkButton:focus-visible { + opacity: 1; +} + +.forkButton:hover { + background: var(--ds-gray-a4); +} + +.forkButton:focus-visible { + outline: 2px solid var(--ds-accent-a6); + outline-offset: 2px; +} + .stopButton { all: unset; position: absolute; diff --git a/packages/agents-server-ui/src/components/UserMessage.tsx b/packages/agents-server-ui/src/components/UserMessage.tsx index 958360d2c1..f5b7b06682 100644 --- a/packages/agents-server-ui/src/components/UserMessage.tsx +++ b/packages/agents-server-ui/src/components/UserMessage.tsx @@ -2,6 +2,7 @@ import { memo, useState } from 'react' import { Download, File as FileIcon, + GitFork, Image as ImageIcon, Square, } from 'lucide-react' @@ -35,12 +36,20 @@ export const UserMessage = memo(function UserMessage({ showStop = false, stopPending = false, onStop, + onForkFromHere, }: { section: UserMessageSection attachments?: Array showStop?: boolean stopPending?: boolean onStop?: () => void + /** + * When provided, renders a hover-revealed "Fork from here" button on + * the bubble. The caller is responsible for eligibility — pass + * `undefined` for messages where no preceding completed run anchors a + * fork. + */ + onForkFromHere?: () => void }): React.ReactElement { const sender = formatSender(section.from) @@ -75,6 +84,17 @@ export const UserMessage = memo(function UserMessage({ )} + {!showStop && onForkFromHere && ( + + )} {attachments.length > 0 && (
{attachments.map((attachment) => ( diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 99ca79a8bd..71ab749d03 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -9,6 +9,7 @@ import { useElectricAgents } from '../../lib/ElectricAgentsProvider' import { schemaModelSupportsImageInput } from '../../lib/modelCapabilities' import type { ViewProps } from '../../lib/workspace/viewRegistry' import type { EntityTimelineQueryRow } from '@electric-ax/agents-runtime/client' +import type { EventPointer } from '@electric-ax/agents-runtime' import type { OptimisticInboxMessage } from '../../lib/sendMessage' /** @@ -142,7 +143,8 @@ function GenericChatBody({ loading, error, } = useEntityTimeline(baseUrl || null, entityUrl) - const { signalEntity, entityTypesCollection } = useElectricAgents() + const { signalEntity, forkEntity, entityTypesCollection } = + useElectricAgents() const navigate = useNavigate() const [sentMessageSignal, setSentMessageSignal] = useState(0) const [stopPending, setStopPending] = useState(false) @@ -227,6 +229,42 @@ function GenericChatBody({ }) }, [entityUrl, generationActive, signalEntity, stopPending]) + // "Fork from here" anchor map. For each user-message inbox row, the + // anchor is the LATEST preceding completed `runs` row — its pointer + // identifies "fork up to and including this response, drop everything + // after." Rows without a preceding completed run (first message, + // in-flight run, etc.) get no entry, which suppresses the affordance + // in UserMessage. + const forkFromHereByInboxKey = useMemo(() => { + if (!forkEntity || !entityUrl || !db) return undefined + const runOffsets = db.collections.runs.__electricRowOffsets + if (!runOffsets) return undefined + const map = new Map void>() + let anchor: EventPointer | null = null + for (const row of timelineRowsWithInlinePending) { + if (row.run && row.run.status === `completed`) { + const pointer = runOffsets.get(row.run.key) + if (pointer) anchor = pointer + } + if (row.inbox && anchor) { + const capturedAnchor = anchor + map.set(row.$key, () => { + // forkEntity surfaces failures via a danger toast before + // rejecting, so the caller just needs to swallow the rejection. + void forkEntity(entityUrl, { pointer: capturedAnchor }) + .then((res) => + navigate({ + to: `/entity/$`, + params: { _splat: res.url.replace(/^\//, ``) }, + }) + ) + .catch(() => {}) + }) + } + } + return map + }, [timelineRowsWithInlinePending, db, forkEntity, entityUrl, navigate]) + return ( <> => { - const res = await serverFetch(entityApiUrl(baseUrl, entityUrl, `/fork`), { - method: `POST`, - headers: { 'content-type': `application/json` }, - body: JSON.stringify({}), - }) + return async ( + entityUrl: string, + opts?: { pointer?: EventPointer } + ): Promise<{ url: string }> => { + // Wire convention is snake_case; in-code TS is camelCase. + const body = opts?.pointer + ? { + fork_pointer: { + offset: opts.pointer.offset, + sub_offset: opts.pointer.subOffset, + }, + } + : {} + const url = entityApiUrl(baseUrl, entityUrl, `/fork`) + let res: Response + try { + res = await serverFetch(url, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify(body), + }) + } catch (err) { + showForkFailureToast({ entityUrl, error: err }) + throw err + } if (!res.ok) { const text = await res.text().catch(() => ``) - let message = text || `Fork failed (${res.status})` - try { - const data = JSON.parse(text) as { - error?: { message?: string } - message?: string - } - message = data.error?.message ?? data.message ?? message - } catch { - // Keep the raw response text. - } + showForkFailureToast({ + entityUrl, + status: res.status, + responseText: text, + }) + const message = parseErrorResponse(text) ?? `Fork failed (${res.status})` throw new Error(message) } const data = (await res.json()) as { root?: { url?: string } } if (!data.root?.url) { - throw new Error(`Fork returned an invalid response`) + const message = `Fork returned an invalid response` + showForkFailureToast({ entityUrl, error: message }) + throw new Error(message) } return { url: data.root.url } } diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 184efbeac9..926dc5006d 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -12,6 +12,7 @@ import { manifestSourceKey, resolveCronScheduleSpec, } from '@electric-ax/agents-runtime' +import type { EventPointer } from '@electric-ax/agents-runtime' import { ErrCodeDuplicateURL, ErrCodeEntityPersistFailed, @@ -161,6 +162,16 @@ type ForkSubtreeOptions = { rootInstanceId?: string waitTimeoutMs?: number waitPollMs?: number + /** + * Optional anchor pointing at an event on the source root's `main` stream. + * When set: only events at or before the pointer are kept on the root's + * forked `main`, and the root's manifest is filtered so that descendants + * spawned after the pointer are dropped from the fork (their now-orphan + * subtrees are not forked). The pointer applies only to the root's + * `main` stream — `error` and shared-state streams clone at HEAD + * regardless. + */ + forkPointer?: EventPointer } type ForkEntityPlan = { @@ -873,7 +884,36 @@ export class EntityManager { const writeStreamLocks = new Set() try { - const sourceTree = await this.waitForIdleSubtree(rootUrl, opts, workLocks) + // For pointer-forks we read the source root HISTORICALLY at a + // frozen offset, so concurrent activity on the root past the + // pointer can't tear our snapshot — we don't need to wait for + // the root to be idle (which would block the "click fork right + // after the response landed" case, since the runtime keeps the + // worker warm for `idleTimeout`). We still wait+lock any kept + // descendants below (after `computeEffectiveSubtree` runs), since + // those are HEAD-cloned and need a stable snapshot. For HEAD-forks + // the old all-idle requirement still applies. + let sourceTree: Array + if (opts.forkPointer) { + const rootEntity = await this.registry.getEntity(rootUrl) + if (!rootEntity) { + throw new ElectricAgentsError( + ErrCodeNotFound, + `Entity not found`, + 404 + ) + } + if (isTerminalEntityStatus(rootEntity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Cannot fork terminal entity "${rootEntity.url}"`, + 409 + ) + } + sourceTree = await this.listEntitySubtree(rootEntity) + } else { + sourceTree = await this.waitForIdleSubtree(rootUrl, opts, workLocks) + } const sourceRoot = sourceTree[0]! if (sourceRoot.parent) { throw new ElectricAgentsError( @@ -883,9 +923,107 @@ export class EntityManager { ) } - const snapshot = await this.readForkStateSnapshot(sourceTree) + // When forking at a pointer, pre-read the root's main, validate the + // pointer against the source's true history, and materialise the + // root-at-pointer snapshot fragments. The pointer only applies to + // the root's `main` stream. Descendants kept by the manifest filter + // are forked at HEAD. + // + // Pointer→position translation: the runtime mints pointers as + // `{ offset: previousBatchOffset, subOffset: itemIndex+1 }`, where + // the anchor offset is the END of the delivery batch that + // PRECEDED the targeted event. The durable-streams server + // interprets `{ X, N }` as "from offset X, take N flattened + // messages forward" — independent of how delivery is chunked. We + // mirror that interpretation here by translating the pointer to a + // 1-indexed CUMULATIVE position in the source's flattened + // history, then filtering events with position ≤ that target. + let preFilteredRoot: + | { + manifests: Map> + childStatuses: Map> + replayWatermarks: Map> + sharedStateIds: Set + } + | undefined + if (opts.forkPointer) { + const sourceEvents = await this.streamClient.readJson< + Record + >(sourceRoot.streams.main) + const flat = sourceEvents.flatMap((item) => + Array.isArray(item) ? item : [item] + ) as Array> + const target = this.resolveForkPointerTarget( + flat, + opts.forkPointer, + sourceRoot.streams.main + ) + const filteredEvents = flat.slice(0, target) + const rootManifests = this.reduceStateRows(filteredEvents, `manifest`) + const sharedStateIds = new Set() + for (const manifest of rootManifests.values()) { + this.collectSharedStateIds(manifest, sharedStateIds) + } + preFilteredRoot = { + manifests: rootManifests, + childStatuses: this.reduceStateRows(filteredEvents, `child_status`), + replayWatermarks: this.reduceStateRows( + filteredEvents, + `replay_watermark` + ), + sharedStateIds, + } + } + + const effectiveSubtree = preFilteredRoot + ? this.computeEffectiveSubtree( + sourceTree, + sourceRoot.url, + preFilteredRoot.manifests + ) + : sourceTree + + // For pointer-forks, kept descendants (everything in the effective + // subtree except the root) are HEAD-cloned, so they must be idle + // before we read their snapshots. Wait+lock only those — the root + // was skipped above. + if (opts.forkPointer) { + const descendants = effectiveSubtree.filter( + (entity) => entity.url !== sourceRoot.url + ) + if (descendants.length > 0) { + await this.waitForGivenEntitiesIdle(descendants, opts, workLocks) + } + } + + const snapshot = await this.readForkStateSnapshot( + // Skip the root when we've already pre-filtered it — avoid both a + // wasted HEAD read of main and a re-population that would clobber + // the filtered entries. + preFilteredRoot + ? effectiveSubtree.filter((entity) => entity.url !== sourceRoot.url) + : effectiveSubtree + ) + if (preFilteredRoot) { + snapshot.manifestsByEntity.set( + sourceRoot.url, + preFilteredRoot.manifests + ) + snapshot.childStatusesByEntity.set( + sourceRoot.url, + preFilteredRoot.childStatuses + ) + snapshot.replayWatermarksByEntity.set( + sourceRoot.url, + preFilteredRoot.replayWatermarks + ) + for (const id of preFilteredRoot.sharedStateIds) { + snapshot.sharedStateIds.add(id) + } + } + const suffix = randomUUID().slice(0, 8) - const entityUrlMap = await this.buildForkEntityUrlMap(sourceTree, { + const entityUrlMap = await this.buildForkEntityUrlMap(effectiveSubtree, { suffix, rootUrl, rootInstanceId: opts.rootInstanceId, @@ -896,14 +1034,14 @@ export class EntityManager { ) const stringMap = this.buildForkStringMap(entityUrlMap, sharedStateIdMap) const entityPlans = this.buildForkEntityPlans( - sourceTree, + effectiveSubtree, entityUrlMap, stringMap ) this.addForkLocks( this.forkWriteLockedEntities, - sourceTree.map((entity) => entity.url), + effectiveSubtree.map((entity) => entity.url), writeEntityLocks ) this.addForkLocks( @@ -921,11 +1059,17 @@ export class EntityManager { try { for (const plan of entityPlans) { + const isRoot = plan.source.url === rootUrl await this.streamClient.fork( plan.fork.streams.main, - plan.source.streams.main + plan.source.streams.main, + isRoot && opts.forkPointer + ? { forkPointer: opts.forkPointer } + : undefined ) createdStreams.push(plan.fork.streams.main) + // `error` always clones at HEAD — no canonical mapping + // between main-offset and error-offset. await this.streamClient.fork( plan.fork.streams.error, plan.source.streams.error @@ -1081,6 +1225,73 @@ export class EntityManager { held.clear() } + /** + * Variant of {@link waitForIdleSubtree} that takes an explicit entity + * list instead of walking the registry from `rootUrl`. Used by the + * pointer-fork path to wait+lock only the kept descendants, since + * the root is being forked from history and doesn't need to be idle. + */ + private async waitForGivenEntitiesIdle( + entities: ReadonlyArray, + opts: ForkSubtreeOptions, + workLocks: Set + ): Promise { + if (entities.length === 0) return + + const timeoutMs = opts.waitTimeoutMs ?? DEFAULT_FORK_WAIT_TIMEOUT_MS + const pollMs = opts.waitPollMs ?? DEFAULT_FORK_WAIT_POLL_MS + + const refresh = async (): Promise> => { + const refreshed = await Promise.all( + entities.map((entity) => this.registry.getEntity(entity.url)) + ) + return refreshed.filter( + (entity): entity is ElectricAgentsEntity => !!entity + ) + } + + const deadline = Date.now() + timeoutMs + while (true) { + const present = await refresh() + const stopped = present.find((entity) => + isTerminalEntityStatus(entity.status) + ) + if (stopped) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Cannot fork terminal entity "${stopped.url}"`, + 409 + ) + } + let active = present.filter( + (entity) => entity.status !== `idle` && entity.status !== `paused` + ) + if (active.length === 0) { + this.addForkLocks( + this.forkWorkLockedEntities, + present.map((entity) => entity.url), + workLocks + ) + const reChecked = await refresh() + const reActive = reChecked.filter( + (entity) => entity.status !== `idle` && entity.status !== `paused` + ) + if (reActive.length === 0) return + this.releaseForkLocks(this.forkWorkLockedEntities, workLocks) + active = reActive + } + if (Date.now() >= deadline) { + throw new ElectricAgentsError( + ErrCodeForkWaitTimeout, + `Timed out waiting for descendants to become idle`, + 409, + { active: active.map((entity) => entity.url) } + ) + } + await sleep(Math.min(pollMs, Math.max(0, deadline - Date.now()))) + } + } + private async waitForIdleSubtree( rootUrl: string, opts: ForkSubtreeOptions, @@ -1175,6 +1386,116 @@ export class EntityManager { } } + /** + * Translate `forkPointer` into a 1-indexed CUMULATIVE position in the + * source's flattened history. Throws a 400 if the pointer doesn't + * address a real event. + * + * Semantics (mirroring the durable-streams server interpretation): + * `{ offset: X, subOffset: N }` means "from anchor X, take N flattened + * messages forward." Concretely, the target event is the N-th event + * after the last event whose `headers.offset` is ≤ X. (When `X` is + * `null`, the anchor is the stream start and the target is the N-th + * event from the very beginning.) The returned position is the count + * of events to KEEP — events 1..position survive the filter. + * + * A pointer is valid when: + * - `pointer.offset` is `null` (stream start) OR matches some + * event's `headers.offset` value, AND + * - `pointer.subOffset` is in `[1, total events past the anchor]`. + */ + private resolveForkPointerTarget( + events: ReadonlyArray>, + pointer: EventPointer, + streamPath: string + ): number { + // Count events at-or-before the anchor and validate the anchor exists. + // `pointer.offset === null` is the stream-start anchor — no events + // precede it, so `positionAtAnchor` stays at 0. + let positionAtAnchor = 0 + let anchorSeen = pointer.offset === null + for (const event of events) { + const headers = isRecord(event.headers) ? event.headers : undefined + const eventOffset = + typeof headers?.offset === `string` ? headers.offset : undefined + if (eventOffset === undefined) continue + if (pointer.offset === null) continue + if (eventOffset === pointer.offset) anchorSeen = true + if (eventOffset <= pointer.offset) positionAtAnchor++ + } + if (!anchorSeen) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `fork_pointer.offset (${pointer.offset ?? ``}) does not match any event's Stream-Next-Offset on ${streamPath}`, + 400 + ) + } + const eventsPastAnchor = events.length - positionAtAnchor + if (pointer.subOffset < 1 || pointer.subOffset > eventsPastAnchor) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `fork_pointer.sub_offset ${pointer.subOffset} out of range past anchor on ${streamPath} (valid: 1..${eventsPastAnchor})`, + 400 + ) + } + return positionAtAnchor + pointer.subOffset + } + + /** + * Compute the subset of `sourceTree` that survives the manifest filter + * applied at the root. After filtering the root's manifest at the fork + * pointer, only children whose manifest entries landed at or before the + * pointer remain; those kept children carry their CURRENT (HEAD) subtree + * along with them. Children dropped from the root's manifest, and any + * of their descendants, are excluded. + */ + private computeEffectiveSubtree( + sourceTree: ReadonlyArray, + rootUrl: string, + filteredRootManifests: ReadonlyMap> + ): Array { + const keptChildUrls = new Set() + for (const value of filteredRootManifests.values()) { + if (value.kind === `child` && typeof value.entity_url === `string`) { + keptChildUrls.add(value.entity_url) + } + } + + const childrenByParent = new Map>() + for (const entity of sourceTree) { + if (!entity.parent) continue + const list = childrenByParent.get(entity.parent) ?? [] + list.push(entity) + childrenByParent.set(entity.parent, list) + } + + const rootEntity = sourceTree.find((e) => e.url === rootUrl) + if (!rootEntity) return [] + + const result: Array = [rootEntity] + const queue: Array = [] + for (const child of childrenByParent.get(rootUrl) ?? []) { + if (keptChildUrls.has(child.url)) { + queue.push(child) + } + } + const seen = new Set([rootUrl]) + while (queue.length > 0) { + const entity = queue.shift()! + if (seen.has(entity.url)) continue + seen.add(entity.url) + result.push(entity) + // Below the kept-children level the existing recursive subtree is + // included unchanged — kept descendants are HEAD-cloned. + for (const grandchild of childrenByParent.get(entity.url) ?? []) { + if (!seen.has(grandchild.url)) { + queue.push(grandchild) + } + } + } + return result + } + private async listEntitySubtree( root: ElectricAgentsEntity ): Promise> { diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index b1edb7f11e..15cd501486 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -135,6 +135,15 @@ const inboxMessageBodySchema = Type.Object({ const forkBodySchema = Type.Object({ instance_id: Type.Optional(Type.String()), waitTimeoutMs: Type.Optional(Type.Number()), + // Optional anchor pointing at an event on the source root's `main` + // stream. Wire shape is snake_case; the route handler translates to + // camelCase before forwarding to entity-manager. + fork_pointer: Type.Optional( + Type.Object({ + offset: Type.Union([Type.String(), Type.Null()]), + sub_offset: Type.Number(), + }) + ), }) const setTagBodySchema = Type.Object({ @@ -794,6 +803,12 @@ async function forkEntity( const result = await ctx.entityManager.forkSubtree(entityUrl, { rootInstanceId: parsed.instance_id, waitTimeoutMs: parsed.waitTimeoutMs, + ...(parsed.fork_pointer && { + forkPointer: { + offset: parsed.fork_pointer.offset, + subOffset: parsed.fork_pointer.sub_offset, + }, + }), }) for (const forkedEntity of result.entities) { await linkEntityDispatchSubscription(ctx, forkedEntity) diff --git a/packages/agents-server/src/stream-client.ts b/packages/agents-server/src/stream-client.ts index 3385be3c83..96e92de279 100644 --- a/packages/agents-server/src/stream-client.ts +++ b/packages/agents-server/src/stream-client.ts @@ -4,6 +4,7 @@ import { FetchError, IdempotentProducer, } from '@durable-streams/client' +import type { EventPointer } from '@electric-ax/agents-runtime' import { ErrCodeNotFound } from './electric-agents-types.js' import { ATTR, injectTraceHeaders, withSpan } from './tracing.js' import type { HeadersRecord, MaybePromise } from '@durable-streams/client' @@ -242,7 +243,11 @@ export class StreamClient { }) } - async fork(path: string, sourcePath: string): Promise { + async fork( + path: string, + sourcePath: string, + opts?: { forkPointer?: EventPointer } + ): Promise { return await withSpan(`stream.fork`, async (span) => { span.setAttributes({ [ATTR.STREAM_PATH]: path, @@ -252,6 +257,17 @@ export class StreamClient { 'content-type': `application/json`, 'Stream-Forked-From': new URL(this.streamUrl(sourcePath)).pathname, } + if (opts?.forkPointer) { + // The durable-streams server returns 400 if Stream-Fork-Sub-Offset + // > 0 without an accompanying Stream-Fork-Offset. When our + // pointer's offset is `null` (anchor at stream start), send the + // explicit zero-offset string to satisfy that constraint. + const ZERO_OFFSET = `0000000000000000_0000000000000000` + headers[`Stream-Fork-Offset`] = opts.forkPointer.offset ?? ZERO_OFFSET + if (opts.forkPointer.subOffset > 0) { + headers[`Stream-Fork-Sub-Offset`] = String(opts.forkPointer.subOffset) + } + } injectTraceHeaders(headers) const response = await fetch(this.streamUrl(path), { diff --git a/packages/agents-server/test/stream-client-fork.test.ts b/packages/agents-server/test/stream-client-fork.test.ts index d50acac5a1..40a440def2 100644 --- a/packages/agents-server/test/stream-client-fork.test.ts +++ b/packages/agents-server/test/stream-client-fork.test.ts @@ -53,4 +53,28 @@ describe(`StreamClient.fork`, () => { sourceEvent, ]) }) + + it(`forks at a sub-offset to truncate source history`, async () => { + // The durable-streams TS server treats one POST as one message, + // regardless of content type — so writing the source as a single + // body=JSON.stringify([a,b,c]) creates ONE flattened message of + // three JSON values. Sub-offset 2 then slices that to two values. + await client.create(`/source-sub-offset`, { + contentType: `application/json`, + body: JSON.stringify([ + { key: `a`, value: 1 }, + { key: `b`, value: 2 }, + { key: `c`, value: 3 }, + ]), + }) + + await client.fork(`/fork-truncated`, `/source-sub-offset`, { + forkPointer: { offset: null, subOffset: 2 }, + }) + + await expect(client.readJson(`/fork-truncated`)).resolves.toEqual([ + { key: `a`, value: 1 }, + { key: `b`, value: 2 }, + ]) + }) })