Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fork-at-message-pin-bumps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@electric-ax/agents': patch
'@electric-ax/agents-server-conformance-tests': patch
'electric-ax': patch
---

Bump `@durable-streams/{client,server,state}` pins in step with `@electric-ax/agents-runtime` and `@electric-ax/agents-server` to pick up the fork-at-pointer (`Stream-Fork-Offset` + `Stream-Fork-Sub-Offset`) wire protocol that the new fork-at-message UX depends on. No other code changes in these packages.
5 changes: 5 additions & 0 deletions .changeset/fork-at-message-runtime.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-runtime': patch
---

Add `EventPointer { offset, subOffset }` for addressing single events on a durable stream. Widen `__electricRowOffsets` side-tables on `EntityStreamDB` collections from `Map<key, string>` to `Map<key, EventPointer>`, with pointers minted along log-entry boundaries (grouped by each item's `headers.offset`) so they round-trip cleanly through `Stream-Fork-Sub-Offset` regardless of how a live read is chunked.
6 changes: 6 additions & 0 deletions .changeset/fork-at-message-server-ui.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@electric-ax/agents-server': patch
'@electric-ax/agents-server-ui': patch
---

Fork at an earlier message instead of only at HEAD. `POST /_electric/entities/<type>/<id>/fork` accepts an optional `fork_pointer: { offset, sub_offset }` (snake_case wire) that truncates the new entity's `main` stream up to and including the chosen event; `error` and shared-state streams still clone at HEAD; the root's manifest is filtered so descendants spawned after the pointer are dropped from the fork along with their subtrees. Pointer-forks skip the all-subtree-idle wait on the root (the historical read can't be torn by concurrent writes past the pointer), so the affordance works during the post-run keep-alive window. UI: hover-revealed "Fork from here" button on user-message bubbles in `ChatView`, anchored to the latest preceding completed `runs` row; suppressed on the first message and while a run is in flight.
11 changes: 8 additions & 3 deletions packages/agents-runtime/src/context-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { queryOnce } from '@durable-streams/state'
import { assembleContext } from './context-assembly'
import { createContextEntriesApi } from './context-entries'
import { entityStateSchema } from './entity-schema'
import { formatPointerOrderToken } from './event-pointer'
import { createOutboundBridge, loadOutboundIdSeed } from './outbound-bridge'
import { createPiAgentAdapter } from './pi-adapter'
import {
Expand Down Expand Up @@ -437,9 +438,13 @@ export function createHandlerContext<TState extends StateProxy = StateProxy>(

function readContextHistoryOffset(row: { key: string }): string | undefined {
const contextInserted = config.db.collections.contextInserted
const rowOffset = contextInserted.__electricRowOffsets?.get(row.key)
if (typeof rowOffset === `string`) {
return rowOffset
const pointer = contextInserted.__electricRowOffsets?.get(row.key)
if (pointer) {
// Format the pointer as a stable, sortable string. Matches the
// `_timeline_order` produced by `entity-stream-db` so that
// `loadContextHistory(id, offset)` can round-trip lookups
// against the same row.
return formatPointerOrderToken(pointer)
}

const seq = Reflect.get(row, `_seq`)
Expand Down
79 changes: 63 additions & 16 deletions packages/agents-runtime/src/entity-stream-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
isControlEvent,
} from '@durable-streams/state'
import { builtInCollections, passthrough } from './entity-schema'
import { formatPointerOrderToken, type EventPointer } from './event-pointer'
import type {
ActionDefinition,
ChangeEvent,
Expand Down Expand Up @@ -45,7 +46,7 @@ interface EntityWriteUtils {
type EntityCollectionMeta = {
__electricSourceDb?: EntityStreamDBWithActions
__electricSourceId?: string
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
__electricTimelineOrders?: Map<string | number, string>
}

Expand Down Expand Up @@ -99,14 +100,6 @@ type EntityStreamDBOptions = {
)

const WRITE_TXID_TIMEOUT_MS = 20_000
const TIMELINE_OFFSET_WIDTH = 24
const TIMELINE_BATCH_INDEX_WIDTH = 8

function formatStreamTimelineOrder(offset: string, index: number): string {
return `stream:${offset.padStart(TIMELINE_OFFSET_WIDTH, `0`)}:${index
.toString()
.padStart(TIMELINE_BATCH_INDEX_WIDTH, `0`)}`
}

/**
* Create a StreamDB connected to a Electric Agents entity stream.
Expand Down Expand Up @@ -145,7 +138,10 @@ export function createEntityStreamDB(
...streamCustomState,
}
const collectionNameByEventType = new Map<string, string>()
const rowOffsetsByCollection = new Map<string, Map<string | number, string>>()
const rowOffsetsByCollection = new Map<
string,
Map<string | number, EventPointer>
>()
const timelineOrdersByCollection = new Map<
string,
Map<string | number, string>
Expand All @@ -155,6 +151,12 @@ export function createEntityStreamDB(
rowOffsetsByCollection.set(name, new Map())
timelineOrdersByCollection.set(name, new Map())
}

// Tracks the END offset of the previous batch — i.e. the START
// offset of the next batch's items, which is the anchor we pair
// with each item's sub-offset to form an `EventPointer`. `null`
// before any batch has arrived (anchor at stream start).
let previousBatchOffset: string | null = null
// Build a reverse map from TanStack DB collection id to schema key
const collIdToSchemaKey: Record<string, string> = {}
for (const name of Object.keys(mergedCollections)) {
Expand Down Expand Up @@ -285,7 +287,21 @@ export function createEntityStreamDB(
onBeforeBatch: (batch) => {
opts?.onBeforeBatch?.(batch)
replayBatchOffset.current = batch.offset
batch.items.forEach((item, itemIndex) => {
// `Stream-Fork-Sub-Offset` addresses items WITHIN A SINGLE LOG
// ENTRY (the first log entry past the anchor), not items globally
// past the anchor. To mint server-compatible pointers we group
// items in the batch by their `headers.offset` (= the end offset
// of the log entry that produced them, stable at write time).
// Each contiguous group of items sharing an `headers.offset` is
// one log entry; within it sub-offsets are 1..K. The anchor
// offset for that group is the END offset of the PRECEDING log
// entry — either the previous distinct `headers.offset` we saw
// in this batch, or `previousBatchOffset` for the first group in
// a fresh batch.
let currentEntryOffset: string | null = null
let priorEntryOffset: string | null = previousBatchOffset
let positionInEntry = 0
batch.items.forEach((item) => {
if (isControlEvent(item)) {
if (item.headers.control === `reset`) {
for (const offsets of rowOffsetsByCollection.values()) {
Expand All @@ -294,14 +310,34 @@ export function createEntityStreamDB(
for (const orders of timelineOrdersByCollection.values()) {
orders.clear()
}
previousBatchOffset = null
currentEntryOffset = null
priorEntryOffset = null
positionInEntry = 0
}
return
}
if (!isChangeEvent(item)) return
const collectionName = collectionNameByEventType.get(item.type)
if (!collectionName) return
const offset = item.headers.offset ?? batch.offset
rowOffsetsByCollection.get(collectionName)?.set(item.key, offset)

const itemEntryOffset =
typeof (item.headers as Record<string, unknown>).offset === `string`
? ((item.headers as Record<string, unknown>).offset as string)
: null
if (itemEntryOffset !== currentEntryOffset) {
// Boundary into a new log entry.
priorEntryOffset = currentEntryOffset ?? previousBatchOffset
currentEntryOffset = itemEntryOffset
positionInEntry = 0
}
positionInEntry++

const pointer: EventPointer = {
offset: priorEntryOffset,
subOffset: positionInEntry,
}
rowOffsetsByCollection.get(collectionName)?.set(item.key, pointer)

if (item.headers.operation === `delete`) return
if (typeof item.value !== `object` || item.value === null) return
Expand All @@ -310,11 +346,16 @@ export function createEntityStreamDB(
if (!orders) return
let order = orders.get(item.key)
if (!order) {
order = formatStreamTimelineOrder(offset, itemIndex)
order = formatPointerOrderToken(pointer)
orders.set(item.key, order)
}
;(item.value as Record<string, unknown>)._timeline_order = order
})
// After processing the batch, advance the anchor for next time.
// `batch.offset` is the `Stream-Next-Offset` for this batch —
// i.e. the cursor that the NEXT batch's items will be addressed
// relative to.
previousBatchOffset = batch.offset
},
onBatch: (batch) => {
opts?.onBatch?.(batch)
Expand Down Expand Up @@ -669,11 +710,17 @@ export function createEntityStreamDB(
event.value !== null
) {
const orders = timelineOrdersByCollection.get(collectionName)
// applyEvent stages an in-process event (not delivered through
// a wire batch). It carries a single event, so the pointer's
// sub-offset is always 1 ("the one item past this anchor").
// If no real offset is available, synthesize a monotonically-
// increasing `local:...` token so successive applyEvent calls
// still sort in invocation order.
const offset =
event.headers.offset ??
`local:${Date.now().toString().padStart(13, `0`)}`
const order =
orders?.get(event.key) ?? formatStreamTimelineOrder(offset, 0)
const pointer: EventPointer = { offset, subOffset: 1 }
const order = orders?.get(event.key) ?? formatPointerOrderToken(pointer)
orders?.set(event.key, order)
;(event.value as Record<string, unknown>)._timeline_order = order
}
Expand Down
38 changes: 21 additions & 17 deletions packages/agents-runtime/src/entity-timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
QueryBuilder,
} from '@tanstack/db'
import type { EntityStreamDB } from './entity-stream-db'
import { formatPointerOrderToken, type EventPointer } from './event-pointer'
import type { ChildStatusEntry, MessageReceived, Signal } from './entity-schema'
import type { ManifestEntry, Wake, WakeMessage } from './types'

Expand Down Expand Up @@ -500,7 +501,7 @@ function readRequiredOrderToken<TRow extends { key: string | number }>(
collection: {
id?: string
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
},
row: TRow,
index: number
Expand All @@ -510,9 +511,9 @@ function readRequiredOrderToken<TRow extends { key: string | number }>(
return timelineOrder
}

const offset = collection.__electricRowOffsets?.get(row.key)
if (offset) {
return `offset:${offset}`
const pointer = collection.__electricRowOffsets?.get(row.key)
if (pointer) {
return formatPointerOrderToken(pointer)
}

const inlineSeq = readInlineSeq(row)
Expand All @@ -526,7 +527,7 @@ function readRequiredOrderToken<TRow extends { key: string | number }>(
function readOptionalOrderToken<TRow extends { key: string | number }>(
collection: {
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
},
row: TRow
): string | undefined {
Expand All @@ -535,9 +536,9 @@ function readOptionalOrderToken<TRow extends { key: string | number }>(
return timelineOrder
}

const offset = collection.__electricRowOffsets?.get(row.key)
if (offset) {
return `offset:${offset}`
const pointer = collection.__electricRowOffsets?.get(row.key)
if (pointer) {
return formatPointerOrderToken(pointer)
}

const inlineSeq = readInlineSeq(row)
Expand All @@ -547,7 +548,7 @@ function readOptionalOrderToken<TRow extends { key: string | number }>(
function withOrderToken<TRow extends { key: string | number }>(collection: {
id?: string
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
}): Array<OrderedRow<TRow>> {
return collection.toArray.map((row, index) => ({
...row,
Expand All @@ -559,7 +560,7 @@ function withOptionalOrderToken<
TRow extends { key: string | number },
>(collection: {
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
}): Array<TRow & { _orderToken?: string }> {
return collection.toArray.map((row) => {
const orderToken = readOptionalOrderToken(collection, row)
Expand All @@ -574,14 +575,14 @@ function getOrderableCollection<TRow extends { key: string | number }>(
| {
id?: string
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
}
| undefined,
id: string
): {
id?: string
toArray: Array<TRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
} {
if (!collection) {
throw new Error(
Expand Down Expand Up @@ -617,9 +618,12 @@ function withoutOrderToken<TRow extends object & { _orderToken?: string }>(
}

function orderTokenToHistoryOffset(orderToken: string): string {
return orderToken.startsWith(`offset:`)
? orderToken.slice(`offset:`.length)
: orderToken
// The order token is already a stable, sortable string representation
// of an `EventPointer` (or a `_seq` / `pending` fallback). Round-trip
// semantics are maintained as long as every callsite that produces
// a historyOffset goes through the same formatter — see
// `readContextHistoryOffset` in `context-factory.ts`.
return orderToken
}

function withOrderFromOrderIndex<TRow extends object & { _orderToken: string }>(
Expand Down Expand Up @@ -973,7 +977,7 @@ export function buildEntityTimelineData(
| {
id?: string
toArray: Array<ContextInsertedValueRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
}
| undefined,
`contextInserted`
Expand All @@ -985,7 +989,7 @@ export function buildEntityTimelineData(
| {
id?: string
toArray: Array<ContextRemovedValueRow>
__electricRowOffsets?: Map<string | number, string>
__electricRowOffsets?: Map<string | number, EventPointer>
}
| undefined,
`contextRemoved`
Expand Down
Loading
Loading