Add pg-sync observation support for agents#4421
Open
KyleAMathews wants to merge 8 commits into
Open
Conversation
✅ Deploy Preview for electric-next ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
# Conflicts: # packages/agents-runtime/src/client.ts # packages/agents-runtime/src/index.ts # packages/agents-runtime/src/process-wake.ts # packages/agents-runtime/src/runtime-server-client.ts # packages/agents-runtime/src/setup-context.ts # packages/agents-runtime/test/electric-agents-client.test.ts # packages/agents-runtime/test/process-wake.test.ts # packages/agents-server/src/manifest-side-effects.ts # packages/agents-server/src/routing/context.ts # packages/agents-server/src/server.ts # packages/agents-server/test/manifest-side-effects.test.ts # packages/agents/src/agents/horton.ts
Contributor
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4421 +/- ##
==========================================
+ Coverage 60.59% 60.91% +0.31%
==========================================
Files 306 309 +3
Lines 31677 32062 +385
Branches 8611 8733 +122
==========================================
+ Hits 19195 19530 +335
- Misses 12464 12513 +49
- Partials 18 19 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds pg-sync observation — a new way for agents to observe Postgres table changes via Electric shape streams and wake on matching row operations (insert/update/delete). This replaces the need for agents to poll or use webhooks for database change detection.
Approach
Architecture
The feature spans three packages in a layered design:
agents-runtime— Defines thepgSync()observation source factory. Produces aPgSyncObservationSourcewith a deterministicsourceRef(content-addressed hash of canonical options) and registers it with the server viaregisterPgSyncSource().agents-server— HousesPgSyncBridgeManager, which manages the lifecycle of "bridges" — each bridge subscribes to an ElectricShapeStreamfor a given table/shape and forwards change messages into a Durable Stream. Bridges persist their cursor (shapeHandle+shapeOffset) across restarts so they resume from where they left off. ThePgSyncRouterexposes aPOST /_electric/pg-sync/registerendpoint for runtime registration. Wake evaluation filters changes by operation type and collection.agents— Exposes theobserve_pg_synctool to Horton agents, letting the LLM dynamically subscribe to table changes with optional operation filtering and debounce.Key design decisions
Canonical options & sourceRef: Shape options are canonicalized (sorted params, defaulted replica, copied arrays) before hashing. This ensures two registrations for the same logical shape always produce the same
sourceRef, preventing duplicate bridges. The canonicalization is exported fromagents-runtimeand reused byagents-server— single source of truth.Initial snapshot skipping: When a bridge starts fresh, it sets
skipChangesUntilUpToDate = trueto avoid treating the initial Electric snapshot as live changes. Changes before the firstup-to-datecontrol message are silently dropped. Resumed bridges (with a stored cursor) process all changes immediately.Cursor as atomic unit:
shapeHandleandshapeOffsetare modeled as a singlePgSyncCursorobject ({ handle, offset }), making the "both or neither" invariant unrepresentable as an illegal state.Subscription error recovery: If the ShapeStream subscription errors or the async message callback throws, the bridge attempts recovery — resuming from its last known cursor if available, or restarting from scratch in bootstrap-skip mode.
Key invariants
sourceRefper tenant (enforced viabridgesmap +startingdeduplication map)sourceRefis deterministic: same canonical options → same hash, regardless of field orderpg_sync_bridgestableopsfilters — aninsert-only subscriber won't wake ondeleteBigIntvalues from Electric are converted to strings before JSON serializationNon-goals
Verification
Files changed
agents-runtimeobservation-sources.ts— NewpgSync()factory,PgSyncOptions/CanonicalPgSyncConfigtypes,sourceRefForPgSync(), canonical options exportruntime-server-client.ts—registerPgSyncSource()HTTP client methodprocess-wake.ts— Wire pg-sync source registration into wake processingentity-schema.ts— Addvalue/oldValuetoWakeChangeEntryValueindex.ts— Re-export pg-sync types and helpersagents-serverpg-sync-bridge-manager.ts—PgSyncBridge(stream subscription + durable forwarding),PgSyncBridgeManager(lifecycle, deduplication, recovery)pg-sync-router.ts— HTTP registration endpoint with schema validationentity-registry.ts—pg_sync_bridgesCRUD (upsert, cursor update/clear, touch, list)db/schema.ts—pgSyncBridgestable definition + migrationwake-registry.ts— pg-sync wake evaluation with operation filteringruntime.ts/standalone-runtime.ts— Bridge manager integration into server lifecyclerouting/— Mount pg-sync router on global routeragentstools/observe-pg-sync.ts—observe_pg_syncagent tool with TypeBox schemaagents/horton.ts— Register tool in Horton's tool listOther
scripts/dev.sh— Improved dev stack isolation and port configurabilitydocs/agents-pg-sync-observation-plan.md— Design document