diff --git a/.changeset/pg-sync-observation-support.md b/.changeset/pg-sync-observation-support.md new file mode 100644 index 0000000000..e7792ff910 --- /dev/null +++ b/.changeset/pg-sync-observation-support.md @@ -0,0 +1,8 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents': patch +'@electric-ax/agents-desktop': patch +--- + +Add pg-sync observation source enabling agents to observe Electric Postgres shape streams and wake on matching row changes (insert/update/delete). Includes server-side bridge management with cursor persistence, durable stream forwarding, and an `observe_pg_sync` tool for Horton agents. diff --git a/.gitignore b/.gitignore index a3d3b4a31e..9a93ba75a1 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ _artifacts .opentrace-index.log .dev-logs/ .streams-data/ +.desktop-data/ # npm local cache/logs .npm/ diff --git a/docs/agents-development.md b/docs/agents-development.md index ab3b1a0777..5907fc7495 100644 --- a/docs/agents-development.md +++ b/docs/agents-development.md @@ -30,6 +30,8 @@ For day-to-day development, use the bundled dev script: ./scripts/dev.sh start --detach # same, but exits after spawning (logs to .dev-logs/) ./scripts/dev.sh start --with-agents # also spawn built-in agents (Horton + Worker) ./scripts/dev.sh desktop # run the Electron desktop app in this terminal +./scripts/dev.sh isolated # run an isolated stack on random ports and + # open Electron desktop against it ./scripts/dev.sh stop # stop processes + docker compose down ./scripts/dev.sh teardown # stop + remove Postgres volume + .streams-data/ ./scripts/dev.sh status # show which services are running @@ -37,6 +39,8 @@ For day-to-day development, use the bundled dev script: `desktop` is a separate command because the Electron app is interactive — it opens a window. Run it in its own terminal after `start` has the rest of the stack up; Ctrl-C in that terminal closes the app without touching the backing services. +`isolated` is the one-command path for testing a worktree or PR without conflicting with another running stack. It chooses random free ports for Postgres, Electric, Jaeger, agents-server, built-in agents, server UI, and desktop UI; uses a branch-based Docker Compose project name (`agents-`) so containers are easy to identify and clean up in Docker Desktop; sets an isolated durable-streams data directory and Electron user data directory per run; starts Horton/Worker by default; and opens the Electron desktop app. Ctrl-C tears the isolated stack down, including Docker volumes. Use `--no-build` to skip the initial package build or `--no-agents` to skip built-in agents. + `build` covers `typescript-client`, `agents-runtime`, `agents-mcp`, `agents-server`, and `agents`. Re-run it after any dep change before restarting — entrypoints do not auto-restart on `dist/` rebuilds. **Built-in agents (`packages/agents`)** register against `agents-server` at startup and will fail with `Stream not found` if they race ahead of it. Pass `--with-agents` to `start` to spawn them after `agents-server` binds `:4437`. Without the flag, run them manually in a separate terminal once `start` reports the server is up — Ctrl-C in that terminal stops only the built-in agents: @@ -131,15 +135,16 @@ Vite dev server with HMR — changes appear instantly. ### agents-server -| Variable | Default | Description | -| ------------------------------------- | --------- | --------------------------------------------------- | -| `DATABASE_URL` | — | Postgres connection URL (required) | -| `ELECTRIC_AGENTS_ELECTRIC_URL` | — | Electric sync service URL | -| `ELECTRIC_AGENTS_HOST` | `0.0.0.0` | Bind address | -| `ELECTRIC_AGENTS_PORT` | `4437` | Server port | -| `ELECTRIC_AGENTS_BASE_URL` | — | Public webhook base URL | -| `ELECTRIC_AGENTS_STREAMS_DATA_DIR` | — | Local streams data directory | -| `ELECTRIC_AGENTS_DURABLE_STREAMS_URL` | — | External durable streams URL (omit to use embedded) | +| Variable | Default | Description | +| -------------------------------------- | -------------------------------- | ------------------------------------------------------ | +| `DATABASE_URL` | — | Postgres connection URL (required) | +| `ELECTRIC_AGENTS_ELECTRIC_URL` | — | Electric sync service URL | +| `ELECTRIC_AGENTS_HOST` | `0.0.0.0` | Bind address | +| `ELECTRIC_AGENTS_PORT` | `4437` | Server port | +| `ELECTRIC_AGENTS_BASE_URL` | — | Public webhook base URL | +| `ELECTRIC_AGENTS_STREAMS_DATA_DIR` | — | Local streams data directory | +| `ELECTRIC_AGENTS_DURABLE_STREAMS_URL` | — | External durable streams URL (omit to use embedded) | +| `ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL` | `http://localhost:3000/v1/shape` | Electric shape URL used by the pgSync prototype bridge | ### agents (built-in) diff --git a/docs/agents-pg-sync-observation-plan.md b/docs/agents-pg-sync-observation-plan.md new file mode 100644 index 0000000000..1878aa7516 --- /dev/null +++ b/docs/agents-pg-sync-observation-plan.md @@ -0,0 +1,673 @@ +# Prototype Plan: Observe Postgres sync streams from Agents + +## Prototype goal + +Build a working prototype of Agents observing Electric Postgres shape streams. + +Prototype API: + +```ts +await ctx.observe( + pgSync({ + table: 'todos', + replica: 'full', + }), + { wake: { on: 'change', ops: ['insert'] } } +) +``` + +Horton demo tool: + +```ts +observe_pg_sync({ + table: 'todos', + where: "priority = 'high'", + replica: 'full', + wake: { ops: ['delete'] }, +}) +``` + +Prototype simplifications: + +- No URL argument. The server assumes Electric is at `http://localhost:3000/v1/shape`. Production pg-sync sources may point at arbitrary Electric servers; the bridge is not limited to the Electric instance used by Agents itself. +- No enablement env flags. +- No URL/table allowlists yet. +- No auth/secret injection yet. +- Keep `pgSync` narrower than raw `ShapeStreamOptions`. +- Accept only JSON-safe source config: `table`, `columns`, `where`, `params`, `replica`. +- Exclude ShapeStream runtime/transport options: `liveSse`, `subscribe`, `fetchClient`, `signal`, `onError`, `headers`, `parser`, `transformer`, `columnMapper`, `log`, `offset`, `handle`. + +The server will own the `ShapeStream`, append its updates to a durable stream, and wake observing agents through the existing wake registry. + +Reviewer-driven constraints: + +- This is a new bridge subsystem modeled after `EntityBridgeManager`, not an extension of the existing `entities()` bridge. The current `entities()` bridge is hard-coded to the Agents `entities` table and tag membership stream. +- Do not rely on `EntityBridgeManager.onEntityChanged()`; it is intentionally a no-op. Shape tailing is the source of truth. +- Make wake delivery explicit. If pgSync bridge appends bypass the agents-server stream append route, the bridge must explicitly invoke the same wake evaluation path after appending. +- Keep manual Electric/Postgres smoke tests optional until unit/integration seams are in place. +- Use shared canonicalization helpers for runtime and server so `sourceRef` cannot diverge. + +## Existing implementation to copy + +Use `observe(entities())` as the template. It already does almost exactly what this prototype needs, but for the constrained built-in `entities` table. + +Key files: + +- Runtime source helper: `packages/agents-runtime/src/observation-sources.ts` +- Runtime registration before preload: `packages/agents-runtime/src/process-wake.ts` +- Out-of-handler client registration: `packages/agents-runtime/src/agents-client.ts` +- Runtime server client: `packages/agents-runtime/src/runtime-server-client.ts` +- Server registration route: `packages/agents-server/src/routing/entities-router.ts` +- Entity bridge manager: `packages/agents-server/src/entity-bridge-manager.ts` +- Manifest/wake mapping: `packages/agents-server/src/manifest-side-effects.ts` +- Manifest source reference tracking: `packages/agents-server/src/entity-manager.ts` and `packages/agents-server/src/runtime.ts` + +`EntityBridgeManager` pattern to copy: + +1. Persist a source registration row with `sourceRef`, config, stream URL, and ShapeStream cursor. +2. Ensure the durable stream exists. +3. Start a server-owned `ShapeStream`. +4. Bootstrap from `offset: '-1'` to reconcile current state. +5. Append normalized Durable Streams events with `type` and `headers.operation`. +6. Persist `shapeHandle` / `lastOffset` for efficient restart. +7. Handle `must-refetch` by clearing cursor and resyncing. +8. Keep bridges alive while referenced by manifests/readers; garbage collect idle bridges. + +For the prototype, `PgSyncBridgeManager` should be a generalized `EntityBridgeManager` with source config `{ table, columns, where, params, replica }` and stream path `/_electric/pg-sync/`. + +## Durable stream shape + +Use stream path: + +```txt +/_electric/pg-sync/ +``` + +`sourceRef` is deterministic from canonicalized config: + +```json +{ + "table": "todos", + "where": "priority = 'high'", + "params": [], + "columns": ["id", "text"], + "replica": "full" +} +``` + +Initial event collection: + +```ts +export const pgSyncObservationCollections = { + changes: { + type: 'pg_sync_change', + primaryKey: 'key', + }, +} +``` + +Append events like: + +```ts +{ + type: 'pg_sync_change', + key: '', + value: { + key: '', + table: 'todos', + operation: 'insert' | 'update' | 'delete', + rowKey?: string, + value?: unknown, + oldValue?: unknown, + headers: Record, + offset?: string, + receivedAt: string, + }, + headers: { + operation: 'insert' | 'update' | 'delete', + timestamp: string, + }, +} +``` + +This lets the existing wake condition matcher handle: + +```ts +{ on: 'change', ops: ['insert'] } +``` + +without new wake semantics. + +## Implementation slices + +Each slice should leave the repo in a working, testable state before moving on. + +--- + +## Slice 1 — Add the runtime `pgSync` source helper only + +### Goal + +Agents can construct a `pgSync(...)` observation source with a deterministic `sourceRef`, `streamUrl`, schema, wake default, and manifest entry. No server registration or ShapeStream work yet. + +### Changes + +In `packages/agents-runtime/src/observation-sources.ts`: + +- Add `PgSyncOptions`: + +```ts +export interface PgSyncOptions { + table: string + columns?: string[] + where?: string + params?: string[] | Record + replica?: 'default' | 'full' +} +``` + +- Add `getPgSyncStreamPath(sourceRef)`. +- Add canonicalization and hashing for `sourceRef`. +- Add `pgSyncObservationCollections`. +- Add `PgSyncObservationSource` with `sourceType: 'pgSync'`. +- Add `pgSync(options)`. +- Add default `wake()` returning: + +```ts +{ + sourceUrl: getPgSyncStreamPath(sourceRef), + condition: { on: 'change', collections: ['pg_sync_change'] }, +} +``` + +- Export `pgSync` from: + - `packages/agents-runtime/src/index.ts` + - `packages/agents-runtime/src/client.ts` + +### Tests + +Add runtime unit tests, e.g. `packages/agents-runtime/test/pg-sync-source.test.ts`: + +- `pgSync({ table: 'todos' })` returns `sourceType: 'pgSync'`. +- Equivalent configs produce the same `sourceRef`. +- Different table/where/params produce different `sourceRef`s. +- `streamUrl` is `/_electric/pg-sync/`. +- `toManifestEntry()` serializes only JSON-safe config. +- `wake()` points at the pg-sync stream and `pg_sync_change` collection. +- `params` object key order does not affect `sourceRef`. +- Undefined optional fields are omitted consistently. +- Decide and test whether `columns` order is meaningful. For the prototype, prefer preserving order because Electric column projection order may be meaningful. +- `replica` defaulting is deterministic: decide whether `pgSync({ table })` and `pgSync({ table, replica: 'default' })` are identical, then lock that behavior in tests. + +### Verification command + +```sh +cd packages/agents-runtime +pnpm test -- test/pg-sync-source.test.ts +``` + +Do not continue until these tests pass. + +--- + +## Slice 2 — Teach manifest/wake code about `pgSync` + +### Goal + +A `pgSync` manifest source can become a wake registration pointing at the pg-sync durable stream. Still no server ShapeStream bridge yet. + +### Changes + +In `packages/agents-server/src/manifest-side-effects.ts`: + +- Extend `extractManifestSourceUrl()`: + +```ts +if (manifest.sourceType === 'pgSync') { + return typeof manifest.sourceRef === 'string' + ? `/_electric/pg-sync/${manifest.sourceRef}` + : undefined +} +``` + +No custom wake semantics needed; `buildManifestWakeRegistration()` should work once `extractManifestSourceUrl()` works. + +### Tests + +Add/extend server tests for manifest side effects: + +- Given a `pgSync` source manifest with `sourceRef`, `extractManifestSourceUrl()` returns `/_electric/pg-sync/`. +- Given a `pgSync` source manifest with `wake: { on: 'change', ops: ['delete'] }`, `buildManifestWakeRegistration()` returns a registration with: + - `sourceUrl: '/_electric/pg-sync/'` + - `condition.ops: ['delete']` + - `oneShot: false` +- Manifest without `sourceRef` returns no wake registration / safe `undefined` behavior. +- Object-form wake preserves `collections`, `ops`, `debounceMs`, and `timeoutMs`. +- If `extractManifestSourceUrl()` is not exported, test through `buildManifestWakeRegistration()` instead of exporting only for tests. + +### Verification command + +```sh +cd packages/agents-server +pnpm test -- test/manifest-side-effects.test.ts +``` + +Do not continue until these tests pass. + +--- + +## Slice 3 — Add registration plumbing in three small steps + +### Goal + +Runtime can tell the server “ensure this pgSync source exists” and receive `{ sourceRef, streamUrl }`. This slice is intentionally split so each part is testable without a live `ShapeStream`. + +### Slice 3a — Runtime server client method + +In `packages/agents-runtime/src/runtime-server-client.ts`: + +- Add `registerPgSyncSource(options)` to the interface and implementation. +- POST to `/_electric/pg-sync/register`. +- Send only `options`; the server computes `sourceRef` using the shared canonicalization helper and returns `{ sourceRef, streamUrl }`. +- Test non-2xx error handling. + +Tests: + +- Add `packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts`. +- Mock fetch and assert: + - URL path is `/_electric/pg-sync/register`. + - method is `POST`. + - JSON body is `{ options }`. + - response parsing returns `sourceRef`/`streamUrl`. + - non-OK response throws useful error text. + +Verification: + +```sh +cd packages/agents-runtime +pnpm test -- test/runtime-server-client-pg-sync.test.ts +``` + +### Slice 3b — Server registration route stub + +In `packages/agents-server`: + +- Add a route module or extend an existing internal/electric router with: + +```txt +POST /_electric/pg-sync/register +``` + +Request body: + +```ts +{ + options: PgSyncOptions +} +``` + +Response: + +```ts +{ + sourceRef: string, + streamUrl: '/_electric/pg-sync/' +} +``` + +For this step, route behavior is minimal: + +- Validate `table` is a non-empty string. +- Compute `sourceRef` from shared canonicalization. +- Ensure durable stream exists using the existing `StreamClient` path. +- Return stream metadata. +- Do not start `ShapeStream` yet. + +Tests: + +- Add `packages/agents-server/test/pg-sync-router.test.ts`. +- Registering `{ table: 'todos' }` returns expected stream path. +- Invalid table returns 400. +- Server-computed `sourceRef` matches runtime helper output. Prefer importing one shared helper rather than duplicating hashing logic. + +Verification: + +```sh +cd packages/agents-server +pnpm test -- test/pg-sync-router.test.ts +``` + +### Slice 3c — Runtime observe wiring + +In both places that already special-case `entities`: + +- `packages/agents-runtime/src/process-wake.ts` +- `packages/agents-runtime/src/agents-client.ts` + +Add: + +```ts +if (source.sourceType === 'pgSync') { + await serverClient.registerPgSyncSource(source.options) +} +``` + +This must happen before `setupCtx.observe(...)` / client preload opens the source StreamDB. + +Tests: + +- Add or extend tests that mirror existing cron/entities registration behavior. +- Assert observing a `pgSync` source calls `registerPgSyncSource` before source DB preload. +- Assert out-of-handler `agentsClient.observe(pgSync(...))` also registers first. + +Verification: + +```sh +cd packages/agents-runtime +pnpm test -- pg-sync +``` + +Do not continue until all Slice 3a/3b/3c tests pass. + +--- + +## Slice 4 — Implement a minimal `PgSyncBridgeManager` that appends ShapeStream messages + +### Goal + +Registering a pgSync source starts a server-owned `ShapeStream` against `http://localhost:3000/v1/shape` and appends received change messages to `/_electric/pg-sync/`. + +This slice proves durable stream mirroring works. Wake delivery can be verified in the next slice. + +### Changes + +Add `packages/agents-server/src/pg-sync-bridge-manager.ts` modeled after `entity-bridge-manager.ts`. + +For the prototype: + +- Use Electric URL constant: `http://localhost:3000/v1/shape`. +- Start with simple lifecycle: + - `register(options)` ensures stream exists and starts bridge. + - Maintain in-memory map of active bridges. + - Persist source rows/cursors only if easy to reuse existing registry patterns; otherwise add persistence in Slice 6. +- Use `ShapeStream` directly. +- Use `params`: + +```ts +{ + table, + columns, + where, + params, + replica, +} +``` + +- Append only `isChangeMessage(message)` initially. +- For `isControlMessage(message)`: + - ignore `up-to-date` for now, + - on `must-refetch`, restart from `offset: '-1'`. + +Event append mapping: + +- Durable event `type`: `pg_sync_change` +- Durable event `key`: use Electric message offset when available, else `${message.headers.operation}:${message.key}` or UUID fallback. +- Durable event `headers.operation`: Electric message operation. +- Durable event `value`: include table, operation, Electric headers, row value/old value, received timestamp. + +Wire the route from Slice 3 to call `pgSyncBridgeManager.register(...)`. + +### Tests + +Unit-test seams before any live Electric smoke test: + +- `buildElectricShapeParams(options)`: + - maps `table`, `columns`, `where`, `params`, `replica` into `ShapeStream` `params`. + - never forwards excluded options such as `liveSse`, `headers`, `fetchClient`, `offset`, or `handle` from source config. +- `pgSyncMessageToDurableEvent(message, options/sourceRef)`: + - insert maps to `headers.operation: 'insert'`. + - update maps to `update`. + - delete maps to `delete` and preserves old value/value as available. + - event key is stable from offset/key. +- Bridge lifecycle with mocked `ShapeStream`: + - register starts one stream per `sourceRef`. + - second register of same source does not start a duplicate bridge. + - change message appends expected durable event. + - `must-refetch` restarts from `offset: '-1'`. + +### Optional manual smoke verification + +Only run this after unit tests pass. Run local Electric stack and agents server, then: + +1. Register a pgSync source via HTTP. +2. Insert a row in Postgres. +3. Read `/_electric/pg-sync/` from Durable Streams. +4. Confirm a `pg_sync_change` event appears. + +Example verification shape: + +```sh +curl -X POST http://localhost:4437/_electric/pg-sync/register \ + -H 'content-type: application/json' \ + -d '{"options":{"table":"todos","replica":"full"}}' +``` + +Then mutate Postgres and read the returned stream URL. + +Do not continue until the durable stream receives events. If repo dev scripts configure Electric on a different local port, keep the prototype code aligned with the user-requested default `http://localhost:3000/v1/shape` or update the smoke-test environment to expose that port. + +--- + +## Slice 5 — Connect pgSync durable events to wake delivery + +### Goal + +An agent observing `pgSync(...)` wakes when matching durable stream events are appended. + +### Changes + +Depending on how append/wake evaluation is structured: + +- Ensure pgSync bridge appends through the same stream append path that triggers `EntityManager` wake evaluation, or explicitly call the same wake evaluation method used for entity/shared-state stream appends. +- Confirm source URL passed to wake registry is exactly `/_electric/pg-sync/`. +- Confirm durable event shape has: + - `type: 'pg_sync_change'` + - `headers.operation: 'insert' | 'update' | 'delete'` + +The existing `WakeRegistry.matchCondition()` should then match: + +```ts +{ on: 'change', ops: ['insert'] } +``` + +### Tests + +Add a wake-registry/entity-manager integration test if feasible: + +- Register wake: + - `subscriberUrl: '/horton/a'` + - `sourceUrl: '/_electric/pg-sync/test'` + - `condition: { on: 'change', ops: ['insert'] }` +- Evaluate/append insert event. +- Assert wake result is produced. +- Evaluate/append delete event. +- Assert no wake for insert-only registration. + +Also test two registrations on same source: + +- Agent A wakes on `insert`. +- Agent B wakes on `delete`. +- Insert wakes A only. +- Delete wakes B only. +- Collection filtering works with `collections: ['pg_sync_change']`. + +Add one test proving the bridge append path invokes the same wake evaluation path used by existing stream appends, not just that `WakeRegistry.matchCondition()` works in isolation. + +### Manual verification + +Run two Horton instances: + +1. Horton A: call `observe_pg_sync({ table: 'todos', replica: 'full', wake: { ops: ['insert'] } })`. +2. Horton B: call `observe_pg_sync({ table: 'todos', where: "priority = 'high'", replica: 'full', wake: { ops: ['delete'] } })`. +3. Insert into `todos`. +4. Confirm Horton A wakes. +5. Delete a matching high-priority todo. +6. Confirm Horton B wakes. + +Do not continue until this end-to-end demo works. + +--- + +## Slice 6 — Add persistence/resume and `must-refetch` robustness + +### Goal + +Make the prototype resilient enough for server restart and shape reset, following `EntityBridgeManager` more closely. + +### Changes + +Add persistence for pgSync bridge registrations and cursors. Options: + +- Add a `pg_sync_bridges` table similar to existing entity bridge metadata, or +- Reuse a generic bridge/source table if one exists. + +Persist: + +- `sourceRef` +- canonical options JSON +- `streamUrl` +- `shapeHandle` +- `shapeOffset` +- `lastTouchedAt` + +On server start: + +- Load existing pgSync bridge rows. +- Restart active/referenced bridges. +- Resume with stored `shapeHandle` + `shapeOffset` if valid. +- Otherwise bootstrap from `offset: '-1'`. + +On `must-refetch`: + +- Clear cursor. +- Resync from `offset: '-1'`. + +Optional for prototype but useful: + +- Idle GC modeled after `EntityBridgeManager`. +- Touch bridge when its durable stream is read or registered. + +### Tests + +- Register source persists row. +- Cursor is updated after messages. +- Manager startup resumes from stored cursor. +- Duplicate/idempotency behavior on resume is controlled. +- Stale/invalid stored shape handle falls back to bootstrap. +- `must-refetch` clears cursor and restarts bootstrap. +- Only test `lastTouchedAt`/GC if GC is implemented in this slice; otherwise leave GC out of scope. + +### Verification + +Manual restart test: + +1. Register source. +2. Mutate Postgres; confirm event appended. +3. Restart agents server. +4. Mutate Postgres again. +5. Confirm new event appended without duplicating the entire stream unexpectedly. + +--- + +## Slice 7 — Add Horton `observe_pg_sync` tool + +### Goal + +Horton can set up pgSync observations from chat for the demo. + +For an LLM-facing integration, the agent/tool code is the boundary: users control what tools their agents expose, so `observe_pg_sync` should present only the shape controls the agent author wants the model to use. Do not expose Electric credentials to the model. If an application needs stricter table/tenant authorization than the tool code provides, route pg-sync through an application-owned Electric proxy that enforces those controls before forwarding shape requests. + +### Changes + +Add `packages/agents/src/tools/observe-pg-sync.ts` or define in Horton tools. + +Tool args: + +```ts +{ + table: string, + columns?: string[], + where?: string, + params?: string[] | Record, + replica?: 'default' | 'full', + wake?: { + ops?: Array<'insert' | 'update' | 'delete'>, + debounceMs?: number, + timeoutMs?: number + } +} +``` + +Implementation: + +```ts +const source = pgSync({ table, columns, where, params, replica }) +await ctx.observe(source, { + wake: { + on: 'change', + ...(wake?.ops ? { ops: wake.ops } : {}), + ...(wake?.debounceMs ? { debounceMs: wake.debounceMs } : {}), + ...(wake?.timeoutMs ? { timeoutMs: wake.timeoutMs } : {}), + }, +}) +return { sourceRef: source.sourceRef, streamUrl: source.streamUrl, wake } +``` + +Add the tool to `createHortonTools(...)` in `packages/agents/src/agents/horton.ts`. + +Update Horton system prompt tool list to include `observe_pg_sync`. + +### Tests + +- Place tests under `packages/agents/test`. +- Tool validates required `table`. +- Tool calls `ctx.observe(pgSync(...), { wake })`. +- Tool returns `sourceRef` and `streamUrl`. +- `observe_pg_sync` is included in Horton's tool list. +- Wake defaulting works when `wake.ops` is omitted. +- Invalid `ops` values are rejected by the tool schema. + +### Manual verification + +Use the exact demo flow: + +- Horton A observes inserts. +- Horton B observes conditional deletes. +- Mutate Postgres and confirm each wakes only on its matching operation. + +--- + +## Out of scope for this prototype + +Defer these to a production hardening pass: + +- URL selection / multiple Electric servers. +- URL allowlists and table allowlists. +- Auth/secret injection. +- Per-tenant authorization and shape authorization. +- UI for inspecting pgSync bridges. +- Rich filtering beyond Electric shape `where` plus wake `ops`. +- Typed row schemas for pgSync streams. +- Backpressure/batching optimizations. +- Full control-message event recording. + +## Success criteria + +The prototype is successful when: + +1. `pgSync(...)` sources serialize to manifests. +2. The server registers and starts a ShapeStream for local Electric. +3. Shape changes are mirrored into a durable stream. +4. Existing wake conditions fire from those durable events. +5. Horton can call `observe_pg_sync` from chat. +6. Two Horton instances can observe different operations on the same or different pgSync sources. diff --git a/packages/agents-desktop/vite.config.ts b/packages/agents-desktop/vite.config.ts index 5a3fe59bdc..bf959b586f 100644 --- a/packages/agents-desktop/vite.config.ts +++ b/packages/agents-desktop/vite.config.ts @@ -3,7 +3,9 @@ import { fileURLToPath } from 'node:url' import { defineConfig, type PluginOption } from 'vite' import electron from 'vite-plugin-electron/simple' -const RENDERER_DEV_SERVER_URL = `http://localhost:5183` +const RENDERER_DEV_SERVER_URL = + process.env.ELECTRIC_DESKTOP_DEV_SERVER_URL ?? + `http://localhost:${process.env.ELECTRIC_DESKTOP_UI_PORT ?? `5183`}` const PACKAGE_DIR = path.dirname(fileURLToPath(import.meta.url)) const REPO_ROOT = path.resolve(PACKAGE_DIR, `../..`) diff --git a/packages/agents-runtime/src/agents-client.ts b/packages/agents-runtime/src/agents-client.ts index dd3358abca..d8995024ac 100644 --- a/packages/agents-runtime/src/agents-client.ts +++ b/packages/agents-runtime/src/agents-client.ts @@ -7,6 +7,7 @@ import type { EntitySignal } from './runtime-server-client' import type { EntitiesObservationSource, EntityObservationSource, + PgSyncObservationSource, } from './observation-sources' import type { EntityStreamDB, @@ -73,6 +74,17 @@ export function createAgentsClient(config: AgentsClientConfig): AgentsClient { } } + if (source.sourceType === `pgSync`) { + const registered = await serverClient.registerPgSyncSource( + (source as PgSyncObservationSource).options + ) + source = { + ...source, + sourceRef: registered.sourceRef, + streamUrl: registered.streamUrl, + } + } + if (!source.streamUrl || !source.schema) { throw new Error( `[agent-runtime] Cannot observe source "${source.sourceType}" without a streamUrl and schema` diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 0d467fd7f1..2eee361829 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -12,9 +12,14 @@ export { normalizeTimelineEntities, } from './entity-timeline' export { + canonicalPgSyncOptions, db, entities, entity, + getPgSyncStreamPath, + pgSync, + pgSyncObservationCollections, + sourceRefForPgSync, webhook, getWebhookStreamPath, webhookObservationCollections, @@ -60,6 +65,9 @@ export type { WebhookObservationSource, WebhookEventRow, EntitiesQuery, + CanonicalPgSyncConfig, + PgSyncObservationSource, + PgSyncOptions, } from './observation-sources' export type { EntityTimelineContentItem, diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 7d70d3cef2..344c4fdf97 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -108,6 +108,8 @@ type WakeChangeEntryValue = { collection: string kind: `insert` | `update` | `delete` key: string + value?: unknown + oldValue?: unknown from?: string from_principal?: string from_agent?: string @@ -407,6 +409,8 @@ function createWakeChangeSchema(): Schema { collection: z.string(), kind: z.enum([`insert`, `update`, `delete`]), key: z.string(), + value: z.unknown().optional(), + oldValue: z.unknown().optional(), from: z.string().optional(), from_principal: z.string().optional(), from_agent: z.string().optional(), diff --git a/packages/agents-runtime/src/event-sources.ts b/packages/agents-runtime/src/event-sources.ts index 305c0ecbe5..ed79e07301 100644 --- a/packages/agents-runtime/src/event-sources.ts +++ b/packages/agents-runtime/src/event-sources.ts @@ -126,7 +126,7 @@ export type HydratedEventSourceWake = { } const DEFAULT_LIFETIME: SubscriptionLifetime = { kind: `until_entity_stopped` } -const paramsSchemaValidator = new Ajv({ allErrors: true, strict: false }) +const paramsSchemaValidator = new Ajv({ allErrors: true, strict: false } as any) const paramsSchemaCache = new WeakMap< Record, ValidateFunction @@ -296,8 +296,9 @@ function formatParamsSchemaError(error: ErrorObject): string { `string` ? (error.params as { missingProperty: string }).missingProperty : undefined - const path = - error.instancePath || (missingProperty ? `/${missingProperty}` : `/`) + const instancePath = (error as ErrorObject & { instancePath?: string }) + .instancePath + const path = instancePath || (missingProperty ? `/${missingProperty}` : `/`) return `${path} ${error.message ?? `is invalid`}` } diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 3275e31be3..7d0d4deb25 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -292,6 +292,11 @@ export { cron, entities, db, + canonicalPgSyncOptions, + getPgSyncStreamPath, + pgSync, + pgSyncObservationCollections, + sourceRefForPgSync, webhook, getWebhookStreamPath, webhookObservationCollections, @@ -305,6 +310,9 @@ export type { WebhookObservationSource, WebhookEventRow, EntitiesQuery, + CanonicalPgSyncConfig, + PgSyncObservationSource, + PgSyncOptions, } from './observation-sources' export { processWake } from './process-wake' diff --git a/packages/agents-runtime/src/observation-sources.ts b/packages/agents-runtime/src/observation-sources.ts index b638746caf..f2883707fe 100644 --- a/packages/agents-runtime/src/observation-sources.ts +++ b/packages/agents-runtime/src/observation-sources.ts @@ -9,6 +9,7 @@ import { entitiesObservationCollections, getEntitiesStreamPath, normalizeTags, + hashString, sourceRefForTags, } from './tags' import { getSharedStateStreamPath } from './runtime-server-client' @@ -20,6 +21,76 @@ import type { import type { EntityTags } from './tags' import type { CollectionDefinition } from '@durable-streams/state' +export interface PgSyncOptions { + table: string + columns?: string[] + where?: string + params?: string[] | Record + replica?: `default` | `full` +} + +export interface PgSyncObservationSource extends ObservationSource { + readonly sourceType: `pgSync` + readonly options: PgSyncOptions + readonly streamUrl: string + readonly schema: typeof pgSyncObservationCollections +} + +export const pgSyncObservationCollections = { + changes: { + type: `pg_sync_change`, + primaryKey: `key`, + }, +} + +export function getPgSyncStreamPath( + sourceRef: string, + namespace?: string +): string { + return namespace + ? `/_electric/pg-sync/${encodeURIComponent(namespace)}/${sourceRef}` + : `/_electric/pg-sync/${sourceRef}` +} + +export type CanonicalPgSyncConfig = { + table: string + columns?: string[] + where?: string + params?: string[] | Record + replica: `default` | `full` +} + +function normalizePgSyncParams( + params: PgSyncOptions[`params`] +): PgSyncOptions[`params`] | undefined { + if (params === undefined) return undefined + if (Array.isArray(params)) return [...params] + return Object.keys(params) + .sort() + .reduce>((sorted, key) => { + sorted[key] = params[key]! + return sorted + }, {}) +} + +export function canonicalPgSyncOptions( + options: PgSyncOptions +): CanonicalPgSyncConfig { + return { + table: options.table, + ...(options.columns !== undefined ? { columns: [...options.columns] } : {}), + ...(options.where !== undefined ? { where: options.where } : {}), + ...(options.params !== undefined + ? { params: normalizePgSyncParams(options.params) } + : {}), + replica: options.replica ?? `default`, + } +} + +export function sourceRefForPgSync(options: PgSyncOptions): string { + return hashString(JSON.stringify(canonicalPgSyncOptions(options))) +} + export interface EntityObservationSource extends ObservationSource { readonly sourceType: `entity` readonly entityUrl: string @@ -265,6 +336,34 @@ export function db( } } +export function pgSync(options: PgSyncOptions): PgSyncObservationSource { + const config = canonicalPgSyncOptions(options) + const sourceRef = sourceRefForPgSync(config) + const streamUrl = getPgSyncStreamPath(sourceRef) + return { + sourceType: `pgSync`, + sourceRef, + streamUrl, + schema: pgSyncObservationCollections, + options: config, + wake() { + return { + sourceUrl: streamUrl, + condition: { on: `change`, collections: [`pg_sync_change`] }, + } + }, + toManifestEntry(): ManifestSourceEntry { + return { + key: manifestSourceKey(`pgSync`, sourceRef), + kind: `source`, + sourceType: `pgSync`, + sourceRef, + config, + } + }, + } +} + function assertWebhookEndpointKey(endpointKey: string): void { if (!/^[a-z0-9][a-z0-9._-]{0,62}$/.test(endpointKey)) { throw new Error( diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 5d3b0df6ba..55bc2db32a 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -25,6 +25,7 @@ import type { CronObservationSource, EntitiesObservationSource, EntityObservationSource, + PgSyncObservationSource, WebhookEventRow, } from './observation-sources' import type { @@ -1655,12 +1656,25 @@ export async function processWake( } } + let registeredPgSync: { streamUrl: string; sourceRef: string } | undefined + if (source.sourceType === `pgSync`) { + registeredPgSync = await serverClient.registerPgSyncSource( + (source as PgSyncObservationSource).options + ) + observedSource = { + ...source, + sourceRef: registeredPgSync.sourceRef, + streamUrl: registeredPgSync.streamUrl, + } + } + if (effectiveWake) { const observeHandle = await setupCtx.observe(observedSource, { wake: effectiveWake, }) const sourceUrl = + registeredPgSync?.streamUrl ?? sourceWakeConfig?.sourceUrl ?? (observedSource.sourceType === `entity` ? (observedSource as EntityObservationSource).entityUrl diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index c391731313..aa4cecab7d 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -1,3 +1,4 @@ +import type { PgSyncOptions } from './observation-sources' import type { EntityTags, TagOperation } from './tags' import { appendPathToUrl } from './url' import { buildEventSourceSubscriptionId } from './event-sources' @@ -138,6 +139,10 @@ export interface RuntimeServerClient { streamUrl: string sourceRef: string }> + registerPgSyncSource: (options: PgSyncOptions) => Promise<{ + streamUrl: string + sourceRef: string + }> listEventSources: () => Promise> subscribeToEventSource: ( options: EventSourceSubscriptionInput & { entityUrl: string } @@ -577,6 +582,22 @@ export function createRuntimeServerClient( return (await response.json()) as { streamUrl: string; sourceRef: string } } + const registerPgSyncSource = async ( + options: PgSyncOptions + ): Promise<{ streamUrl: string; sourceRef: string }> => { + const response = await request(`/_electric/pg-sync/register`, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ options }), + }) + if (!response.ok) { + throw new Error( + `registerPgSyncSource failed (${response.status}): ${await readErrorText(response)}` + ) + } + return (await response.json()) as { streamUrl: string; sourceRef: string } + } + const listEventSources = async (): Promise> => { const response = await request(`/_electric/event-sources`, { method: `GET`, @@ -794,6 +815,7 @@ export function createRuntimeServerClient( registerWake, ensureCronStream, ensureEntitiesMembershipStream, + registerPgSyncSource, listEventSources, subscribeToEventSource, unsubscribeFromEventSource, diff --git a/packages/agents-runtime/src/setup-context.ts b/packages/agents-runtime/src/setup-context.ts index e0ea11aac1..0296df9b2c 100644 --- a/packages/agents-runtime/src/setup-context.ts +++ b/packages/agents-runtime/src/setup-context.ts @@ -5,12 +5,12 @@ import { queryOnce, } from '@durable-streams/state/db' import { createWakeSession } from './wake-session' +import { appendPathToUrl } from './url' import { manifestChildKey, manifestEffectKey, manifestSharedStateKey, } from './manifest-helpers' -import { appendPathToUrl } from './url' import type { DbObservationSource, EntityObservationSource, @@ -832,9 +832,10 @@ export function createSetupContext( source.ensureStream.contentType ) } - const sourceStreamUrl = source.streamUrl.startsWith(`/`) - ? appendPathToUrl(config.serverBaseUrl, source.streamUrl) - : source.streamUrl + const sourceStreamUrl = + source.sourceType === `pgSync` || !source.streamUrl.startsWith(`/`) + ? source.streamUrl + : appendPathToUrl(config.serverBaseUrl, source.streamUrl) sourceDb = await wiring.createSourceDb( sourceStreamUrl, source.schema, diff --git a/packages/agents-runtime/test/electric-agents-client.test.ts b/packages/agents-runtime/test/electric-agents-client.test.ts index 7e60b9c9c3..b4587f4fd4 100644 --- a/packages/agents-runtime/test/electric-agents-client.test.ts +++ b/packages/agents-runtime/test/electric-agents-client.test.ts @@ -1,12 +1,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import { createAgentsClient } from '../src/agents-client' -import { cron, entities, webhook } from '../src/observation-sources' +import { cron, entities, pgSync, webhook } from '../src/observation-sources' import type * as StateDbModule from '@durable-streams/state/db' const { mockState } = vi.hoisted(() => ({ mockState: { ensureEntitiesMembershipStream: vi.fn(), ensureCronStream: vi.fn(), + registerPgSyncSource: vi.fn(), signalEntity: vi.fn(), ensureStream: vi.fn(), createStreamDB: vi.fn(), @@ -24,6 +25,7 @@ vi.mock(`../src/runtime-server-client`, () => ({ createRuntimeServerClient: () => ({ ensureEntitiesMembershipStream: mockState.ensureEntitiesMembershipStream, ensureCronStream: mockState.ensureCronStream, + registerPgSyncSource: mockState.registerPgSyncSource, signalEntity: mockState.signalEntity, ensureStream: mockState.ensureStream, }), @@ -46,6 +48,10 @@ describe(`createAgentsClient`, () => { sourceRef: `source-1`, streamUrl: `/_entities/source-1`, }) + mockState.registerPgSyncSource = vi.fn().mockResolvedValue({ + sourceRef: `pg-source-1`, + streamUrl: `/_electric/pg-sync/default/pg-source-1`, + }) mockState.ensureStream = vi.fn().mockResolvedValue(`/_webhooks/repo`) mockState.createStreamDB = vi.fn() mockState.signalEntity = vi.fn().mockResolvedValue({ txid: 123 }) @@ -102,6 +108,40 @@ describe(`createAgentsClient`, () => { expect(db).toBe(mockState.observedDb) }) + it(`registers pgSync sources before preloading the observed StreamDB`, async () => { + const client = createAgentsClient({ + baseUrl: `http://electric-agents.test`, + }) + + const source = pgSync({ + table: `todos`, + where: `priority = $1`, + params: [`high`], + replica: `full`, + }) + + const db = await client.observe(source) + + expect(mockState.registerPgSyncSource).toHaveBeenCalledWith(source.options) + expect(mockState.createStreamDB).toHaveBeenCalledWith({ + streamOptions: { + url: `http://electric-agents.test/_electric/pg-sync/default/pg-source-1`, + contentType: `application/json`, + }, + state: expect.objectContaining({ + changes: expect.objectContaining({ + type: `pg_sync_change`, + primaryKey: `key`, + }), + }), + }) + expect(mockState.observedDb.preload).toHaveBeenCalledOnce() + expect( + mockState.registerPgSyncSource.mock.invocationCallOrder[0] + ).toBeLessThan(mockState.observedDb.preload.mock.invocationCallOrder[0]) + expect(db).toBe(mockState.observedDb) + }) + it(`preserves tenant path prefixes on observed stream URLs`, async () => { const client = createAgentsClient({ baseUrl: `http://electric-agents.test/t/tenant-a/v1`, diff --git a/packages/agents-runtime/test/pg-sync-source.test.ts b/packages/agents-runtime/test/pg-sync-source.test.ts new file mode 100644 index 0000000000..886e2bd1c0 --- /dev/null +++ b/packages/agents-runtime/test/pg-sync-source.test.ts @@ -0,0 +1,109 @@ +import { describe, expect, it } from 'vitest' +import { getPgSyncStreamPath, pgSync } from '../src/observation-sources' + +describe(`pgSync observation source`, () => { + it(`uses the pgSync source type`, () => { + expect(pgSync({ table: `todos` }).sourceType).toBe(`pgSync`) + }) + + it(`produces deterministic equivalent sourceRefs`, () => { + expect( + pgSync({ + table: `todos`, + where: `priority = $1`, + params: { priority: `high`, org: `acme` }, + replica: `full`, + }).sourceRef + ).toBe( + pgSync({ + table: `todos`, + where: `priority = $1`, + params: { org: `acme`, priority: `high` }, + replica: `full`, + }).sourceRef + ) + }) + + it(`changes sourceRef for different table, where, and params`, () => { + const base = pgSync({ + table: `todos`, + where: `done = $1`, + params: [`false`], + }) + expect( + pgSync({ table: `tasks`, where: `done = $1`, params: [`false`] }) + .sourceRef + ).not.toBe(base.sourceRef) + expect( + pgSync({ table: `todos`, where: `done = $2`, params: [`false`] }) + .sourceRef + ).not.toBe(base.sourceRef) + expect( + pgSync({ table: `todos`, where: `done = $1`, params: [`true`] }).sourceRef + ).not.toBe(base.sourceRef) + }) + + it(`sets streamUrl from sourceRef`, () => { + const source = pgSync({ table: `todos` }) + expect(source.streamUrl).toBe(getPgSyncStreamPath(source.sourceRef)) + expect(source.streamUrl).toBe(`/_electric/pg-sync/${source.sourceRef}`) + }) + + it(`serializes a JSON-safe manifest config`, () => { + const entry = pgSync({ + table: `todos`, + columns: [`id`, `text`], + where: `priority = $1`, + params: { priority: `high` }, + replica: `full`, + }).toManifestEntry() + expect(JSON.parse(JSON.stringify(entry.config))).toEqual({ + table: `todos`, + columns: [`id`, `text`], + where: `priority = $1`, + params: { priority: `high` }, + replica: `full`, + }) + }) + + it(`defaults wake to the pg-sync stream and change collection`, () => { + const source = pgSync({ table: `todos` }) + expect(source.wake?.()).toEqual({ + sourceUrl: source.streamUrl, + condition: { on: `change`, collections: [`pg_sync_change`] }, + }) + }) + + it(`ignores params object key ordering`, () => { + expect( + pgSync({ table: `todos`, params: { b: `2`, a: `1` } }).sourceRef + ).toBe(pgSync({ table: `todos`, params: { a: `1`, b: `2` } }).sourceRef) + }) + + it(`omits undefined optional fields consistently`, () => { + expect( + pgSync({ + table: `todos`, + columns: undefined, + where: undefined, + params: undefined, + replica: undefined, + }).toManifestEntry().config + ).toEqual({ table: `todos`, replica: `default` }) + expect(pgSync({ table: `todos`, where: undefined }).sourceRef).toBe( + pgSync({ table: `todos` }).sourceRef + ) + }) + + it(`preserves columns order as sourceRef-significant`, () => { + expect( + pgSync({ table: `todos`, columns: [`id`, `text`] }).sourceRef + ).not.toBe(pgSync({ table: `todos`, columns: [`text`, `id`] }).sourceRef) + }) + + it(`treats omitted replica as replica default`, () => { + expect(pgSync({ table: `todos` }).sourceRef).toBe( + pgSync({ table: `todos`, replica: `default` }).sourceRef + ) + }) +}) diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index 4e2cc6a4b3..9cb84fa08b 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -8,7 +8,7 @@ import { type EventSourceContract, } from '../src/event-sources' import { manifestSourceKey } from '../src/manifest-helpers' -import { db } from '../src/observation-sources' +import { db, pgSync } from '../src/observation-sources' import { processWake } from '../src/process-wake' import { clearRegistry, defineEntity } from '../src/define-entity' import { entityStateSchema, passthrough } from '../src/entity-schema' @@ -38,9 +38,10 @@ const { mockStreamHead, mockStreamJson, mockDurableStreamStream, - mockCreateStreamDB, - mockSourceDbClose, mockSourceDbPreload, + mockSourceDbClose, + mockCreateStreamDB, + mockActualCreateStreamDB, mockSourceEvents, mockInitialManifests, mockEntityOnEvent, @@ -65,9 +66,12 @@ const { }), mockStreamJson: vi.fn().mockResolvedValue([]), mockDurableStreamStream: vi.fn(), - mockCreateStreamDB: vi.fn(), - mockSourceDbClose: vi.fn(), mockSourceDbPreload: vi.fn().mockResolvedValue(undefined), + mockSourceDbClose: vi.fn(), + mockCreateStreamDB: vi.fn(), + mockActualCreateStreamDB: { + current: null as ((options: Record) => unknown) | null, + }, mockSourceEvents: { current: [] as Array> }, mockInitialManifests: { current: [] as Array> }, mockEntityOnEvent: { current: null as ((event: unknown) => void) | null }, @@ -135,6 +139,15 @@ vi.mock(`@durable-streams/client`, async (importOriginal) => { } }) +vi.mock(`@durable-streams/state`, async (importOriginal) => { + const actual = await importOriginal() + mockActualCreateStreamDB.current = actual.createStreamDB + return { + ...actual, + createStreamDB: mockCreateStreamDB, + } +}) + // Mock createEntityStreamDB so it doesn't try to create a real TanStack DB vi.mock(`../src/entity-stream-db`, () => ({ createEntityStreamDB: vi.fn().mockImplementation( @@ -419,6 +432,15 @@ describe(`processWake`, () => { clearRegistry() mockConstructedProducers.length = 0 mockDbPreload.mockResolvedValue(undefined) + mockSourceDbPreload.mockResolvedValue(undefined) + mockCreateStreamDB.mockImplementation((options) => { + if (!mockActualCreateStreamDB.current) { + throw new Error(`createStreamDB mock was not initialized`) + } + return mockActualCreateStreamDB.current( + options as Record + ) + }) mockInitialManifests.current = [] mockCreateStreamDB.mockClear() mockSourceDbClose.mockClear() @@ -439,6 +461,17 @@ describe(`processWake`, () => { .mockImplementation((url, opts) => { const urlStr = String(url) const method = opts?.method ?? `GET` + if (urlStr.includes(`/_electric/pg-sync/register`)) { + return Promise.resolve( + new Response( + JSON.stringify({ + sourceRef: `pg-source-1`, + streamUrl: `/_electric/pg-sync/default/pg-source-1`, + }), + { status: 200, headers: { 'content-type': `application/json` } } + ) + ) + } if (method === `PUT` && !urlStr.includes(`subscription=`)) { return Promise.resolve( new Response( @@ -1570,6 +1603,55 @@ describe(`processWake`, () => { ).resolves.not.toBeNull() }) + it(`pgSync observe registers pgSync source before source DB preload`, async () => { + const source = pgSync({ + table: `todos`, + where: `priority = $1`, + params: [`high`], + replica: `full`, + }) + + defineEntity(`test-agent`, { + async handler(ctx) { + await ctx.observe(source) + }, + }) + mockCreateStreamDB.mockReturnValueOnce({ + preload: mockSourceDbPreload, + close: mockSourceDbClose, + }) + + await processWake(makeNotification(), BASE_CONFIG) + + expect(mockCreateStreamDB).toHaveBeenCalledWith( + expect.objectContaining({ + streamOptions: expect.objectContaining({ + url: `/_electric/pg-sync/default/pg-source-1`, + contentType: `application/json`, + }), + state: expect.objectContaining({ + changes: expect.objectContaining({ + type: `pg_sync_change`, + primaryKey: `key`, + }), + }), + }) + ) + expect(mockSourceDbPreload).toHaveBeenCalledOnce() + + const pgSyncCallIndex = fetchMock.mock.calls.findIndex(([url]) => + String(url).includes(`/_electric/pg-sync/register`) + ) + expect(pgSyncCallIndex).toBeGreaterThanOrEqual(0) + const pgSyncBody = JSON.parse( + fetchMock.mock.calls[pgSyncCallIndex]![1]!.body as string + ) as Record + expect(pgSyncBody).toEqual({ options: source.options }) + expect(fetchMock.mock.invocationCallOrder[pgSyncCallIndex]).toBeLessThan( + mockSourceDbPreload.mock.invocationCallOrder[0] + ) + }) + it(`heartbeat is registered with configured interval`, async () => { const setIntervalSpy = vi.spyOn(globalThis, `setInterval`) @@ -1696,6 +1778,17 @@ describe(`processWake`, () => { }, }, ] + mockCreateStreamDB.mockReturnValueOnce({ + collections: { + events: { + get toArray() { + return mockSourceEvents.current + }, + }, + }, + close: mockSourceDbClose, + preload: mockSourceDbPreload, + }) let receivedMessage = `` defineEntity(`test-agent`, { diff --git a/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts new file mode 100644 index 0000000000..652d4342fc --- /dev/null +++ b/packages/agents-runtime/test/runtime-server-client-pg-sync.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it, vi } from 'vitest' +import { createRuntimeServerClient } from '../src/runtime-server-client' + +function jsonResponse(body: unknown, init?: ResponseInit): Response { + return new Response(JSON.stringify(body), { + status: 200, + headers: { 'content-type': `application/json` }, + ...init, + }) +} + +describe(`runtime-server-client.registerPgSyncSource`, () => { + it(`posts options to the pg-sync register route and parses the response`, async () => { + const calls: Array<{ url: string; init?: RequestInit }> = [] + const fakeFetch = vi.fn(async (url: string, init?: RequestInit) => { + calls.push({ url, init }) + return jsonResponse({ + sourceRef: `pg_sync_abc123`, + streamUrl: `/_electric/pg-sync/pg_sync_abc123`, + }) + }) as unknown as typeof fetch + + const client = createRuntimeServerClient({ + baseUrl: `http://test.example?secret=s1`, + fetch: fakeFetch, + }) + + const options = { + table: `todos`, + columns: [`id`, `text`], + where: `priority = $1`, + params: [`high`], + replica: `full` as const, + } + + await expect(client.registerPgSyncSource(options)).resolves.toEqual({ + sourceRef: `pg_sync_abc123`, + streamUrl: `/_electric/pg-sync/pg_sync_abc123`, + }) + + expect(calls).toHaveLength(1) + expect(calls[0]!.url).toBe( + `http://test.example/_electric/pg-sync/register?secret=s1` + ) + expect(calls[0]!.init?.method).toBe(`POST`) + const headers = new Headers(calls[0]!.init?.headers) + expect(headers.get(`content-type`)).toBe(`application/json`) + expect(JSON.parse(calls[0]!.init!.body as string)).toEqual({ options }) + }) + + it(`throws a useful error for non-OK responses`, async () => { + const fakeFetch = vi.fn( + async () => + new Response(`bad table`, { status: 400, statusText: `Bad Request` }) + ) as unknown as typeof fetch + const client = createRuntimeServerClient({ + baseUrl: `http://test.example`, + fetch: fakeFetch, + }) + + await expect( + client.registerPgSyncSource({ table: `todos` }) + ).rejects.toThrow(/registerPgSyncSource failed \(400\): bad table/) + }) +}) diff --git a/packages/agents-runtime/test/setup-context.test.ts b/packages/agents-runtime/test/setup-context.test.ts index bd3a95b4d7..1efa2b27ad 100644 --- a/packages/agents-runtime/test/setup-context.test.ts +++ b/packages/agents-runtime/test/setup-context.test.ts @@ -2216,6 +2216,57 @@ describe(`entity patterns`, () => { }) }) + it(`custom source with streamUrl passes an absolute URL to wiring`, async () => { + const source: ObservationSource = { + sourceType: `webhook`, + sourceRef: `stripe`, + streamUrl: `/webhooks/stripe/events`, + schema: { + events: { type: `webhook_event`, primaryKey: `key` }, + }, + toManifestEntry() { + return { + key: `source:webhook:stripe`, + kind: `source` as const, + sourceType: `webhook`, + sourceRef: `stripe`, + config: {}, + } + }, + } + const { db, writes } = makeCtx() + const createSourceDb = vi.fn(async () => ({ + close: vi.fn(), + preload: vi.fn(), + })) + const ctx = createSetupContext({ + entityUrl: `test-entity-1`, + entityType: `test-agent`, + args: Object.freeze({}), + db, + events: [], + writeEvent: (e: ChangeEvent) => writes.push(e), + serverBaseUrl: `http://localhost:3000/base`, + effectScope: { register: vi.fn() }, + customStateNames: [], + wiring: { + createSourceDb: createSourceDb as never, + createChildDb: vi.fn() as never, + createOrGetChild: vi.fn() as never, + createSharedStateDb: vi.fn() as never, + }, + }) + + await ctx.observe(source) + + expect(createSourceDb).toHaveBeenCalledWith( + `http://localhost:3000/base/webhooks/stripe/events`, + source.schema, + expect.any(Function), + { preload: true } + ) + }) + it(`custom source with streamUrl registers wireDb on source handle`, async () => { const webhookSource: ObservationSource = { sourceType: `webhook`, diff --git a/packages/agents-server/drizzle/0015_pg_sync_bridges.sql b/packages/agents-server/drizzle/0015_pg_sync_bridges.sql new file mode 100644 index 0000000000..1bbf9fd0c5 --- /dev/null +++ b/packages/agents-server/drizzle/0015_pg_sync_bridges.sql @@ -0,0 +1,14 @@ +CREATE TABLE "pg_sync_bridges" ( + "tenant_id" text DEFAULT 'default' NOT NULL, + "source_ref" text NOT NULL, + "options" jsonb NOT NULL, + "stream_url" text NOT NULL, + "shape_handle" text, + "shape_offset" text, + "initial_snapshot_complete" boolean DEFAULT false NOT NULL, + "last_touched_at" timestamp with time zone DEFAULT now() NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "pg_sync_bridges_tenant_id_source_ref_pk" PRIMARY KEY("tenant_id","source_ref"), + CONSTRAINT "uq_pg_sync_bridges_stream_url" UNIQUE("tenant_id","stream_url") +); diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index 994885b230..925f30d355 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -106,6 +106,13 @@ "when": 1780600000000, "tag": "0014_entity_type_slash_commands", "breakpoints": true + }, + { + "idx": 15, + "version": "7", + "when": 1779728400000, + "tag": "0015_pg_sync_bridges", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 5c01ada8b7..683b649e74 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -621,6 +621,34 @@ export const scheduledTasks = pgTable( ] ) +export const pgSyncBridges = pgTable( + `pg_sync_bridges`, + { + tenantId: text(`tenant_id`).notNull().default(`default`), + sourceRef: text(`source_ref`).notNull(), + options: jsonb(`options`).notNull(), + streamUrl: text(`stream_url`).notNull(), + shapeHandle: text(`shape_handle`), + shapeOffset: text(`shape_offset`), + initialSnapshotComplete: boolean(`initial_snapshot_complete`) + .notNull() + .default(false), + lastTouchedAt: timestamp(`last_touched_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + updatedAt: timestamp(`updated_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + primaryKey({ columns: [table.tenantId, table.sourceRef] }), + unique(`uq_pg_sync_bridges_stream_url`).on(table.tenantId, table.streamUrl), + ] +) + export const entityBridges = pgTable( `entity_bridges`, { diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index c35b62813c..9b592fe561 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -10,6 +10,7 @@ import { entityLineage, entityPermissionGrants, entityTypes, + pgSyncBridges, entityTypePermissionGrants, runnerRuntimeDiagnostics, runners, @@ -43,7 +44,7 @@ import type { EntityTypePermissionGrant, PermissionSubjectKind, } from './electric-agents-types.js' -import type { EntityTags } from '@electric-ax/agents-runtime' +import type { EntityTags, PgSyncOptions } from '@electric-ax/agents-runtime' import type { Principal } from './principal.js' export class EntityAlreadyExistsError extends Error { @@ -73,6 +74,19 @@ export interface EntityBridgeRow { updatedAt: Date } +export interface PgSyncBridgeRow { + tenantId: string + sourceRef: string + options: PgSyncOptions + streamUrl: string + shapeHandle?: string + shapeOffset?: string + initialSnapshotComplete: boolean + lastTouchedAt: Date + createdAt: Date + updatedAt: Date +} + export interface TagStreamOutboxRow { id: number tenantId: string @@ -623,6 +637,13 @@ export class PostgresRegistry { ) } + private pgSyncBridgeWhere(sourceRef: string) { + return and( + eq(pgSyncBridges.tenantId, this.tenantId), + eq(pgSyncBridges.sourceRef, sourceRef) + ) + } + async createEntityType(et: ElectricAgentsEntityType): Promise { await this.db .insert(entityTypes) @@ -1517,6 +1538,98 @@ export class PostgresRegistry { }) } + async upsertPgSyncBridge(row: { + sourceRef: string + options: PgSyncOptions + streamUrl: string + }): Promise { + await this.db + .insert(pgSyncBridges) + .values({ + tenantId: this.tenantId, + sourceRef: row.sourceRef, + options: row.options, + streamUrl: row.streamUrl, + lastTouchedAt: new Date(), + updatedAt: new Date(), + }) + .onConflictDoUpdate({ + target: [pgSyncBridges.tenantId, pgSyncBridges.sourceRef], + set: { + options: row.options, + streamUrl: row.streamUrl, + initialSnapshotComplete: false, + lastTouchedAt: new Date(), + updatedAt: new Date(), + }, + }) + + const existing = await this.getPgSyncBridge(row.sourceRef) + if (!existing) + throw new Error(`Failed to load pgSync bridge ${row.sourceRef}`) + return existing + } + + async getPgSyncBridge(sourceRef: string): Promise { + const rows = await this.db + .select() + .from(pgSyncBridges) + .where(this.pgSyncBridgeWhere(sourceRef)) + .limit(1) + return rows[0] ? this.rowToPgSyncBridge(rows[0]) : null + } + + async listPgSyncBridges( + tenantId: string | null = this.tenantId + ): Promise> { + const rows = + tenantId === null + ? await this.db.select().from(pgSyncBridges) + : await this.db + .select() + .from(pgSyncBridges) + .where(eq(pgSyncBridges.tenantId, tenantId)) + return rows.map((row) => this.rowToPgSyncBridge(row)) + } + + async touchPgSyncBridge(sourceRef: string): Promise { + await this.db + .update(pgSyncBridges) + .set({ lastTouchedAt: new Date(), updatedAt: new Date() }) + .where(this.pgSyncBridgeWhere(sourceRef)) + } + + async updatePgSyncBridgeCursor( + sourceRef: string, + shapeHandle: string, + shapeOffset: string, + initialSnapshotComplete?: boolean + ): Promise { + await this.db + .update(pgSyncBridges) + .set({ + shapeHandle, + shapeOffset, + ...(initialSnapshotComplete !== undefined + ? { initialSnapshotComplete } + : {}), + updatedAt: new Date(), + }) + .where(this.pgSyncBridgeWhere(sourceRef)) + } + + async clearPgSyncBridgeCursor(sourceRef: string): Promise { + await this.db + .update(pgSyncBridges) + .set({ + shapeHandle: null, + shapeOffset: null, + initialSnapshotComplete: false, + updatedAt: new Date(), + }) + .where(this.pgSyncBridgeWhere(sourceRef)) + } + async upsertEntityBridge(row: { sourceRef: string tags: EntityTags @@ -1911,6 +2024,23 @@ export class PostgresRegistry { } } + private rowToPgSyncBridge( + row: typeof pgSyncBridges.$inferSelect + ): PgSyncBridgeRow { + return { + tenantId: row.tenantId, + sourceRef: row.sourceRef, + options: row.options as PgSyncOptions, + streamUrl: row.streamUrl, + shapeHandle: row.shapeHandle ?? undefined, + shapeOffset: row.shapeOffset ?? undefined, + initialSnapshotComplete: row.initialSnapshotComplete, + lastTouchedAt: row.lastTouchedAt, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + } + } + private rowToEntityBridge( row: typeof entityBridges.$inferSelect ): EntityBridgeRow { diff --git a/packages/agents-server/src/manifest-side-effects.ts b/packages/agents-server/src/manifest-side-effects.ts index 0a3b87b276..65c160e04a 100644 --- a/packages/agents-server/src/manifest-side-effects.ts +++ b/packages/agents-server/src/manifest-side-effects.ts @@ -10,6 +10,10 @@ export function isRecord(value: unknown): value is Record { return typeof value === `object` && value !== null && !Array.isArray(value) } +function getPgSyncManifestStreamPath(sourceRef: string): string { + return `/_electric/pg-sync/${sourceRef}` +} + export function extractManifestSourceUrl( manifest: Record | undefined ): string | undefined { @@ -57,6 +61,12 @@ export function extractManifestSourceUrl( : undefined } + if (manifest.sourceType === `pgSync`) { + return typeof manifest.sourceRef === `string` + ? getPgSyncManifestStreamPath(manifest.sourceRef) + : undefined + } + if (manifest.sourceType === `webhook`) { if (typeof config?.streamUrl === `string`) return config.streamUrl if (typeof config?.endpointKey === `string`) { diff --git a/packages/agents-server/src/pg-sync-bridge-manager.ts b/packages/agents-server/src/pg-sync-bridge-manager.ts new file mode 100644 index 0000000000..b2a740d86f --- /dev/null +++ b/packages/agents-server/src/pg-sync-bridge-manager.ts @@ -0,0 +1,513 @@ +import { DurableStream, IdempotentProducer } from '@durable-streams/client' +import { + canonicalPgSyncOptions, + getPgSyncStreamPath, + sourceRefForPgSync, + type CanonicalPgSyncConfig, + type PgSyncOptions, +} from '@electric-ax/agents-runtime' +import { + ShapeStream, + isChangeMessage, + isControlMessage, +} from '@electric-sql/client' +import { serverLog } from './utils/log.js' +import type { StreamClient } from './stream-client.js' +import type { PgSyncBridgeRow, PostgresRegistry } from './entity-registry.js' +import type { Offset, ShapeStreamInterface } from '@electric-sql/client' + +export const PG_SYNC_ELECTRIC_SHAPE_URL = + process.env.ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL ?? + `http://localhost:3000/v1/shape` + +type PgSyncOperation = `insert` | `update` | `delete` +type WakeEvaluator = ( + sourceUrl: string, + event: Record +) => Promise | void + +export type PgSyncResolvedSource = { + shapeUrl: string + secret?: string +} + +export interface PgSyncBridgeManagerOptions { + shapeUrl?: string + secret?: string + retry?: { + initialDelayMs?: number + maxDelayMs?: number + random?: () => number + sleep?: (ms: number) => Promise + } +} + +const DEFAULT_RETRY_INITIAL_DELAY_MS = 1_000 +const DEFAULT_RETRY_MAX_DELAY_MS = 30_000 + +type PgSyncChangeMessage = { + headers: Record & { + operation?: PgSyncOperation | string + offset?: unknown + key?: unknown + rowKey?: unknown + } + value?: Record + key?: string + old_value?: Record +} + +type PgSyncCursor = { + handle: string + offset: string + initialSnapshotComplete: boolean +} + +export interface PgSyncBridgeCoordinator { + start?(): Promise + register( + options: PgSyncOptions + ): Promise<{ sourceRef: string; streamUrl: string }> + stop(): Promise +} + +export function buildElectricShapeParams( + options: PgSyncOptions +): Record { + return { + table: options.table, + ...(options.columns !== undefined ? { columns: [...options.columns] } : {}), + ...(options.where !== undefined ? { where: options.where } : {}), + ...(options.params !== undefined + ? { + params: Array.isArray(options.params) + ? [...options.params] + : { ...options.params }, + } + : {}), + ...(options.replica !== undefined ? { replica: options.replica } : {}), + } +} + +function jsonSafe(value: unknown): unknown { + if (typeof value === `bigint`) return value.toString() + if (value === null || typeof value !== `object`) return value + if (Array.isArray(value)) return value.map(jsonSafe) + return Object.fromEntries( + Object.entries(value as Record).map(([key, item]) => [ + key, + jsonSafe(item), + ]) + ) +} + +function stableJson(value: unknown): string { + if (typeof value === `bigint`) return JSON.stringify(value.toString()) + if (value === null || typeof value !== `object`) return JSON.stringify(value) + if (Array.isArray(value)) return `[${value.map(stableJson).join(`,`)}]` + return `{${Object.keys(value as Record) + .sort() + .map( + (key) => + `${JSON.stringify(key)}:${stableJson((value as Record)[key])}` + ) + .join(`,`)}}` +} + +function parseElectricOffset(offset: string): Offset | null { + if (offset === `-1`) return offset + return /^\d+_\d+$/.test(offset) ? (offset as Offset) : null +} + +function rowKeyForMessage(message: PgSyncChangeMessage): string | undefined { + const headers = message.headers as Record + const candidate = + headers.key ?? + headers.rowKey ?? + message.value?.id ?? + message.value?.key ?? + message.old_value?.id ?? + message.old_value?.key + return candidate === undefined ? undefined : stableJson(candidate) +} + +export function pgSyncMessageToDurableEvent( + message: PgSyncChangeMessage, + optionsOrSourceRef: PgSyncOptions | string +): { + type: `pg_sync_change` + key: string + value: Record + headers: { operation: PgSyncOperation; timestamp: string } +} | null { + const operation = message.headers.operation + if ( + operation !== `insert` && + operation !== `update` && + operation !== `delete` + ) + return null + + const sourceRef = + typeof optionsOrSourceRef === `string` + ? optionsOrSourceRef + : sourceRefForPgSync(optionsOrSourceRef) + const rowKey = rowKeyForMessage(message) + const offset = message.headers.offset + if (typeof offset !== `string` || offset.length === 0) return null + const messageKeyPart = offset + const messageKey = `${sourceRef}:${operation}:${messageKeyPart}` + const timestamp = new Date().toISOString() + const oldValue = message.old_value + const safeValue = jsonSafe(message.value) + const safeOldValue = jsonSafe(oldValue) + const safeHeaders = jsonSafe(message.headers) + + return { + type: `pg_sync_change`, + key: messageKey, + value: { + key: messageKey, + table: + typeof optionsOrSourceRef === `string` + ? undefined + : optionsOrSourceRef.table, + operation, + ...(rowKey !== undefined ? { rowKey } : {}), + ...(message.value !== undefined ? { value: safeValue } : {}), + ...(oldValue !== undefined ? { oldValue: safeOldValue } : {}), + headers: safeHeaders, + ...(typeof offset === `string` ? { offset } : {}), + receivedAt: timestamp, + }, + headers: { operation, timestamp }, + } +} + +function cursorFromRow( + row: + | Pick< + PgSyncBridgeRow, + `shapeHandle` | `shapeOffset` | `initialSnapshotComplete` + > + | undefined +): PgSyncCursor | undefined { + return row?.shapeHandle && row.shapeOffset + ? { + handle: row.shapeHandle, + offset: row.shapeOffset, + initialSnapshotComplete: row.initialSnapshotComplete, + } + : undefined +} + +class PgSyncBridge { + private producer: IdempotentProducer | null = null + private unsubscribe: (() => void) | null = null + private abortController: AbortController | null = null + private skipChangesUntilUpToDate = false + private recovering = false + private committedCursor?: PgSyncCursor + private retryAttempt = 0 + + constructor( + readonly sourceRef: string, + readonly streamUrl: string, + private options: CanonicalPgSyncConfig, + private resolvedSource: PgSyncResolvedSource, + private retry: Required>, + private streamClient: StreamClient, + private registry?: PostgresRegistry, + private evaluateWakes?: WakeEvaluator, + private initialCursor?: PgSyncCursor + ) { + this.committedCursor = initialCursor + } + + async start(): Promise { + if (!this.producer) { + this.producer = new IdempotentProducer( + new DurableStream({ + url: `${this.streamClient.baseUrl}${this.streamUrl}`, + contentType: `application/json`, + }), + `pg-sync-bridge-${this.sourceRef}` + ) + } + if (this.initialCursor) { + const offset = parseElectricOffset(this.initialCursor.offset) + if (offset) { + this.startStream( + offset, + this.initialCursor.handle, + !this.initialCursor.initialSnapshotComplete + ) + return + } + } + await this.registry?.clearPgSyncBridgeCursor(this.sourceRef) + this.startStream(`-1`, undefined, true) + } + + async stop(): Promise { + this.unsubscribe?.() + this.abortController?.abort() + this.unsubscribe = null + this.abortController = null + try { + await this.producer?.flush() + } finally { + await this.producer?.detach() + this.producer = null + } + } + + private startStream( + offset: Offset, + handle?: string, + skipChangesUntilUpToDate = false + ): void { + this.unsubscribe?.() + this.abortController?.abort() + this.skipChangesUntilUpToDate = skipChangesUntilUpToDate + this.abortController = new AbortController() + const stream: ShapeStreamInterface> = + new ShapeStream({ + url: this.resolvedSource.shapeUrl, + params: { + ...buildElectricShapeParams(this.options), + ...(this.resolvedSource.secret + ? { secret: this.resolvedSource.secret } + : {}), + } as never, + offset, + ...(handle ? { handle } : {}), + signal: this.abortController.signal, + }) + this.unsubscribe = stream.subscribe( + async (messages) => { + try { + for (const message of messages) { + if (isControlMessage(message)) { + if (message.headers.control === `must-refetch`) { + await this.registry?.clearPgSyncBridgeCursor(this.sourceRef) + this.startStream(`-1`, undefined, true) + return + } + if (message.headers.control === `up-to-date`) { + this.skipChangesUntilUpToDate = false + await this.persistCursor(stream, true) + continue + } + await this.persistCursor(stream) + continue + } + if (!isChangeMessage(message)) continue + if (!this.skipChangesUntilUpToDate) { + const event = pgSyncMessageToDurableEvent(message, this.options) + if (event) { + if (!this.producer) + throw new Error(`pg-sync producer is not started`) + await this.producer.append(JSON.stringify(event)) + await this.producer.flush?.() + await this.evaluateWakes?.(this.streamUrl, event) + } + } + await this.persistCursor(stream) + this.retryAttempt = 0 + } + } catch (error) { + serverLog.warn( + `[pg-sync-bridge] subscription callback failed for ${this.sourceRef}:`, + error + ) + await this.recoverStream() + } + }, + (error) => { + if (this.abortController?.signal.aborted) return + serverLog.warn( + `[pg-sync-bridge] subscription failed for ${this.sourceRef}:`, + error + ) + void this.recoverStream() + } + ) + } + + private async recoverStream(): Promise { + if (this.recovering) return + this.recovering = true + try { + const attempt = this.retryAttempt++ + const baseDelay = Math.min( + this.retry.initialDelayMs * 2 ** attempt, + this.retry.maxDelayMs + ) + const jitter = Math.floor(baseDelay * 0.2 * this.retry.random()) + const delay = baseDelay + jitter + if (delay > 0) await this.retry.sleep(delay) + + const offset = this.committedCursor + ? parseElectricOffset(this.committedCursor.offset) + : null + if (offset && this.committedCursor) { + this.startStream( + offset, + this.committedCursor.handle, + !this.committedCursor.initialSnapshotComplete + ) + } else { + await this.registry?.clearPgSyncBridgeCursor(this.sourceRef) + this.startStream(`-1`, undefined, true) + } + } finally { + this.recovering = false + } + } + + private async persistCursor( + stream: ShapeStreamInterface>, + initialSnapshotComplete = !this.skipChangesUntilUpToDate + ): Promise { + const shapeHandle = stream.shapeHandle + const shapeOffset = stream.lastOffset + if (!shapeHandle || !shapeOffset || shapeOffset === `-1`) return + await this.registry?.updatePgSyncBridgeCursor( + this.sourceRef, + shapeHandle, + shapeOffset, + initialSnapshotComplete + ) + this.committedCursor = { + handle: shapeHandle, + offset: shapeOffset, + initialSnapshotComplete, + } + } +} + +export class PgSyncBridgeManager implements PgSyncBridgeCoordinator { + private bridges = new Map() + private starting = new Map>() + + private readonly shapeUrl: string + private readonly secret?: string + private readonly retry: Required< + NonNullable + > + + constructor( + private streamClient: StreamClient, + private evaluateWakes?: WakeEvaluator, + private registry?: PostgresRegistry, + options: PgSyncBridgeManagerOptions = {} + ) { + this.shapeUrl = options.shapeUrl ?? PG_SYNC_ELECTRIC_SHAPE_URL + this.secret = options.secret ?? process.env.ELECTRIC_AGENTS_PG_SYNC_SECRET + this.retry = { + initialDelayMs: + options.retry?.initialDelayMs ?? DEFAULT_RETRY_INITIAL_DELAY_MS, + maxDelayMs: options.retry?.maxDelayMs ?? DEFAULT_RETRY_MAX_DELAY_MS, + random: options.retry?.random ?? Math.random, + sleep: + options.retry?.sleep ?? + ((ms: number) => + new Promise((resolve) => setTimeout(resolve, ms))), + } + } + + async start(): Promise { + const rows = await this.registry?.listPgSyncBridges?.() + if (!rows) return + await Promise.all( + rows.map((row) => + this.ensureBridge(row).catch((error) => { + serverLog.warn( + `[pg-sync-bridge] failed to start ${row.sourceRef}:`, + error + ) + }) + ) + ) + } + + async register( + options: PgSyncOptions + ): Promise<{ sourceRef: string; streamUrl: string }> { + const canonicalOptions = canonicalPgSyncOptions(options) + const resolvedSource = this.resolveSource() + const sourceRef = sourceRefForPgSync(canonicalOptions) + const streamUrl = getPgSyncStreamPath(sourceRef, this.registry?.tenantId) + const row = await this.registry?.upsertPgSyncBridge({ + sourceRef, + options: canonicalOptions, + streamUrl, + }) + await this.streamClient.ensure(streamUrl, { + contentType: `application/json`, + }) + if (!this.bridges.has(sourceRef)) { + let start = this.starting.get(sourceRef) + if (!start) { + start = (async () => { + const bridge = new PgSyncBridge( + sourceRef, + streamUrl, + canonicalOptions, + resolvedSource, + this.retry, + this.streamClient, + this.registry, + this.evaluateWakes, + cursorFromRow(row) + ) + await bridge.start() + this.bridges.set(sourceRef, bridge) + })().finally(() => this.starting.delete(sourceRef)) + this.starting.set(sourceRef, start) + } + await start + } + return { sourceRef, streamUrl } + } + + private async ensureBridge(row: PgSyncBridgeRow): Promise { + if (this.bridges.has(row.sourceRef)) return + let start = this.starting.get(row.sourceRef) + if (!start) { + start = (async () => { + await this.streamClient.ensure(row.streamUrl, { + contentType: `application/json`, + }) + const canonicalOptions = canonicalPgSyncOptions(row.options) + const resolvedSource = this.resolveSource() + const bridge = new PgSyncBridge( + row.sourceRef, + row.streamUrl, + canonicalOptions, + resolvedSource, + this.retry, + this.streamClient, + this.registry, + this.evaluateWakes, + cursorFromRow(row) + ) + await bridge.start() + this.bridges.set(row.sourceRef, bridge) + })().finally(() => this.starting.delete(row.sourceRef)) + this.starting.set(row.sourceRef, start) + } + await start + } + + private resolveSource(): PgSyncResolvedSource { + return { shapeUrl: this.shapeUrl, secret: this.secret } + } + + async stop(): Promise { + await Promise.allSettled(this.starting.values()) + await Promise.all([...this.bridges.values()].map((bridge) => bridge.stop())) + this.bridges.clear() + } +} diff --git a/packages/agents-server/src/routing/context.ts b/packages/agents-server/src/routing/context.ts index c81a048b5a..bb58be4ffd 100644 --- a/packages/agents-server/src/routing/context.ts +++ b/packages/agents-server/src/routing/context.ts @@ -5,6 +5,7 @@ import type { } from '@electric-ax/agents-runtime' import type { DrizzleDB } from '../db/index.js' import type { EntityBridgeCoordinator } from '../entity-bridge-manager.js' +import type { PgSyncBridgeCoordinator } from '../pg-sync-bridge-manager.js' import type { EntityManager } from '../entity-manager.js' import type { ElectricAgentsTenantRuntime } from '../runtime.js' import type { StreamClient } from '../stream-client.js' @@ -54,6 +55,7 @@ export interface TenantContext { streamClient: StreamClient runtime: ElectricAgentsTenantRuntime entityBridgeManager: EntityBridgeCoordinator + pgSyncBridgeManager?: PgSyncBridgeCoordinator eventSources?: EventSourceCatalog ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void authorizeRequest?: AuthorizeRequest diff --git a/packages/agents-server/src/routing/global-router.ts b/packages/agents-server/src/routing/global-router.ts index e554db08dd..5d0cdbd216 100644 --- a/packages/agents-server/src/routing/global-router.ts +++ b/packages/agents-server/src/routing/global-router.ts @@ -29,5 +29,8 @@ export const globalRouter: GlobalRoutes = AutoRouter< }) globalRouter.all(`/_electric/shared-state/*`, durableStreamsRouter.fetch) +globalRouter.all(`/_electric/pg-sync/register`, internalRouter.fetch) +globalRouter.get(`/_electric/pg-sync/*`, durableStreamsRouter.fetch) +globalRouter.head(`/_electric/pg-sync/*`, durableStreamsRouter.fetch) globalRouter.all(`/_electric/*`, internalRouter.fetch) globalRouter.all(`*`, durableStreamsRouter.fetch) diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index 43970704d9..fe544d3b10 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -30,6 +30,7 @@ import { resolveDurableStreamsRoutingAdapter } from './durable-streams-routing-a import { electricProxyRouter } from './electric-proxy-router.js' import { entitiesRouter } from './entities-router.js' import { entityTypesRouter } from './entity-types-router.js' +import { pgSyncRouter } from './pg-sync-router.js' import { getRequestSpan } from './hooks.js' import { observationsRouter } from './observations-router.js' import { runnersRouter } from './runners-router.js' @@ -135,6 +136,7 @@ internalRouter.all(`/runners`, runnersRouter.fetch) internalRouter.all(`/runners/*`, runnersRouter.fetch) internalRouter.all(`/entities/*`, entitiesRouter.fetch) internalRouter.all(`/entity-types/*`, entityTypesRouter.fetch) +internalRouter.all(`/pg-sync/*`, pgSyncRouter.fetch) internalRouter.all(`/observations/*`, observationsRouter.fetch) internalRouter.get(`/electric/*`, electricProxyRouter.fetch) internalRouter.all(`*`, () => status(404)) diff --git a/packages/agents-server/src/routing/pg-sync-router.ts b/packages/agents-server/src/routing/pg-sync-router.ts new file mode 100644 index 0000000000..44297fbeae --- /dev/null +++ b/packages/agents-server/src/routing/pg-sync-router.ts @@ -0,0 +1,91 @@ +/** + * HTTP routes for pg-sync observation source registration. + */ + +import type { PgSyncOptions } from '@electric-ax/agents-runtime' +import { Type, type Static } from '@sinclair/typebox' +import { Router, json } from 'itty-router' +import { apiError } from '../electric-agents-http.js' +import { ErrCodeInvalidRequest } from '../electric-agents-types.js' +import { routeBody, withSchema } from './schema.js' +import type { JsonRouteRequest } from './schema.js' +import type { RouterType } from 'itty-router' +import type { TenantContext } from './context.js' + +const pgSyncOptionsSchema = Type.Object({ + table: Type.String(), + columns: Type.Optional(Type.Array(Type.String())), + where: Type.Optional(Type.String()), + params: Type.Optional( + Type.Union([ + Type.Array(Type.String()), + Type.Record(Type.String(), Type.String()), + ]) + ), + replica: Type.Optional( + Type.Union([Type.Literal(`default`), Type.Literal(`full`)]) + ), +}) + +const pgSyncRegisterBodySchema = Type.Object({ + options: pgSyncOptionsSchema, +}) + +type PgSyncRegisterBody = Static + +export type PgSyncRoutes = RouterType< + JsonRouteRequest, + [TenantContext], + Response | undefined +> + +export const pgSyncRouter: PgSyncRoutes = Router< + JsonRouteRequest, + [TenantContext], + Response | undefined +>({ + base: `/_electric/pg-sync`, +}) + +pgSyncRouter.post( + `/register`, + withSchema(pgSyncRegisterBodySchema), + registerPgSync +) + +async function registerPgSync( + request: JsonRouteRequest, + ctx: TenantContext +): Promise { + const { options } = routeBody(request) + + if (options.table.trim() === ``) { + return apiError( + 400, + ErrCodeInvalidRequest, + `pgSync table must be non-empty` + ) + } + + if (!ctx.pgSyncBridgeManager) { + return apiError( + 503, + ErrCodeInvalidRequest, + `pgSync bridge manager is not configured` + ) + } + + try { + const result = await ctx.pgSyncBridgeManager.register( + options as PgSyncOptions + ) + + return json(result) + } catch (error) { + return apiError( + 500, + ErrCodeInvalidRequest, + `pgSync registration failed: ${error instanceof Error ? error.message : String(error)}` + ) + } +} diff --git a/packages/agents-server/src/runtime.ts b/packages/agents-server/src/runtime.ts index 81af4e3a64..c545fa714c 100644 --- a/packages/agents-server/src/runtime.ts +++ b/packages/agents-server/src/runtime.ts @@ -14,7 +14,12 @@ import { isPermanentElectricAgentsError } from './scheduler.js' import { StreamClient } from './stream-client.js' import { DEFAULT_TENANT_ID } from './tenant.js' import type { DrizzleDB } from './db/index.js' +import { PgSyncBridgeManager } from './pg-sync-bridge-manager.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' +import type { + PgSyncBridgeCoordinator, + PgSyncBridgeManagerOptions, +} from './pg-sync-bridge-manager.js' import type { DurableStreamsBearerProvider } from './stream-client.js' import type { CronTickPayload, @@ -40,6 +45,8 @@ export interface ElectricAgentsTenantRuntimeOptions { wakeRegistry: WakeRegistry scheduler: SchedulerClient entityBridgeManager: EntityBridgeCoordinator + pgSyncBridgeManager?: PgSyncBridgeCoordinator + pgSync?: PgSyncBridgeManagerOptions claimWriteTokens?: ClaimWriteTokenStore stopWakeRegistryOnShutdown?: boolean } @@ -53,6 +60,7 @@ export class ElectricAgentsTenantRuntime { readonly wakeRegistry: WakeRegistry readonly scheduler: SchedulerClient readonly entityBridgeManager: EntityBridgeCoordinator + readonly pgSyncBridgeManager: PgSyncBridgeCoordinator readonly claimWriteTokens: ClaimWriteTokenStore readonly manager: EntityManager @@ -92,10 +100,21 @@ export class ElectricAgentsTenantRuntime { ), stopWakeRegistryOnShutdown: options.stopWakeRegistryOnShutdown ?? false, }) + this.pgSyncBridgeManager = + options.pgSyncBridgeManager ?? + new PgSyncBridgeManager( + this.streamClient, + (sourceUrl, event) => this.manager.evaluateWakes(sourceUrl, event), + this.registry, + options.pgSync + ) } async stop(): Promise { - await this.manager.shutdown() + await Promise.all([ + this.manager.shutdown(), + this.pgSyncBridgeManager.stop(), + ]) } async rehydrateCronSchedules(): Promise { diff --git a/packages/agents-server/src/server.ts b/packages/agents-server/src/server.ts index bb68961c3d..6a09239308 100644 --- a/packages/agents-server/src/server.ts +++ b/packages/agents-server/src/server.ts @@ -36,6 +36,7 @@ import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' import type { DurableStreamsRoutingAdapter } from './routing/durable-streams-routing-adapter.js' import type { OssServerContext } from './routing/oss-server-router.js' import type { EventSourceCatalog } from './routing/context.js' +import type { PgSyncBridgeManagerOptions } from './pg-sync-bridge-manager.js' import type { StartedStandaloneAgentsRuntime } from './standalone-runtime.js' import type { DurableStreamsBearerProvider } from './stream-client.js' import type { @@ -72,6 +73,7 @@ export interface ElectricAgentsServerOptions { allowDevPrincipalFallback?: boolean eventSources?: EventSourceCatalog ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void + pgSync?: PgSyncBridgeManagerOptions /** * Disabled by default. When set to a positive interval, periodically * recovers expired dispatch claims and stale outstanding wakes. @@ -242,6 +244,7 @@ export class ElectricAgentsServer { streamClient: this.streamClient, electricUrl: this.options.electricUrl, electricSecret: this.options.electricSecret, + pgSync: this.options.pgSync, }) this.electricAgentsManager = this.standaloneRuntime.manager this.entityBridgeManager = this.standaloneRuntime.entityBridgeManager @@ -446,6 +449,7 @@ export class ElectricAgentsServer { streamClient: this.streamClient, runtime: this.standaloneRuntime.runtime, entityBridgeManager: this.entityBridgeManager, + pgSyncBridgeManager: this.standaloneRuntime.runtime.pgSyncBridgeManager, ...(this.options.eventSources ? { eventSources: this.options.eventSources } : {}), diff --git a/packages/agents-server/src/standalone-runtime.ts b/packages/agents-server/src/standalone-runtime.ts index 5f94c5b856..bddf870b43 100644 --- a/packages/agents-server/src/standalone-runtime.ts +++ b/packages/agents-server/src/standalone-runtime.ts @@ -11,6 +11,10 @@ import { WakeRegistry } from './wake-registry.js' import type { DrizzleDB, PgClient } from './db/index.js' import type { EntityManager } from './entity-manager.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' +import type { + PgSyncBridgeCoordinator, + PgSyncBridgeManagerOptions, +} from './pg-sync-bridge-manager.js' import type { CronTickPayload, DelayedSendPayload } from './scheduler.js' import type { DurableStreamsBearerProvider } from './stream-client.js' @@ -30,8 +34,11 @@ export interface StandaloneAgentsRuntimeOptions { startScheduler?: boolean startTagStreamOutboxDrainer?: boolean startEntityBridgeManager?: boolean + startPgSyncBridgeManager?: boolean rehydrateOnStart?: boolean entityBridgeManager?: EntityBridgeCoordinator + pgSyncBridgeManager?: PgSyncBridgeCoordinator + pgSync?: PgSyncBridgeManagerOptions } export interface StartedStandaloneAgentsRuntime { @@ -46,6 +53,7 @@ export interface StartedStandaloneAgentsRuntime { manager: EntityManager scheduler: Scheduler entityBridgeManager: EntityBridgeCoordinator + pgSyncBridgeManager: PgSyncBridgeCoordinator tagStreamOutboxDrainer: TagStreamOutboxDrainer stop: () => Promise } @@ -104,6 +112,8 @@ export async function startStandaloneAgentsRuntime( wakeRegistry, scheduler, entityBridgeManager, + pgSyncBridgeManager: options.pgSyncBridgeManager, + pgSync: options.pgSync, stopWakeRegistryOnShutdown: options.wakeRegistry ? false : true, }) @@ -112,6 +122,7 @@ export async function startStandaloneAgentsRuntime( const startTagStreamOutboxDrainer = options.startTagStreamOutboxDrainer ?? true const startEntityBridgeManager = options.startEntityBridgeManager ?? true + const startPgSyncBridgeManager = options.startPgSyncBridgeManager ?? true const rehydrateOnStart = options.rehydrateOnStart ?? true let entityBridgeManagerStarted = false let tagStreamOutboxDrainerStarted = false @@ -153,6 +164,10 @@ export async function startStandaloneAgentsRuntime( await entityBridgeManager.start() entityBridgeManagerStarted = true } + if (startPgSyncBridgeManager) { + serverLog.info(`[agent-server] starting pg-sync bridge manager...`) + await runtime.pgSyncBridgeManager.start?.() + } if (startTagStreamOutboxDrainer) { serverLog.info(`[agent-server] starting tag stream outbox drainer...`) tagStreamOutboxDrainer.start() @@ -181,6 +196,7 @@ export async function startStandaloneAgentsRuntime( manager: runtime.manager, scheduler, entityBridgeManager, + pgSyncBridgeManager: runtime.pgSyncBridgeManager, tagStreamOutboxDrainer, stop, } diff --git a/packages/agents-server/src/wake-registry.ts b/packages/agents-server/src/wake-registry.ts index 8c1c6d8049..6200057fd3 100644 --- a/packages/agents-server/src/wake-registry.ts +++ b/packages/agents-server/src/wake-registry.ts @@ -41,6 +41,8 @@ export interface WakeEvalResult { collection: string kind: `insert` | `update` | `delete` key: string + value?: unknown + oldValue?: unknown from?: string from_principal?: string from_agent?: string @@ -937,14 +939,21 @@ export class WakeRegistry { return null } + const value = event.value as Record | undefined const change: WakeEvalResult[`wakeMessage`][`changes`][number] = { collection: eventType, kind, key: (event.key as string) || ``, } + if (value && `value` in value) { + change.value = value.value + } + if (value && `oldValue` in value) { + change.oldValue = value.oldValue + } + if (eventType === `inbox`) { - const value = event.value as Record | undefined if (typeof value?.from === `string`) change.from = value.from if (typeof value?.from_principal === `string`) { change.from_principal = value.from_principal diff --git a/packages/agents-server/test/manifest-side-effects.test.ts b/packages/agents-server/test/manifest-side-effects.test.ts index 4588420478..6a6ccb91c2 100644 --- a/packages/agents-server/test/manifest-side-effects.test.ts +++ b/packages/agents-server/test/manifest-side-effects.test.ts @@ -1,5 +1,11 @@ import { describe, expect, it } from 'vitest' -import { buildManifestWakeRegistration } from '../src/manifest-side-effects' +import { + buildManifestWakeRegistration, + extractManifestSourceUrl, +} from '../src/manifest-side-effects' + +const getPgSyncStreamPath = (sourceRef: string) => + `/_electric/pg-sync/${sourceRef}` describe(`manifest side effects`, () => { it(`uses sourceRef for entity manifest wakes when config has no entityUrl`, () => { @@ -45,6 +51,86 @@ describe(`manifest side effects`, () => { }) }) + it(`maps pgSync source manifest sourceRef to pg-sync stream path`, () => { + const sourceRef = `pg_abc123` + + expect( + extractManifestSourceUrl({ + kind: `source`, + sourceType: `pgSync`, + sourceRef, + }) + ).toBe(getPgSyncStreamPath(sourceRef)) + }) + + it(`builds pgSync change wake registrations with ops`, () => { + const sourceRef = `pg_delete_source` + const registration = buildManifestWakeRegistration( + `/parent/p1`, + { + kind: `source`, + sourceType: `pgSync`, + sourceRef, + wake: { on: `change`, ops: [`delete`] }, + }, + `source:pgSync:${sourceRef}` + ) + + expect(registration).toEqual({ + subscriberUrl: `/parent/p1`, + sourceUrl: getPgSyncStreamPath(sourceRef), + condition: { + on: `change`, + ops: [`delete`], + }, + debounceMs: undefined, + timeoutMs: undefined, + oneShot: false, + manifestKey: `source:pgSync:${sourceRef}`, + }) + }) + + it(`does not register pgSync source manifests without sourceRef`, () => { + const manifest = { + kind: `source`, + sourceType: `pgSync`, + wake: { on: `change`, ops: [`delete`] }, + } + + expect(extractManifestSourceUrl(manifest)).toBeUndefined() + expect(buildManifestWakeRegistration(`/parent/p1`, manifest)).toBeNull() + }) + + it(`preserves pgSync object-form wake collections, ops, debounceMs, and timeoutMs`, () => { + const sourceRef = `pg_object_wake` + const registration = buildManifestWakeRegistration(`/parent/p1`, { + kind: `source`, + sourceType: `pgSync`, + sourceRef, + wake: { + on: `change`, + collections: [`pg_sync_change`], + ops: [`insert`, `update`], + debounceMs: 250, + timeoutMs: 5_000, + }, + }) + + expect(registration).toEqual({ + subscriberUrl: `/parent/p1`, + sourceUrl: getPgSyncStreamPath(sourceRef), + condition: { + on: `change`, + collections: [`pg_sync_change`], + ops: [`insert`, `update`], + }, + debounceMs: 250, + timeoutMs: 5_000, + oneShot: false, + manifestKey: undefined, + }) + }) + it(`builds webhook manifest wakes from the configured stream URL`, () => { const registration = buildManifestWakeRegistration( `/webhook-smoke/demo`, diff --git a/packages/agents-server/test/oss-server-router.test.ts b/packages/agents-server/test/oss-server-router.test.ts index 84dcad49bf..dec08c8385 100644 --- a/packages/agents-server/test/oss-server-router.test.ts +++ b/packages/agents-server/test/oss-server-router.test.ts @@ -67,6 +67,69 @@ describe(`OSS server routing wrapper`, () => { } }) + it(`routes pg-sync stream reads to Durable Streams while keeping registration internal and writes blocked`, async () => { + const fetchSpy = vi + .spyOn(globalThis, `fetch`) + .mockResolvedValue(new Response(`[]`, { status: 200 })) + const pgSyncBridgeManager = { + register: vi.fn().mockResolvedValue({ + sourceRef: `abc123`, + streamUrl: `/_electric/pg-sync/abc123`, + }), + } + + try { + const readResponse = await globalRouter.fetch( + request(`GET`, `/_electric/pg-sync/abc123?offset=-1`), + buildTenantContext() + ) + + expect(readResponse.status).toBe(200) + expect(String(fetchSpy.mock.calls[0]![0])).toBe( + `http://durable.local/v1/stream/tenant-test/_electric/pg-sync/abc123?offset=-1` + ) + + const headResponse = await globalRouter.fetch( + request(`HEAD`, `/_electric/pg-sync/abc123`), + buildTenantContext() + ) + + expect(headResponse.status).toBe(200) + expect(String(fetchSpy.mock.calls[1]![0])).toBe( + `http://durable.local/v1/stream/tenant-test/_electric/pg-sync/abc123` + ) + + const writeResponse = await globalRouter.fetch( + new Request(`http://server/_electric/pg-sync/abc123`, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ type: `pg_sync_change`, key: `forged` }), + }), + buildTenantContext() + ) + + expect(writeResponse.status).toBe(404) + expect(fetchSpy).toHaveBeenCalledTimes(2) + + const registerResponse = await globalRouter.fetch( + new Request(`http://server/_electric/pg-sync/register`, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ options: { table: `entities` } }), + }), + buildTenantContext({ pgSyncBridgeManager: pgSyncBridgeManager as any }) + ) + + expect(registerResponse.status).toBe(200) + expect(pgSyncBridgeManager.register).toHaveBeenCalledWith({ + table: `entities`, + }) + expect(fetchSpy).toHaveBeenCalledTimes(2) + } finally { + fetchSpy.mockRestore() + } + }) + it(`keeps the exported global router free of the mock agent handler`, async () => { const runtime = { handleWebhookRequest: vi diff --git a/packages/agents-server/test/pg-sync-bridge-manager.test.ts b/packages/agents-server/test/pg-sync-bridge-manager.test.ts new file mode 100644 index 0000000000..025d6712e4 --- /dev/null +++ b/packages/agents-server/test/pg-sync-bridge-manager.test.ts @@ -0,0 +1,599 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { + getPgSyncStreamPath, + sourceRefForPgSync, +} from '@electric-ax/agents-runtime' +import { + buildElectricShapeParams, + pgSyncMessageToDurableEvent, + PgSyncBridgeManager, + PG_SYNC_ELECTRIC_SHAPE_URL, +} from '../src/pg-sync-bridge-manager' + +const { mockState } = vi.hoisted(() => ({ + mockState: { + callbacks: [] as Array< + (messages: Array>) => unknown + >, + constructedOptions: [] as Array>, + appends: [] as string[], + appendError: null as Error | null, + streams: [] as Array<{ shapeHandle?: string; lastOffset?: string }>, + }, +})) + +vi.mock(`@electric-sql/client`, () => ({ + isControlMessage: (message: { headers?: Record }) => + typeof message.headers?.control === `string`, + isChangeMessage: (message: { headers?: Record }) => + typeof message.headers?.operation === `string`, + ShapeStream: class MockShapeStream { + shapeHandle = `handle-${mockState.streams.length + 1}` + lastOffset = `1_0` + constructor(options: Record) { + mockState.constructedOptions.push(options) + mockState.streams.push(this) + } + subscribe( + callback: (messages: Array>) => unknown + ): () => void { + mockState.callbacks.push(callback) + return () => undefined + } + }, +})) + +vi.mock(`@durable-streams/client`, () => ({ + DurableStream: class MockDurableStream { + constructor(readonly options: unknown) {} + }, + IdempotentProducer: class MockIdempotentProducer { + async append(payload: string): Promise { + if (mockState.appendError) throw mockState.appendError + mockState.appends.push(payload) + } + async flush(): Promise {} + async detach(): Promise {} + }, +})) + +beforeEach(() => { + mockState.callbacks = [] + mockState.constructedOptions = [] + mockState.appends = [] + mockState.appendError = null + mockState.streams = [] +}) + +describe(`pg-sync bridge helpers`, () => { + it(`builds Electric shape params from JSON-safe options`, () => { + expect( + buildElectricShapeParams({ + table: `todos`, + columns: [`id`, `text`], + where: `done = $1`, + params: [`false`], + replica: `full`, + }) + ).toEqual({ + table: `todos`, + columns: [`id`, `text`], + where: `done = $1`, + params: [`false`], + replica: `full`, + }) + }) + + it(`converts insert/update/delete messages with stable keys`, () => { + const options = { table: `todos` } + const insert = pgSyncMessageToDurableEvent( + { + headers: { operation: `insert`, offset: `1_0` }, + value: { id: 1, text: `a` }, + } as any, + options + )! + const update = pgSyncMessageToDurableEvent( + { + headers: { operation: `update`, offset: `2_0` }, + value: { id: 1, text: `b` }, + } as any, + options + )! + const del = pgSyncMessageToDurableEvent( + { + headers: { operation: `delete`, offset: `3_0` }, + value: { id: 1 }, + } as any, + options + )! + + expect(insert.key).toBe(`${sourceRefForPgSync(options)}:insert:1_0`) + expect(update.headers.operation).toBe(`update`) + expect(del.value.operation).toBe(`delete`) + expect(del.value.rowKey).toBe(`1`) + }) + + it(`rejects messages without stable string offsets`, () => { + const options = { table: `todos` } + + expect( + pgSyncMessageToDurableEvent( + { + key: `shape-key-1`, + headers: { operation: `insert` }, + value: { id: 1 }, + } as any, + options + ) + ).toBeNull() + expect( + pgSyncMessageToDurableEvent( + { headers: { operation: `insert` }, value: { id: 1 } } as any, + options + ) + ).toBeNull() + }) + + it(`converts BigInt values to strings so durable events are JSON serializable`, () => { + const options = { table: `entities` } + const event = pgSyncMessageToDurableEvent( + { + headers: { operation: `insert`, offset: `12_0` }, + value: { id: 1n, nested: { count: 2n } }, + old_value: { id: 0n }, + } as any, + options + )! + + expect(JSON.stringify(event)).toContain(`"1"`) + expect(event.value.value).toEqual({ id: `1`, nested: { count: `2` } }) + expect(event.value.oldValue).toEqual({ id: `0` }) + expect(event.value.headers).toEqual({ operation: `insert`, offset: `12_0` }) + }) +}) + +describe(`PgSyncBridgeManager`, () => { + it(`starts one stream per sourceRef and appends change events`, async () => { + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager(streamClient as any) + + await manager.register({ table: `todos` }) + await manager.register({ table: `todos` }) + + expect(streamClient.ensure).toHaveBeenCalledTimes(2) + expect(mockState.constructedOptions).toHaveLength(1) + expect(mockState.constructedOptions[0]).toMatchObject({ + url: PG_SYNC_ELECTRIC_SHAPE_URL, + params: { table: `todos` }, + offset: `-1`, + }) + + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + ]) + expect(JSON.parse(mockState.appends[0]!)).toMatchObject({ + type: `pg_sync_change`, + headers: { operation: `insert` }, + value: { table: `todos`, operation: `insert`, rowKey: `1` }, + }) + }) + + it(`does not append or wake for initial snapshot changes before up-to-date`, async () => { + const evaluateWakes = vi.fn(async () => undefined) + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) + + await manager.register({ table: `todos` }) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + { headers: { control: `up-to-date` } }, + ]) + + expect(mockState.appends).toEqual([]) + expect(evaluateWakes).not.toHaveBeenCalled() + + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `2_0` }, value: { id: 2 } }, + ]) + expect(mockState.appends).toHaveLength(1) + expect(evaluateWakes).toHaveBeenCalledTimes(1) + }) + + it(`invokes wake evaluation with the pgSync stream URL after appending`, async () => { + const evaluateWakes = vi.fn(async () => undefined) + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager(streamClient as any, evaluateWakes) + const options = { table: `todos` } + const sourceRef = sourceRefForPgSync(options) + + await manager.register(options) + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + ]) + + expect(evaluateWakes).toHaveBeenCalledTimes(1) + expect(evaluateWakes).toHaveBeenCalledWith( + getPgSyncStreamPath(sourceRef), + expect.objectContaining({ + type: `pg_sync_change`, + headers: expect.objectContaining({ operation: `insert` }), + }) + ) + }) + + it(`namespaces pg-sync stream paths by tenant to avoid cross-tenant sharing`, async () => { + const registry = { + tenantId: `tenant-a`, + upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + updatePgSyncBridgeCursor: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + registry as any + ) + const options = { table: `todos` } + const sourceRef = sourceRefForPgSync(options) + + const result = await manager.register(options) + + expect(result.streamUrl).toBe(getPgSyncStreamPath(sourceRef, `tenant-a`)) + expect(registry.upsertPgSyncBridge).toHaveBeenCalledWith( + expect.objectContaining({ + sourceRef, + streamUrl: getPgSyncStreamPath(sourceRef, `tenant-a`), + }) + ) + }) + + it(`persists registration with canonical options and updates cursor after messages`, async () => { + const registry = { + upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), + touchPgSyncBridge: vi.fn(async () => undefined), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + updatePgSyncBridgeCursor: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + registry as any + ) + const options = { table: `todos`, params: { b: `2`, a: `1` } } + const sourceRef = sourceRefForPgSync(options) + + await manager.register(options) + expect(registry.upsertPgSyncBridge).toHaveBeenCalledWith({ + sourceRef, + streamUrl: getPgSyncStreamPath(sourceRef), + options: { + table: `todos`, + params: { a: `1`, b: `2` }, + replica: `default`, + }, + }) + + mockState.streams[0]!.shapeHandle = `shape-handle` + mockState.streams[0]!.lastOffset = `7_0` + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `7_0` }, value: { id: 1 } }, + ]) + expect(registry.updatePgSyncBridgeCursor).toHaveBeenCalledWith( + sourceRef, + `shape-handle`, + `7_0`, + true + ) + }) + + it(`does not persist cursor when append fails`, async () => { + const registry = { + upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), + touchPgSyncBridge: vi.fn(async () => undefined), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + updatePgSyncBridgeCursor: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + registry as any + ) + + await manager.register({ table: `todos` }) + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + registry.updatePgSyncBridgeCursor.mockClear() + mockState.appendError = new Error(`append failed`) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `7_0` }, value: { id: 1 } }, + ]) + + expect(registry.updatePgSyncBridgeCursor).not.toHaveBeenCalled() + expect(mockState.constructedOptions).toHaveLength(2) + }) + + it(`startup resumes existing pgSync bridges from stored cursor`, async () => { + const options = { table: `todos` } + const sourceRef = sourceRefForPgSync(options) + const registry = { + listPgSyncBridges: vi.fn(async () => [ + { + sourceRef, + streamUrl: getPgSyncStreamPath(sourceRef), + options, + shapeHandle: `handle-1`, + shapeOffset: `12_0`, + }, + ]), + } + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + streamClient as any, + undefined, + registry as any + ) + + await manager.start() + await manager.start() + + expect(mockState.constructedOptions).toHaveLength(1) + expect(mockState.constructedOptions[0]).toMatchObject({ + offset: `12_0`, + handle: `handle-1`, + }) + }) + + it(`invalid stored shape cursor falls back to bootstrap and clears cursor`, async () => { + const options = { table: `todos` } + const sourceRef = sourceRefForPgSync(options) + const registry = { + listPgSyncBridges: vi.fn(async () => [ + { + sourceRef, + streamUrl: getPgSyncStreamPath(sourceRef), + options, + shapeHandle: `handle-1`, + shapeOffset: `not-valid`, + }, + ]), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + registry as any + ) + + await manager.start() + + expect(mockState.constructedOptions[0]).toMatchObject({ offset: `-1` }) + expect(mockState.constructedOptions[0]).not.toHaveProperty(`handle`) + expect(registry.clearPgSyncBridgeCursor).toHaveBeenCalledWith(sourceRef) + }) + + it(`must-refetch clears persisted cursor and restarts bootstrap`, async () => { + const options = { table: `todos` } + const sourceRef = sourceRefForPgSync(options) + const registry = { + upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), + touchPgSyncBridge: vi.fn(async () => undefined), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + registry as any + ) + await manager.register(options) + + await mockState.callbacks[0]!([{ headers: { control: `must-refetch` } }]) + + expect(registry.clearPgSyncBridgeCursor).toHaveBeenCalledWith(sourceRef) + expect(mockState.constructedOptions).toHaveLength(2) + expect(mockState.constructedOptions[1]!.offset).toBe(`-1`) + expect(mockState.constructedOptions[1]).not.toHaveProperty(`handle`) + }) + + it(`restarts from -1 on must-refetch`, async () => { + const manager = new PgSyncBridgeManager({ + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any) + await manager.register({ table: `todos` }) + + await mockState.callbacks[0]!([{ headers: { control: `must-refetch` } }]) + + expect(mockState.constructedOptions).toHaveLength(2) + expect(mockState.constructedOptions[1]!.offset).toBe(`-1`) + }) +}) + +describe(`external review red tests`, () => { + it(`continues skipping bootstrap snapshot rows after restart before up-to-date`, async () => { + const registryRows = new Map() + const registry = { + tenantId: `default`, + upsertPgSyncBridge: vi.fn(async (row) => { + const existing = registryRows.get(row.sourceRef) + const next = { ...existing, ...row } + registryRows.set(row.sourceRef, next) + return next + }), + clearPgSyncBridgeCursor: vi.fn(async (sourceRef) => { + const row = registryRows.get(sourceRef) + if (row) { + row.shapeHandle = undefined + row.shapeOffset = undefined + } + }), + updatePgSyncBridgeCursor: vi.fn( + async (sourceRef, shapeHandle, shapeOffset) => { + const row = registryRows.get(sourceRef) + if (row) { + row.shapeHandle = shapeHandle + row.shapeOffset = shapeOffset + } + } + ), + listPgSyncBridges: vi.fn(async () => [...registryRows.values()]), + } + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + + const first = new PgSyncBridgeManager( + streamClient as any, + undefined, + registry as any + ) + await first.register({ table: `todos` }) + mockState.streams[0]!.shapeHandle = `shape-a` + mockState.streams[0]!.lastOffset = `1_0` + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + ]) + expect(mockState.appends).toEqual([]) + await first.stop() + + const second = new PgSyncBridgeManager( + streamClient as any, + undefined, + registry as any + ) + await second.start() + await mockState.callbacks[1]!([ + { headers: { operation: `insert`, offset: `2_0` }, value: { id: 2 } }, + { headers: { control: `up-to-date` } }, + ]) + + expect(mockState.appends).toEqual([]) + }) + + it(`recovers from append failure using last committed cursor, not received stream offset`, async () => { + const registry = { + tenantId: `default`, + upsertPgSyncBridge: vi.fn(async (row) => ({ ...row })), + clearPgSyncBridgeCursor: vi.fn(async () => undefined), + updatePgSyncBridgeCursor: vi.fn(async () => undefined), + } + const streamClient = { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } + const manager = new PgSyncBridgeManager( + streamClient as any, + undefined, + registry as any + ) + + await manager.register({ table: `todos` }) + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + mockState.streams[0]!.shapeHandle = `shape-a` + mockState.streams[0]!.lastOffset = `2_0` + mockState.appendError = new Error(`append failed`) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + { headers: { operation: `insert`, offset: `2_0` }, value: { id: 2 } }, + ]) + + expect(mockState.constructedOptions.at(-1)).toMatchObject({ + offset: `1_0`, + }) + }) + + it(`rejects pg-sync change messages without a stable per-change offset`, () => { + expect( + pgSyncMessageToDurableEvent( + { headers: { operation: `insert` }, value: { id: 1 } } as any, + { table: `todos` } + ) + ).toBeNull() + }) +}) + +describe(`pg-sync production hardening`, () => { + it(`uses configured shape URL and secret server-side`, async () => { + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { + shapeUrl: `https://electric.example/v1/shape`, + secret: `server-secret`, + retry: { initialDelayMs: 0, maxDelayMs: 0 }, + } + ) + + await manager.register({ table: `todos` }) + + expect(mockState.constructedOptions[0]).toMatchObject({ + url: `https://electric.example/v1/shape`, + params: { table: `todos`, replica: `default`, secret: `server-secret` }, + }) + }) + + it(`backs off before recovery retries`, async () => { + const sleeps: number[] = [] + const manager = new PgSyncBridgeManager( + { + baseUrl: `http://durable`, + ensure: vi.fn(async () => undefined), + } as any, + undefined, + undefined, + { + retry: { + initialDelayMs: 10, + maxDelayMs: 10, + random: () => 0, + sleep: async (ms) => { + sleeps.push(ms) + }, + }, + } + ) + + await manager.register({ table: `todos` }) + await mockState.callbacks[0]!([{ headers: { control: `up-to-date` } }]) + mockState.appendError = new Error(`append failed`) + await mockState.callbacks[0]!([ + { headers: { operation: `insert`, offset: `1_0` }, value: { id: 1 } }, + ]) + + expect(sleeps).toEqual([10]) + }) +}) diff --git a/packages/agents-server/test/pg-sync-router.test.ts b/packages/agents-server/test/pg-sync-router.test.ts new file mode 100644 index 0000000000..03ef12b7fa --- /dev/null +++ b/packages/agents-server/test/pg-sync-router.test.ts @@ -0,0 +1,105 @@ +import { describe, expect, it, vi } from 'vitest' +import { + getPgSyncStreamPath, + sourceRefForPgSync, +} from '@electric-ax/agents-runtime' +import { globalRouter } from '../src/routing/global-router' +import type { TenantContext } from '../src/routing/context' + +function request(method: string, path: string, body?: unknown): Request { + return new Request(`http://server${path}`, { + method, + headers: + body === undefined ? undefined : { 'content-type': `application/json` }, + body: body === undefined ? undefined : JSON.stringify(body), + }) +} + +function buildContext(overrides: Partial = {}): TenantContext { + return { + service: `tenant-test`, + principal: { + kind: `user`, + id: `owner@example.com`, + key: `user:owner@example.com`, + url: `/principal/user%3Aowner%40example.com`, + }, + publicUrl: `http://server`, + durableStreamsUrl: `http://durable.local`, + durableStreamsDispatcher: undefined as any, + pgDb: undefined as any, + entityManager: undefined as any, + streamClient: { + ensure: vi.fn(async () => undefined), + } as any, + runtime: undefined as any, + entityBridgeManager: undefined as any, + pgSyncBridgeManager: { + start: vi.fn(async () => undefined), + register: vi.fn(async (options) => ({ + sourceRef: sourceRefForPgSync(options), + streamUrl: getPgSyncStreamPath(sourceRefForPgSync(options)), + })), + stop: vi.fn(async () => undefined), + }, + isShuttingDown: () => false, + ...overrides, + } +} + +describe(`pg-sync routes`, () => { + it(`registers a pg-sync source and returns its stream path`, async () => { + const ctx = buildContext() + const expectedSourceRef = sourceRefForPgSync({ table: `todos` }) + const expectedStreamUrl = getPgSyncStreamPath(expectedSourceRef) + + const response = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { table: `todos` }, + }), + ctx + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + sourceRef: expectedSourceRef, + streamUrl: expectedStreamUrl, + }) + expect(ctx.pgSyncBridgeManager!.register).toHaveBeenCalledWith({ + table: `todos`, + }) + }) + + it(`rejects an empty table`, async () => { + const ctx = buildContext() + + const response = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { + options: { table: ` ` }, + }), + ctx + ) + + expect(response.status).toBe(400) + expect(ctx.pgSyncBridgeManager!.register).not.toHaveBeenCalled() + }) + + it(`computes the same sourceRef as the runtime pgSync helper`, async () => { + const ctx = buildContext() + const options = { + table: `todos`, + where: `priority = 'high'`, + params: { b: `2`, a: `1` }, + replica: `full` as const, + } + + const response = await globalRouter.fetch( + request(`POST`, `/_electric/pg-sync/register`, { options }), + ctx + ) + const body = (await response.json()) as { sourceRef: string } + + expect(response.status).toBe(200) + expect(body.sourceRef).toBe(sourceRefForPgSync(options)) + }) +}) diff --git a/packages/agents-server/test/pg-sync-wake-delivery.test.ts b/packages/agents-server/test/pg-sync-wake-delivery.test.ts new file mode 100644 index 0000000000..5a0b70d60a --- /dev/null +++ b/packages/agents-server/test/pg-sync-wake-delivery.test.ts @@ -0,0 +1,103 @@ +import { describe, expect, it } from 'vitest' +import { WakeRegistry } from '../src/wake-registry' + +function createDb() { + let id = 0 + return { + insert: () => ({ + values: () => ({ + onConflictDoNothing: () => ({ + returning: async () => [{ id: ++id }], + }), + }), + }), + } +} + +function event(operation: `insert` | `update` | `delete`, key = operation) { + return { + type: `pg_sync_change`, + key, + value: { + key, + operation, + value: { id: `entity-1`, status: `running` }, + oldValue: { id: `entity-1`, status: `spawning` }, + }, + headers: { operation, timestamp: new Date().toISOString() }, + } +} + +describe(`pgSync wake delivery matching`, () => { + it(`insert wakes insert subscriber and delete does not wake insert-only subscriber`, async () => { + const registry = new WakeRegistry(createDb() as any, `default`) + await registry.register({ + subscriberUrl: `/horton/a`, + sourceUrl: `/_electric/pg-sync/test`, + condition: { on: `change`, ops: [`insert`] }, + oneShot: false, + }) + + const insertResults = registry.evaluate( + `/_electric/pg-sync/test`, + event(`insert`), + `default` + ) + expect(insertResults.map((r) => r.subscriberUrl)).toEqual([`/horton/a`]) + expect(insertResults[0]!.wakeMessage.changes[0]).toMatchObject({ + value: { id: `entity-1`, status: `running` }, + oldValue: { id: `entity-1`, status: `spawning` }, + }) + expect( + registry.evaluate(`/_electric/pg-sync/test`, event(`delete`), `default`) + ).toEqual([]) + }) + + it(`splits two subscribers on the same source by operation`, async () => { + const registry = new WakeRegistry(createDb() as any, `default`) + await registry.register({ + subscriberUrl: `/horton/a`, + sourceUrl: `/_electric/pg-sync/test`, + condition: { on: `change`, ops: [`insert`] }, + oneShot: false, + }) + await registry.register({ + subscriberUrl: `/horton/b`, + sourceUrl: `/_electric/pg-sync/test`, + condition: { on: `change`, ops: [`delete`] }, + oneShot: false, + }) + + expect( + registry + .evaluate(`/_electric/pg-sync/test`, event(`insert`), `default`) + .map((r) => r.subscriberUrl) + ).toEqual([`/horton/a`]) + expect( + registry + .evaluate(`/_electric/pg-sync/test`, event(`delete`), `default`) + .map((r) => r.subscriberUrl) + ).toEqual([`/horton/b`]) + }) + + it(`filters pgSync events by collection`, async () => { + const registry = new WakeRegistry(createDb() as any, `default`) + await registry.register({ + subscriberUrl: `/horton/a`, + sourceUrl: `/_electric/pg-sync/test`, + condition: { on: `change`, collections: [`pg_sync_change`] }, + oneShot: false, + }) + + expect( + registry.evaluate(`/_electric/pg-sync/test`, event(`insert`), `default`) + ).toHaveLength(1) + expect( + registry.evaluate( + `/_electric/pg-sync/test`, + { ...event(`insert`), type: `other` }, + `default` + ) + ).toEqual([]) + }) +}) diff --git a/packages/agents-server/test/runtime-shutdown.test.ts b/packages/agents-server/test/runtime-shutdown.test.ts new file mode 100644 index 0000000000..890d0911e9 --- /dev/null +++ b/packages/agents-server/test/runtime-shutdown.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it, vi } from 'vitest' +import { ElectricAgentsTenantRuntime } from '../src/runtime' + +describe(`ElectricAgentsTenantRuntime shutdown`, () => { + it(`stops the pg-sync bridge manager`, async () => { + const pgSyncBridgeManager = { + start: vi.fn(async () => undefined), + register: vi.fn(), + stop: vi.fn(async () => undefined), + } + const runtime = new ElectricAgentsTenantRuntime({ + service: `tenant-test`, + db: undefined as any, + streamClient: { baseUrl: `http://durable` } as any, + wakeRegistry: { + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + scheduler: undefined as any, + entityBridgeManager: undefined as any, + pgSyncBridgeManager, + }) + vi.spyOn(runtime.manager, `shutdown`).mockResolvedValue(undefined) + + await runtime.stop() + + expect(runtime.manager.shutdown).toHaveBeenCalledOnce() + expect(pgSyncBridgeManager.stop).toHaveBeenCalledOnce() + }) +}) diff --git a/packages/agents-server/test/standalone-runtime.test.ts b/packages/agents-server/test/standalone-runtime.test.ts new file mode 100644 index 0000000000..7f14d3fedc --- /dev/null +++ b/packages/agents-server/test/standalone-runtime.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, it, vi } from 'vitest' +import { startStandaloneAgentsRuntime } from '../src/standalone-runtime' + +function createEntityBridgeManagerMock() { + return { + start: vi.fn(async () => undefined), + stop: vi.fn(async () => undefined), + register: vi.fn(async () => ({ + sourceRef: `test`, + streamUrl: `/_entities/test`, + })), + onEntityChanged: vi.fn(async () => undefined), + touchByStreamPath: vi.fn(async () => undefined), + beginClientRead: vi.fn(async () => null), + } +} + +describe(`standalone runtime pg-sync startup/shutdown`, () => { + it(`starts pg-sync bridge manager even when entity bridge manager startup is disabled`, async () => { + const entityBridgeManager = createEntityBridgeManagerMock() + const pgSyncBridgeManager = { + start: vi.fn(async () => undefined), + register: vi.fn(), + stop: vi.fn(async () => undefined), + } + + const standalone = await startStandaloneAgentsRuntime({ + service: `tenant-test`, + db: undefined as any, + pgClient: undefined as any, + streamClient: { baseUrl: `http://durable` } as any, + wakeRegistry: { + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + entityBridgeManager, + pgSyncBridgeManager, + startWakeRegistry: false, + rehydrateOnStart: false, + startScheduler: false, + startTagStreamOutboxDrainer: false, + startEntityBridgeManager: false, + }) + + expect(entityBridgeManager.start).not.toHaveBeenCalled() + expect(pgSyncBridgeManager.start).toHaveBeenCalledOnce() + + vi.spyOn(standalone.runtime.manager, `shutdown`).mockResolvedValue( + undefined + ) + await standalone.stop() + }) + + it(`delegates pg-sync bridge manager shutdown to runtime.stop once`, async () => { + const pgSyncBridgeManager = { + start: vi.fn(async () => undefined), + register: vi.fn(), + stop: vi.fn(async () => undefined), + } + const standalone = await startStandaloneAgentsRuntime({ + service: `tenant-test`, + db: undefined as any, + pgClient: undefined as any, + streamClient: { baseUrl: `http://durable` } as any, + wakeRegistry: { + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + entityBridgeManager: createEntityBridgeManagerMock(), + pgSyncBridgeManager, + startWakeRegistry: false, + rehydrateOnStart: false, + startScheduler: false, + startTagStreamOutboxDrainer: false, + }) + vi.spyOn(standalone.runtime.manager, `shutdown`).mockResolvedValue( + undefined + ) + + await standalone.stop() + await standalone.stop() + + expect(pgSyncBridgeManager.stop).toHaveBeenCalledOnce() + }) +}) diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index c6a6b82b5a..1668dc8022 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -3,6 +3,7 @@ import { z } from 'zod' import { serverLog } from '../log' import { createHortonDocsSupport } from '../docs/knowledge-base' import { createSpawnWorkerTool } from '../tools/spawn-worker' +import { createObservePgSyncTool } from '../tools/observe-pg-sync' import { modelInputSchemaDefs, modelChoiceValues, @@ -261,6 +262,9 @@ When a user opens with a greeting ("hi", "hello", "hey", etc.) or a broad statem - web_search: search the web - fetch_url: fetch and convert a URL to markdown - spawn_worker: dispatch a subagent for an isolated task +- send: send a message to an Electric Agent/entity by entity URL +- observe_pg_sync: observe an Electric Postgres sync stream and wake on matching changes +${docsTools}${skillsTools} - send: send a message to an Electric Agent/entity. To schedule future work for yourself, call send with self: true and afterMs. ${eventSourceTools}${docsTools}${skillsTools} @@ -330,6 +334,8 @@ export function createHortonTools( ] : [createFetchUrlTool(sandbox)]), createSpawnWorkerTool(ctx, opts.modelConfig), + createSendTool(ctx.send), + createObservePgSyncTool(ctx), createSendTool(ctx.send, { selfEntityUrl: ctx.entityUrl }), ...(opts.docsSearchTool ? [opts.docsSearchTool] : []), ] diff --git a/packages/agents/src/tools/observe-pg-sync.ts b/packages/agents/src/tools/observe-pg-sync.ts new file mode 100644 index 0000000000..276526c191 --- /dev/null +++ b/packages/agents/src/tools/observe-pg-sync.ts @@ -0,0 +1,103 @@ +import { Type } from '@sinclair/typebox' +import { pgSync, type HandlerContext } from '@electric-ax/agents-runtime' +import type { AgentTool } from '@mariozechner/pi-agent-core' + +function asToolResult(value: unknown) { + return { + content: [ + { + type: `text` as const, + text: + typeof value === `string` ? value : JSON.stringify(value, null, 2), + }, + ], + details: {}, + } +} + +const PgSyncOperation = Type.Union([ + Type.Literal(`insert`), + Type.Literal(`update`), + Type.Literal(`delete`), +]) + +export function createObservePgSyncTool(ctx: HandlerContext): AgentTool { + return { + name: `observe_pg_sync`, + label: `Observe Postgres Sync`, + description: `Observe an Electric Postgres shape stream and wake this agent when matching row changes arrive.`, + parameters: Type.Object({ + table: Type.String({ + minLength: 1, + pattern: `\\S`, + description: `Postgres table name to observe.`, + }), + columns: Type.Optional( + Type.Array(Type.String(), { + description: `Optional list of columns to include in the shape.`, + }) + ), + where: Type.Optional( + Type.String({ description: `Optional Electric shape WHERE clause.` }) + ), + params: Type.Optional( + Type.Union([ + Type.Array(Type.String()), + Type.Record(Type.String(), Type.String()), + ]) + ), + replica: Type.Optional( + Type.Union([Type.Literal(`default`), Type.Literal(`full`)]) + ), + wake: Type.Optional( + Type.Object( + { + ops: Type.Optional(Type.Array(PgSyncOperation)), + debounceMs: Type.Optional(Type.Number()), + }, + { additionalProperties: false } + ) + ), + }), + execute: async (_toolCallId, params) => { + const args = params as { + table: string + columns?: string[] + where?: string + params?: string[] | Record + replica?: `default` | `full` + wake?: { + ops?: Array<`insert` | `update` | `delete`> + debounceMs?: number + } + } + + if (typeof args.table !== `string` || args.table.trim().length === 0) { + throw new Error(`table is required`) + } + + const source = pgSync({ + table: args.table, + columns: args.columns, + where: args.where, + params: args.params, + replica: args.replica, + }) + const wake = { + on: `change` as const, + ...(args.wake?.ops ? { ops: args.wake.ops } : {}), + ...(args.wake?.debounceMs !== undefined + ? { debounceMs: args.wake.debounceMs } + : {}), + } + + await ctx.observe(source, { wake }) + + return asToolResult({ + sourceRef: source.sourceRef, + streamUrl: source.streamUrl, + wake, + }) + }, + } +} diff --git a/packages/agents/test/observe-pg-sync-tool.test.ts b/packages/agents/test/observe-pg-sync-tool.test.ts new file mode 100644 index 0000000000..fa7cbd0f87 --- /dev/null +++ b/packages/agents/test/observe-pg-sync-tool.test.ts @@ -0,0 +1,127 @@ +import { describe, expect, it, vi } from 'vitest' +import { Value } from '@sinclair/typebox/value' +import { pgSync } from '@electric-ax/agents-runtime' +import { createHortonTools } from '../src/agents/horton' +import { createObservePgSyncTool } from '../src/tools/observe-pg-sync' + +function textResult(result: unknown): any { + const text = (result as { content: Array<{ text: string }> }).content[0]!.text + return JSON.parse(text) +} + +describe(`observe_pg_sync tool`, () => { + it(`validates required table`, async () => { + const tool = createObservePgSyncTool({ observe: vi.fn() } as any) + + expect(Value.Check(tool.parameters as any, {})).toBe(false) + await expect(tool.execute(`call`, {})).rejects.toThrow(/table is required/) + }) + + it(`rejects invalid ops and unsupported timeoutMs when schema validates`, () => { + const tool = createObservePgSyncTool({ observe: vi.fn() } as any) + + expect( + Value.Check(tool.parameters as any, { + table: `todos`, + wake: { ops: [`merge`] }, + }) + ).toBe(false) + expect( + Value.Check(tool.parameters as any, { + table: `todos`, + wake: { timeoutMs: 1000 }, + }) + ).toBe(false) + }) + + it(`calls ctx.observe with pgSync source and wake options`, async () => { + const observe = vi.fn(async () => {}) + const tool = createObservePgSyncTool({ observe } as any) + + await tool.execute(`call`, { + table: `todos`, + columns: [`id`, `text`], + where: `priority = $1`, + params: [`high`], + replica: `full`, + wake: { ops: [`insert`], debounceMs: 25 }, + }) + + const expectedSource = pgSync({ + table: `todos`, + columns: [`id`, `text`], + where: `priority = $1`, + params: [`high`], + replica: `full`, + }) + expect(observe).toHaveBeenCalledTimes(1) + const observeCalls = observe.mock.calls as unknown as Array< + [unknown, unknown] + > + expect(observeCalls[0]![0]).toMatchObject({ + sourceType: expectedSource.sourceType, + sourceRef: expectedSource.sourceRef, + streamUrl: expectedSource.streamUrl, + options: expectedSource.options, + }) + expect(observeCalls[0]![1]).toEqual({ + wake: { + on: `change`, + ops: [`insert`], + debounceMs: 25, + }, + }) + }) + + it(`preserves debounceMs: 0`, async () => { + const observe = vi.fn(async () => {}) + const tool = createObservePgSyncTool({ observe } as any) + + const result = textResult( + await tool.execute(`call`, { table: `todos`, wake: { debounceMs: 0 } }) + ) + + expect(observe).toHaveBeenCalledWith(expect.anything(), { + wake: { on: `change`, debounceMs: 0 }, + }) + expect(result.wake).toEqual({ on: `change`, debounceMs: 0 }) + }) + + it(`returns sourceRef, streamUrl, and wake`, async () => { + const observe = vi.fn(async () => {}) + const tool = createObservePgSyncTool({ observe } as any) + + const result = textResult( + await tool.execute(`call`, { table: `todos`, wake: { ops: [`delete`] } }) + ) + const source = pgSync({ table: `todos` }) + + expect(result).toEqual({ + sourceRef: source.sourceRef, + streamUrl: source.streamUrl, + wake: { on: `change`, ops: [`delete`] }, + }) + }) + + it(`defaults wake when wake.ops is omitted`, async () => { + const observe = vi.fn(async () => {}) + const tool = createObservePgSyncTool({ observe } as any) + + const result = textResult(await tool.execute(`call`, { table: `todos` })) + + expect(observe).toHaveBeenCalledWith(expect.anything(), { + wake: { on: `change` }, + }) + expect(result.wake).toEqual({ on: `change` }) + }) + + it(`is included in Horton's tool list`, () => { + const tools = createHortonTools( + { workingDirectory: `/tmp` } as any, + { send: vi.fn(), observe: vi.fn() } as any, + new Set() + ) + + expect(tools.map((tool) => tool.name)).toContain(`observe_pg_sync`) + }) +}) diff --git a/scripts/dev.sh b/scripts/dev.sh index 2f730af67c..b5c90cb3ef 100755 --- a/scripts/dev.sh +++ b/scripts/dev.sh @@ -44,6 +44,10 @@ Subcommands: desktop Run the Electron desktop app (packages/agents-desktop) in the current terminal. Requires the rest of the stack to already be running via 'start'. + isolated [--no-build] [--no-agents] + Start a fully isolated test stack on randomly chosen + ports and run the Electron desktop app against it. + Ctrl-C stops desktop, dev processes, and docker services. stop Stop all dev processes + docker compose down (volumes kept). teardown Stop + docker compose down -v (wipes Postgres volume). status Print which services are running. @@ -123,6 +127,86 @@ wait_for_tcp() { return 1 } +wait_for_tcp_or_exit() { + # Like wait_for_tcp, but returns 2 immediately if the named spawned process exits. + local name="$1" host="$2" port="$3" timeout="${4:-60}" elapsed=0 + local pidfile="$LOG_DIR/$name.pid" + while (( elapsed < timeout )); do + if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then + exec 3<&- 3>&- + return 0 + fi + if [[ -f "$pidfile" ]]; then + local pid + pid=$(cat "$pidfile") + if ! kill -0 "$pid" 2>/dev/null; then + return 2 + fi + fi + sleep 1 + elapsed=$((elapsed + 1)) + done + return 1 +} + +random_ports() { + # Allocate N currently-free, unique localhost ports. Sockets stay open until + # all ports are selected, which avoids duplicates within one isolated run. + # There is still an unavoidable small race after this returns and before + # services bind, but docker/vite/server startup will fail fast if another + # process grabs one. + local count="$1" + python3 - "$count" <<'PYPORT' +import socket +import sys + +count = int(sys.argv[1]) +sockets = [] +try: + for _ in range(count): + s = socket.socket() + s.bind(("127.0.0.1", 0)) + sockets.append(s) + for s in sockets: + print(s.getsockname()[1]) +finally: + for s in sockets: + s.close() +PYPORT +} + +compose_down_project() { + local project="${1:-}" with_volumes="${2:-false}" + [[ -n "$project" ]] || return 0 + cd "$REPO_ROOT" + if [[ "$with_volumes" == "true" ]]; then + docker compose -p "$project" -f "$DOCKER_COMPOSE_FILE" down -v >>"$LOG_DIR/docker.log" 2>&1 || warn "docker compose down -v returned non-zero for $project" + else + docker compose -p "$project" -f "$DOCKER_COMPOSE_FILE" down >>"$LOG_DIR/docker.log" 2>&1 || warn "docker compose down returned non-zero for $project" + fi +} + +ISOLATED_PROJECT="" +ISOLATED_CLEANED_UP=false + +cleanup_isolated() { + if $ISOLATED_CLEANED_UP; then + return 0 + fi + ISOLATED_CLEANED_UP=true + echo + log "shutting down isolated stack..." + stop_processes + compose_down_project "$ISOLATED_PROJECT" true +} + +exit_isolated() { + local exit_code="${1:-$?}" + trap - INT TERM EXIT + cleanup_isolated + exit "$exit_code" +} + _signal_pg() { # Send signal $1 to process group of pid $2 (and to the pid itself # in case it isn't a group leader). Errors suppressed. @@ -277,6 +361,123 @@ EOF tail -F "${tail_files[@]}" } +cmd_isolated() { + local do_build=true with_agents=true + while [[ $# -gt 0 ]]; do + case "$1" in + --no-build) do_build=false ;; + --no-agents) with_agents=false ;; + *) die "unknown option: $1" ;; + esac + shift + done + + cd "$REPO_ROOT" + [[ -f .env ]] || die ".env not found at repo root. Create one with ANTHROPIC_API_KEY or OPENAI_API_KEY." + grep -qE '^(ANTHROPIC_API_KEY|OPENAI_API_KEY)=' .env || die ".env is missing ANTHROPIC_API_KEY or OPENAI_API_KEY." + docker info >/dev/null 2>&1 || die "Docker daemon not reachable. Start Docker Desktop and retry." + + if $do_build; then + cmd_build + else + for pkg in typescript-client agents-runtime agents-mcp agents-server agents; do + [[ -d "packages/$pkg/dist" ]] || die "packages/$pkg/dist is missing. Run without --no-build or run: $0 build" + done + fi + + local branch_name branch_slug run_id project + branch_name=$(git -C "$REPO_ROOT" branch --show-current 2>/dev/null || true) + if [[ -z "$branch_name" ]]; then + branch_name=$(git -C "$REPO_ROOT" rev-parse --short HEAD 2>/dev/null || basename "$REPO_ROOT") + fi + branch_slug=$(printf '%s' "$branch_name" | tr '[:upper:]' '[:lower:]' | sed -E 's/[^a-z0-9]+/-/g; s/^-+//; s/-+$//') + [[ -n "$branch_slug" ]] || branch_slug="worktree" + local run_suffix + run_suffix=$(date +%Y%m%d-%H%M%S)-$RANDOM + run_id="iso-$branch_slug-$run_suffix" + project="agents-$run_id" + ISOLATED_PROJECT="$project" + LOG_DIR="$REPO_ROOT/.dev-logs/$run_id" + mkdir -p "$LOG_DIR" + : > "$LOG_DIR/docker.log" + + local pg_port electric_port jaeger_ui_port jaeger_http_port jaeger_grpc_port + local server_port agents_port ui_port desktop_ui_port + read -r pg_port electric_port jaeger_ui_port jaeger_http_port jaeger_grpc_port \ + server_port agents_port ui_port desktop_ui_port < <(random_ports 9 | tr '\n' ' ') + + cat > "$LOG_DIR/env" <>"$LOG_DIR/docker.log" 2>&1 || die "docker compose up failed (see $LOG_DIR/docker.log)" + + spawn agents-server env DATABASE_URL="postgresql://electric_agents:electric_agents@localhost:$pg_port/electric_agents" ELECTRIC_AGENTS_ELECTRIC_URL="http://localhost:$electric_port" ELECTRIC_AGENTS_PG_SYNC_ELECTRIC_URL="http://localhost:$electric_port/v1/shape" ELECTRIC_AGENTS_PORT="$server_port" ELECTRIC_AGENTS_STREAMS_DATA_DIR="$REPO_ROOT/.streams-data/$run_id" ELECTRIC_INSECURE=true node packages/agents-server/dist/entrypoint.js + + log "waiting for agents-server on :$server_port..." + local wait_status=0 + wait_for_tcp_or_exit agents-server 127.0.0.1 "$server_port" 60 || wait_status=$? + if [[ "$wait_status" == "2" ]]; then + warn "agents-server exited before binding :$server_port" + tail -80 "$LOG_DIR/agents-server.log" >&2 || true + exit_isolated 1 + elif [[ "$wait_status" != "0" ]]; then + warn "agents-server did not bind :$server_port within 60s (see $LOG_DIR/agents-server.log)" + fi + + if $with_agents; then + spawn agents env ELECTRIC_AGENTS_SERVER_URL="http://localhost:$server_port" ELECTRIC_AGENTS_BUILTIN_PORT="$agents_port" ELECTRIC_AGENTS_PULL_WAKE_RUNNER_ID="isolated-$branch_slug" ELECTRIC_AGENTS_REGISTER_PULL_WAKE_RUNNER=true ELECTRIC_AGENTS_PRINCIPAL=system:dev-local node packages/agents/dist/entrypoint.js + fi + + spawn agents-runtime pnpm -C "$REPO_ROOT/packages/agents-runtime" dev + spawn agents-server-build pnpm -C "$REPO_ROOT/packages/agents-server" dev + spawn agents-build pnpm -C "$REPO_ROOT/packages/agents" dev + spawn agents-server-ui env PORT="$ui_port" ELECTRIC_AGENTS_SERVER_URL="http://localhost:$server_port" pnpm -C "$REPO_ROOT/packages/agents-server-ui" dev -- --host 127.0.0.1 --port "$ui_port" + + cat <