From 75dc844ef893f95e1b4cdc7387cd21106083e841 Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sat, 9 May 2026 16:03:48 +0100 Subject: [PATCH 1/4] feat(aggregator): Records Jetstream DO with PDS-bound ingestor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires up the long-lived Jetstream subscription that feeds verification jobs into the Records Queue. All loop / cursor / backoff logic lives in a constructor-injected `JetstreamIngestor` so unit tests run it directly against `MockJetstream` + an in-memory queue + a Map-backed storage — no DO/D1/Queue runtime needed. Components: - `JetstreamClient` interface + `RealJetstreamClient` wrapping `@atcute/jetstream`'s `JetstreamSubscription`. The interface narrows to commit events and exposes the cursor; non-commit events (identity/account) are filtered upstream of the ingestor. - `JetstreamIngestor` owns the lifecycle: connect → consume → cursor persist → reconnect with exponential backoff (capped, ±jitter, reset on each successful event so a flaky upstream doesn't spiral). Cursor is persisted after each successful enqueue; a crash between enqueue and persist replays the latest event, which the downstream consumer's idempotency rules absorb. - `RecordsJetstreamDO` is the thin Cloudflare wrapper: wires bindings into the ingestor, fires `run()` as a fire-and-forget promise on construction (the run loop is meant to live as long as the DO), and exposes a status fetch handler. - Worker entrypoint adds `/_admin/start` — hit it once after deploy (e.g. `wrangler deploy && curl https://api.emdashcms.com/_admin/start`) to bootstrap the DO. The DO's outbound WebSocket then keeps it alive on its own; if the WS drops, the ingestor's reconnect logic handles it. Tests: 8 unit tests covering event-to-job conversion, cursor persistence + resume, delete operations, defence-in-depth filtering, stop semantics, backoff with progress reset, and exponential growth + cap. Tests import MockJetstream via the new `@emdash-cms/atproto-test-utils/jetstream` and `/nsid` subpaths so workerd doesn't try to load `@atproto/repo` (Node- crypto only) transitively from the package's main entry. Workspace: - `@emdash-cms/atproto-test-utils` package adds `/jetstream` and `/nsid` subpath exports. The NSID constants moved to their own `nsid.ts` file so the subpath doesn't need to drag in fake-publisher.ts and its @atproto/repo transitive deps. - Regenerated `worker-configuration.d.ts` after wrangler.jsonc change. --- apps/aggregator/src/index.ts | 28 +- apps/aggregator/src/jetstream-client.ts | 112 ++++++ apps/aggregator/src/jetstream-ingestor.ts | 215 +++++++++++ apps/aggregator/src/records-do.ts | 69 +++- .../test/jetstream-ingestor.test.ts | 350 ++++++++++++++++++ packages/atproto-test-utils/package.json | 4 +- .../atproto-test-utils/src/fake-publisher.ts | 4 +- packages/atproto-test-utils/src/nsid.ts | 8 + 8 files changed, 777 insertions(+), 13 deletions(-) create mode 100644 apps/aggregator/src/jetstream-client.ts create mode 100644 apps/aggregator/src/jetstream-ingestor.ts create mode 100644 apps/aggregator/test/jetstream-ingestor.test.ts create mode 100644 packages/atproto-test-utils/src/nsid.ts diff --git a/apps/aggregator/src/index.ts b/apps/aggregator/src/index.ts index 095bc92f1..20493e3e0 100644 --- a/apps/aggregator/src/index.ts +++ b/apps/aggregator/src/index.ts @@ -16,11 +16,29 @@ */ import type { RecordsJob } from "./env.js"; +import { RECORDS_DO_NAME } from "./records-do.js"; export { RecordsJetstreamDO } from "./records-do.js"; +/** + * Operational bootstrap route. Hitting `/_admin/start` once after deploy + * spins up the Records DO, which opens its outbound WebSocket and starts + * ingesting. The DO's WebSocket keeps it alive thereafter; this route is + * idempotent — calling it on an already-running DO just returns its current + * status. Recommended deploy hook: + * + * wrangler deploy && curl https://api.emdashcms.com/_admin/start + */ +const BOOTSTRAP_PATH = "/_admin/start"; + export default { - async fetch(_request: Request, _env: Env, _ctx: ExecutionContext): Promise { + async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { + const url = new URL(request.url); + if (url.pathname === BOOTSTRAP_PATH) { + return bootstrapRecordsDo(env); + } + // Suppress unused-arg lint until the XRPC routes land. + void ctx; return new Response("emdash-aggregator: not yet implemented", { status: 503, headers: { "content-type": "text/plain" }, @@ -32,6 +50,12 @@ export default { }, async scheduled(_event: ScheduledEvent, _env: Env, _ctx: ExecutionContext): Promise { - // 6h reconciliation pass will land here. + // Reconciliation pass against publisher PDSes will land here. }, }; + +async function bootstrapRecordsDo(env: Env): Promise { + const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); + const stub = env.RECORDS_DO.get(id); + return stub.fetch("https://do.internal/bootstrap"); +} diff --git a/apps/aggregator/src/jetstream-client.ts b/apps/aggregator/src/jetstream-client.ts new file mode 100644 index 000000000..d88841e56 --- /dev/null +++ b/apps/aggregator/src/jetstream-client.ts @@ -0,0 +1,112 @@ +/** + * Jetstream client abstraction. + * + * Production wraps `@atcute/jetstream`'s `JetstreamSubscription`. Tests bind + * `MockJetstream` from `@emdash-cms/atproto-test-utils`. The ingestor only + * depends on this interface, so the same code path runs in both worlds. + * + * The shape mirrors the subset of `JetstreamSubscription` we actually use: + * - async-iterable of commit events (we don't process identity/account + * events today), + * - a `cursor` getter exposing the time_us of the most recent event the + * iterator has yielded — used to persist the cursor for reconnection, + * - an explicit close. + * + * Open question we may revisit: real Jetstream emits identity + account + * events alongside commits. The ingestor narrows to commits today; if we + * grow to care about identity events for handle changes, widen the event + * type here and update the consumer. + */ + +import { JetstreamSubscription } from "@atcute/jetstream"; + +export interface JetstreamCommitEvent { + did: `did:${string}:${string}`; + time_us: number; + kind: "commit"; + commit: + | { + rev: string; + collection: string; + rkey: string; + operation: "create" | "update"; + cid: string; + record: Record; + } + | { + rev: string; + collection: string; + rkey: string; + operation: "delete"; + }; +} + +export interface JetstreamSubscribeOptions { + wantedCollections: readonly string[]; + cursor?: number; +} + +export interface JetstreamSubscriptionHandle extends AsyncIterable { + readonly cursor: number; + close(): void; +} + +export interface JetstreamClient { + subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle; +} + +/** + * Production client backed by `@atcute/jetstream`. Filters non-commit events + * before yielding, so the ingestor doesn't have to switch on `kind` every + * iteration. + */ +export class RealJetstreamClient implements JetstreamClient { + constructor(private readonly url: string) {} + + subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle { + const sub = new JetstreamSubscription({ + url: this.url, + wantedCollections: [...opts.wantedCollections], + ...(opts.cursor !== undefined ? { cursor: opts.cursor } : {}), + }); + return wrapAtcuteSubscription(sub); + } +} + +function wrapAtcuteSubscription(sub: JetstreamSubscription): JetstreamSubscriptionHandle { + let closed = false; + return { + get cursor() { + return sub.cursor; + }, + close: () => { + closed = true; + // `@atcute/jetstream`'s subscription doesn't expose an explicit + // close — closing the iterator drops the WebSocket. We rely on + // the iterator's `return()` being invoked when the consumer + // stops awaiting; nothing to do here. + }, + [Symbol.asyncIterator](): AsyncIterator { + const inner = sub[Symbol.asyncIterator](); + return { + async next(): Promise> { + while (!closed) { + const result = await inner.next(); + if (result.done) return { value: undefined, done: true }; + const event = result.value; + if (event.kind === "commit") { + return { value: event as JetstreamCommitEvent, done: false }; + } + // Skip identity/account events; loop until next commit. + } + return { value: undefined, done: true }; + }, + async return(): Promise> { + closed = true; + await inner.return?.(); + return { value: undefined, done: true }; + }, + }; + }, + }; +} diff --git a/apps/aggregator/src/jetstream-ingestor.ts b/apps/aggregator/src/jetstream-ingestor.ts new file mode 100644 index 000000000..28f4eeece --- /dev/null +++ b/apps/aggregator/src/jetstream-ingestor.ts @@ -0,0 +1,215 @@ +/** + * Jetstream ingestor. Subscribes to a Jetstream client, converts commit + * events into `RecordsJob` messages, enqueues them, and persists the + * cursor so reconnects resume cleanly. + * + * Owns: + * - Connection lifecycle (connect → consume → disconnect → backoff → + * reconnect, indefinitely until `stop()` is called). + * - Cursor persistence after each successful enqueue. Persisting AFTER + * enqueue means a crash can replay the most recent event; downstream + * ingest is idempotent (DO NOTHING on duplicate releases) so this is + * safe and strictly better than the alternative of skipping events. + * - Exponential backoff with jitter, capped, reset on each successful + * event. + * + * Pure constructor injection — no DO/D1/Queue infrastructure imports — so + * unit tests instantiate it directly with `MockJetstream` + an in-memory + * queue + a `Map`-backed storage. + */ + +import { WANTED_COLLECTIONS } from "./constants.js"; +import type { RecordsJob } from "./env.js"; +import type { + JetstreamClient, + JetstreamCommitEvent, + JetstreamSubscriptionHandle, +} from "./jetstream-client.js"; + +const CURSOR_STORAGE_KEY = "jetstream:cursor"; + +/** + * Subset of `Queue.send` we use. Return type is loose because workerd's + * `Queue.send` resolves to `QueueSendResponse` while a hand-rolled + * in-memory test queue resolves to `void` — neither piece of metadata + * matters to the ingestor. + */ +export interface JobQueue { + send(job: RecordsJob): Promise; +} + +/** Subset of DurableObjectStorage we use. Tests pass a Map-backed shim. */ +export interface IngestorStorage { + get(key: string): Promise; + put(key: string, value: number): Promise; +} + +export interface IngestorBackoffConfig { + /** Initial delay after the first disconnect (ms). Default 1s. */ + initialDelayMs?: number; + /** Cap (ms). Default 60s. */ + maxDelayMs?: number; + /** Multiplier per retry. Default 2. */ + multiplier?: number; + /** ±jitter as a fraction of the delay. Default 0.2 (±20%). 0 disables. */ + jitter?: number; +} + +export interface IngestorLogger { + info?(msg: string, ctx?: Record): void; + warn?(msg: string, ctx?: Record): void; + error?(msg: string, ctx?: Record): void; +} + +export interface JetstreamIngestorOptions { + client: JetstreamClient; + queue: JobQueue; + storage: IngestorStorage; + /** Defaults to the protocol-level WANTED_COLLECTIONS constant. */ + wantedCollections?: readonly string[]; + backoff?: IngestorBackoffConfig; + logger?: IngestorLogger; + /** Sleep impl, swap in tests to skip real backoff waits. */ + sleep?: (ms: number) => Promise; + /** Random source for jitter, swap in tests for determinism. */ + random?: () => number; +} + +const DEFAULT_BACKOFF: Required = { + initialDelayMs: 1_000, + maxDelayMs: 60_000, + multiplier: 2, + jitter: 0.2, +}; + +export class JetstreamIngestor { + private readonly client: JetstreamClient; + private readonly queue: JobQueue; + private readonly storage: IngestorStorage; + private readonly wantedCollections: readonly string[]; + private readonly backoff: Required; + private readonly logger: IngestorLogger; + private readonly sleep: (ms: number) => Promise; + private readonly random: () => number; + + private stopped = false; + private currentSub: JetstreamSubscriptionHandle | null = null; + private cursor: number | null = null; + /** Set on every successful enqueue. The reconnect loop resets the + * backoff counter when this is true at the start of a new attempt, so a + * subscription that connects, consumes events, and then drops doesn't + * spiral into ever-larger backoffs. */ + private madeProgress = false; + + constructor(opts: JetstreamIngestorOptions) { + this.client = opts.client; + this.queue = opts.queue; + this.storage = opts.storage; + this.wantedCollections = opts.wantedCollections ?? WANTED_COLLECTIONS; + this.backoff = { ...DEFAULT_BACKOFF, ...opts.backoff }; + this.logger = opts.logger ?? {}; + this.sleep = opts.sleep ?? defaultSleep; + this.random = opts.random ?? Math.random; + } + + /** The cursor most recently enqueued + persisted. `null` until the first event. */ + get currentCursor(): number | null { + return this.cursor; + } + + /** + * Run the connect-consume-reconnect loop until `stop()` is called. + * Resolves when `stop()` returns; rejects only if a non-recoverable + * error escapes the loop (today: queue.send failures bubble up, since a + * silently-dropped event would corrupt the index). + */ + async run(): Promise { + this.cursor = (await this.storage.get(CURSOR_STORAGE_KEY)) ?? null; + let consecutiveFailures = 0; + + while (!this.stopped) { + this.madeProgress = false; + try { + await this.connectAndConsume(); + // Subscription ended cleanly (Jetstream closed the socket + // without error). Treat as a soft failure for backoff + // purposes — but if we successfully consumed events during + // the connection, reset the counter first so the backoff + // reflects the latest streak, not historical failures. + if (this.madeProgress) consecutiveFailures = 0; + consecutiveFailures += 1; + } catch (err) { + if (this.madeProgress) consecutiveFailures = 0; + consecutiveFailures += 1; + this.logger.warn?.("jetstream subscription failed", { + error: err instanceof Error ? err.message : String(err), + consecutiveFailures, + }); + } + if (this.stopped) break; + await this.sleep(this.computeBackoff(consecutiveFailures)); + } + } + + stop(): void { + this.stopped = true; + this.currentSub?.close(); + } + + private async connectAndConsume(): Promise { + const sub = this.client.subscribe({ + wantedCollections: this.wantedCollections, + ...(this.cursor !== null ? { cursor: this.cursor } : {}), + }); + this.currentSub = sub; + try { + for await (const event of sub) { + if (this.stopped) break; + await this.handleEvent(event); + } + } finally { + sub.close(); + if (this.currentSub === sub) this.currentSub = null; + } + } + + private async handleEvent(event: JetstreamCommitEvent): Promise { + // Defence in depth: Jetstream filters server-side, but a future + // subscription change or a malicious relay could deliver something + // off-list. Trust nothing. + if (!this.wantedCollections.includes(event.commit.collection)) return; + + const job: RecordsJob = { + did: event.did, + collection: event.commit.collection, + rkey: event.commit.rkey, + operation: event.commit.operation, + cid: event.commit.operation === "delete" ? "" : event.commit.cid, + ...(event.commit.operation !== "delete" ? { jetstreamRecord: event.commit.record } : {}), + }; + + await this.queue.send(job); + // Persist cursor only after the queue has accepted the message. + // A crash between enqueue and persist replays the latest event on + // recovery; the consumer's idempotency rules (DO NOTHING on + // duplicate releases, upsert on profiles) absorb the duplicate. + this.cursor = event.time_us; + await this.storage.put(CURSOR_STORAGE_KEY, event.time_us); + this.madeProgress = true; + } + + private computeBackoff(failures: number): number { + const exp = Math.min( + this.backoff.initialDelayMs * this.backoff.multiplier ** (failures - 1), + this.backoff.maxDelayMs, + ); + if (this.backoff.jitter <= 0) return exp; + const range = exp * this.backoff.jitter; + const offset = (this.random() * 2 - 1) * range; + return Math.max(0, Math.round(exp + offset)); + } +} + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/apps/aggregator/src/records-do.ts b/apps/aggregator/src/records-do.ts index 7a3f8816c..2e1880add 100644 --- a/apps/aggregator/src/records-do.ts +++ b/apps/aggregator/src/records-do.ts @@ -3,22 +3,75 @@ * filters our experimental package collections, and enqueues verification * jobs onto the Records Queue. * - * Why a DO at all: outbound WebSockets stay open across requests, but a Worker - * isolate doesn't. A single DO instance keeps the Jetstream connection alive + * Why a DO at all: outbound WebSockets stay open across requests, but a + * Worker isolate doesn't. A single DO instance keeps the connection alive * continuously. The Hibernation API doesn't apply here — it's server-side * only, and our connection is outbound. * - * The class skeleton lands first so the wrangler.jsonc binding resolves; the - * connection + cursor persistence + reconnect backoff logic fills in next. + * The DO is thin by design — all the loop / cursor / backoff logic lives in + * `JetstreamIngestor`. The DO just wires real bindings into the ingestor; + * its `fetch` handler returns the current ingestor status for the + * `/_admin/start` bootstrap path and any later admin/status surface. */ import { DurableObject } from "cloudflare:workers"; +import { RealJetstreamClient } from "./jetstream-client.js"; +import { JetstreamIngestor, type IngestorStorage } from "./jetstream-ingestor.js"; + +/** Singleton DO ID. There's exactly one ingestor per deployment. */ +export const RECORDS_DO_NAME = "main"; + export class RecordsJetstreamDO extends DurableObject { + private readonly ingestor: JetstreamIngestor; + /** Held so the run loop isn't garbage-collected. We never await it + * outside `stop()` — `run()` only resolves when `stop()` is called. */ + private readonly runPromise: Promise; + + constructor(state: DurableObjectState, env: Env) { + super(state, env); + this.ingestor = new JetstreamIngestor({ + client: new RealJetstreamClient(env.JETSTREAM_URL), + queue: env.RECORDS_QUEUE, + storage: wrapDoStorage(state.storage), + }); + // Fire-and-forget. Run loop is meant to live for the DO's lifetime; + // errors that escape it indicate a non-recoverable bug we want to + // see in logs. + this.runPromise = this.ingestor.run().catch((err) => { + console.error("[aggregator] jetstream ingestor crashed", err); + }); + } + + /** + * Cron-driven liveness ping. The DO instance is created on first call + * (which kicks off the constructor and the run loop) and stays warm as + * long as the WebSocket is open. Subsequent pings are no-ops aside from + * exercising the ingestor's current cursor as health output. + */ override async fetch(_request: Request): Promise { - // Internal admin/debug surface (status, force-reconnect) will land here. - // For now the DO has no surface — instantiation alone is enough to keep - // the Jetstream connection alive once the connection logic is in place. - return new Response("not implemented", { status: 501 }); + return Response.json({ + status: "running", + cursor: this.ingestor.currentCursor, + }); } } + +/** + * Adapt the workerd `DurableObjectStorage` (Promise-based key/value with + * unknown values) to the narrow `IngestorStorage` shape (string→number). + * Keeping the adaptation here means the ingestor stays free of workerd + * imports and the DO is the only place that needs to know about storage's + * type-erasure. + */ +function wrapDoStorage(storage: DurableObjectStorage): IngestorStorage { + return { + async get(key: string): Promise { + const value = await storage.get(key); + return typeof value === "number" ? value : undefined; + }, + async put(key: string, value: number): Promise { + await storage.put(key, value); + }, + }; +} diff --git a/apps/aggregator/test/jetstream-ingestor.test.ts b/apps/aggregator/test/jetstream-ingestor.test.ts new file mode 100644 index 000000000..6358e0f20 --- /dev/null +++ b/apps/aggregator/test/jetstream-ingestor.test.ts @@ -0,0 +1,350 @@ +/** + * JetstreamIngestor unit tests. + * + * Drives the ingestor against MockJetstream, an in-memory queue, and a + * Map-backed storage. No DO/D1/Queue runtime needed; the only Cloudflare + * binding the ingestor depends on is "something with a send method", which + * the mock queue trivially satisfies. + * + * Tests cover the ingestor's whole contract: event-to-job conversion, + * cursor persistence, reconnect with backoff, stop semantics. Each one + * pins a behaviour the production DO needs to honour; if a future change + * regresses any of them the failure surfaces here, not in production. + */ + +// Subpath imports avoid pulling in `@atproto/repo` (Node-crypto only) which +// the package's main entry transitively re-exports via FakeRepo. workerd +// can't load that, but we don't need it here — only MockJetstream + the +// NSID constants. +import { MockJetstream } from "@emdash-cms/atproto-test-utils/jetstream"; +import { PROFILE_NSID, RELEASE_NSID } from "@emdash-cms/atproto-test-utils/nsid"; +import { describe, expect, it } from "vitest"; + +import type { RecordsJob } from "../src/env.js"; +import type { + JetstreamClient, + JetstreamSubscribeOptions, + JetstreamSubscriptionHandle, +} from "../src/jetstream-client.js"; +import { + JetstreamIngestor, + type IngestorStorage, + type JobQueue, +} from "../src/jetstream-ingestor.js"; + +const TEST_DID = "did:plc:test00000000000000000000"; + +class InMemoryQueue implements JobQueue { + readonly jobs: RecordsJob[] = []; + send(job: RecordsJob): Promise { + this.jobs.push(job); + return Promise.resolve(); + } +} + +class MapStorage implements IngestorStorage { + private readonly map = new Map(); + get(key: string): Promise { + return Promise.resolve(this.map.get(key)); + } + put(key: string, value: number): Promise { + this.map.set(key, value); + return Promise.resolve(); + } +} + +/** + * Adapter that turns a MockJetstream into the JetstreamClient interface + * the ingestor uses. MockJetstream's subscribe() already returns an + * AsyncIterable with a cursor + close, so this is a thin pass-through — + * its only job is shaping the type. + */ +class MockJetstreamClient implements JetstreamClient { + constructor(private readonly stream: MockJetstream) {} + subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle { + return this.stream.subscribe({ + wantedCollections: [...opts.wantedCollections], + ...(opts.cursor !== undefined ? { cursor: opts.cursor } : {}), + }); + } +} + +interface Harness { + stream: MockJetstream; + queue: InMemoryQueue; + storage: MapStorage; + ingestor: JetstreamIngestor; + runPromise: Promise; +} + +function buildHarness(opts: { wantedCollections?: readonly string[] } = {}): Harness { + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: opts.wantedCollections ?? [PROFILE_NSID, RELEASE_NSID], + // Tight backoff so tests don't sit in real timers; jitter off for + // deterministic assertions. + backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 }, + sleep: () => Promise.resolve(), + }); + const runPromise = ingestor.run(); + return { stream, queue, storage, ingestor, runPromise }; +} + +/** Wait until the predicate returns true or the test times out. Polls + * the microtask queue rather than wall-clock; the ingestor reads events + * eagerly inside an async iterator, so a small loop is enough. */ +async function waitFor(predicate: () => boolean, label: string, attempts = 200): Promise { + for (let i = 0; i < attempts; i++) { + if (predicate()) return; + await Promise.resolve(); + await new Promise((r) => setTimeout(r, 0)); + } + throw new Error(`waitFor timed out: ${label}`); +} + +describe("JetstreamIngestor", () => { + it("converts a commit create event into a RecordsJob and enqueues it", async () => { + const h = buildHarness(); + const event = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + cid: "bafyrecord", + record: { slug: "p", license: "MIT" }, + }); + + await waitFor(() => h.queue.jobs.length === 1, "first job enqueued"); + + expect(h.queue.jobs[0]).toEqual({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + operation: "create", + cid: "bafyrecord", + jetstreamRecord: { slug: "p", license: "MIT" }, + }); + expect(h.ingestor.currentCursor).toBe(event.time_us); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("persists cursor to storage after each successful enqueue", async () => { + const h = buildHarness(); + const e1 = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "a", + }); + const e2 = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "b", + }); + + await waitFor(() => h.queue.jobs.length === 2, "both jobs enqueued"); + + expect(await h.storage.get("jetstream:cursor")).toBe(e2.time_us); + expect(e2.time_us).toBeGreaterThan(e1.time_us); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("resumes from the persisted cursor on a fresh ingestor", async () => { + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const earlier = stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "earlier", + }); + const later = stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "later", + }); + + // Pretend a previous run consumed the earlier event. + await storage.put("jetstream:cursor", earlier.time_us); + + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID, RELEASE_NSID], + backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 }, + sleep: () => Promise.resolve(), + }); + const runPromise = ingestor.run(); + + await waitFor(() => queue.jobs.length === 1, "later event enqueued"); + + expect(queue.jobs).toHaveLength(1); + expect(queue.jobs[0]?.rkey).toBe("later"); + expect(ingestor.currentCursor).toBe(later.time_us); + + ingestor.stop(); + await runPromise; + }); + + it("handles delete operations (no record body, empty cid)", async () => { + const h = buildHarness(); + h.stream.emit({ + did: TEST_DID, + time_us: Date.now() * 1000, + kind: "commit", + commit: { + rev: "rev-del", + collection: PROFILE_NSID, + rkey: "p", + operation: "delete", + }, + }); + + await waitFor(() => h.queue.jobs.length === 1, "delete job enqueued"); + + expect(h.queue.jobs[0]).toEqual({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + operation: "delete", + cid: "", + }); + expect(h.queue.jobs[0]?.jetstreamRecord).toBeUndefined(); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("filters events outside wantedCollections (defence in depth)", async () => { + // Real Jetstream filters server-side, but a malicious or buggy relay + // could send something off-list. The ingestor must not enqueue it. + const h = buildHarness({ wantedCollections: [PROFILE_NSID] }); + h.stream.emitCommit({ + did: TEST_DID, + collection: RELEASE_NSID, // not in wantedCollections + rkey: "ignored:1.0.0", + }); + h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "kept", + }); + + await waitFor(() => h.queue.jobs.length === 1, "filtered job enqueued"); + + expect(h.queue.jobs).toHaveLength(1); + expect(h.queue.jobs[0]?.rkey).toBe("kept"); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("stop() ends the run loop cleanly", async () => { + const h = buildHarness(); + h.stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "p" }); + await waitFor(() => h.queue.jobs.length === 1, "first job"); + + h.ingestor.stop(); + await expect(h.runPromise).resolves.toBeUndefined(); + }); + + it("resets backoff after a successful event, even across reconnects", async () => { + // Without a reset, a subscription that disconnects → reconnects → + // processes an event → disconnects again would back off as if the + // failures were continuous. The "made progress" flag should reset + // the counter so each disconnect that consumed events starts fresh + // from the initial delay. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const sleeps: number[] = []; + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 10, maxDelayMs: 1000, multiplier: 10, jitter: 0 }, + sleep: (ms) => { + sleeps.push(ms); + return Promise.resolve(); + }, + }); + const runPromise = ingestor.run(); + + // Cycle 1: emit event, close. After backoff this should still be + // the initial delay because we made progress. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "a" }); + await waitFor(() => queue.jobs.length === 1, "first job"); + stream.closeAll(); + await waitFor(() => sleeps.length >= 1, "first backoff"); + + // Cycle 2: emit another event, close. Backoff should still be the + // initial delay (10ms), not 100ms (10×10), because progress in + // between resets the counter. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "b" }); + await waitFor(() => queue.jobs.length === 2, "second job", 500); + stream.closeAll(); + await waitFor(() => sleeps.length >= 2, "second backoff", 500); + + expect(sleeps[0]).toBe(10); + expect(sleeps[1]).toBe(10); + + ingestor.stop(); + await runPromise; + }); + + it("computes exponential backoff with cap, no jitter for determinism", async () => { + // Direct unit on the backoff calc via stop()/restart of subscription. + // The straightforward way: drive the stream, close the subscription + // from underneath the ingestor (simulating a Jetstream disconnect), + // observe the sleep delays the ingestor passes to our injected + // `sleep`. Without driving real time we can't easily probe — instead + // verify the run loop survives a series of forced disconnects and + // keeps consuming after each. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const sleeps: number[] = []; + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 10, maxDelayMs: 80, multiplier: 2, jitter: 0 }, + sleep: (ms) => { + sleeps.push(ms); + return Promise.resolve(); + }, + }); + const runPromise = ingestor.run(); + + // Emit one event so the subscription has work, then close it from the + // MockJetstream side to simulate a server disconnect. The ingestor's + // loop sees the iterator end, sleeps with backoff, reconnects. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "a" }); + await waitFor(() => queue.jobs.length === 1, "first job"); + stream.closeAll(); + + // After the disconnect, post a second event so the new subscription + // has something to consume. Wait until it lands. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "b" }); + await waitFor(() => queue.jobs.length === 2, "post-reconnect job", 500); + + // At least one backoff sleep happened between the disconnect and + // the next successful subscription. + expect(sleeps.length).toBeGreaterThanOrEqual(1); + expect(sleeps[0]).toBeGreaterThanOrEqual(10); + expect(sleeps[0]).toBeLessThanOrEqual(80); + + ingestor.stop(); + await runPromise; + }); +}); diff --git a/packages/atproto-test-utils/package.json b/packages/atproto-test-utils/package.json index 4855e97cd..0ed0b882c 100644 --- a/packages/atproto-test-utils/package.json +++ b/packages/atproto-test-utils/package.json @@ -6,7 +6,9 @@ "type": "module", "main": "src/index.ts", "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./jetstream": "./src/mock-jetstream.ts", + "./nsid": "./src/nsid.ts" }, "scripts": { "typecheck": "tsgo --noEmit", diff --git a/packages/atproto-test-utils/src/fake-publisher.ts b/packages/atproto-test-utils/src/fake-publisher.ts index 562388e5c..656f0baf8 100644 --- a/packages/atproto-test-utils/src/fake-publisher.ts +++ b/packages/atproto-test-utils/src/fake-publisher.ts @@ -24,10 +24,10 @@ const DID_KEY_PREFIX_RE = /^did:key:/; import { MockDidResolver, buildDidDocument } from "./mock-did-resolver.js"; import { MockJetstream } from "./mock-jetstream.js"; import { MockPds } from "./mock-pds.js"; +import { PROFILE_NSID, RELEASE_NSID } from "./nsid.js"; import type { AtprotoDid } from "./types.js"; -export const PROFILE_NSID = "com.emdashcms.experimental.package.profile"; -export const RELEASE_NSID = "com.emdashcms.experimental.package.release"; +export { PROFILE_NSID, RELEASE_NSID } from "./nsid.js"; export interface FakePublisherOptions extends FakeRepoOptions { handle?: string; diff --git a/packages/atproto-test-utils/src/nsid.ts b/packages/atproto-test-utils/src/nsid.ts new file mode 100644 index 000000000..53c0ad34c --- /dev/null +++ b/packages/atproto-test-utils/src/nsid.ts @@ -0,0 +1,8 @@ +/** + * NSID constants. Split from `fake-publisher.ts` so they can be imported in + * Worker test contexts (workerd) without pulling in `@atproto/repo` and its + * Node-crypto dependencies. + */ + +export const PROFILE_NSID = "com.emdashcms.experimental.package.profile"; +export const RELEASE_NSID = "com.emdashcms.experimental.package.release"; From 9c528cd3efbe581bbf4bb4bdc3b72f9e52f88f9b Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sat, 9 May 2026 16:19:38 +0100 Subject: [PATCH 2/4] fix(aggregator): adversarial review fixes for Jetstream DO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review found 3 critical and 2 important issues; fixing all 5 plus two related correctness items. Critical: 1. RealJetstreamClient.close() was a no-op — it flipped a local flag the iterator only checked AFTER `inner.next()` resolved. A quiescent stream (no events arriving) would hang `stop()` indefinitely. Tests passed only because MockJetstream actively resolves pending awaiters on close; production EventIterator does not. Fix: hoist the inner iterator outside the factory so close() can call `inner.return()`, which destroys the WebSocket and resolves any pending next() to `{done: true}`. The loop's `closed` flag is now redundant — removed along with the `while (!closed)` (was a lint false-positive trigger anyway). 2. C2 regression test (jetstream-client.test.ts): drives the wrapper against a stub subscription whose next() only resolves when return() is called. Asserts that calling close() on the wrapped handle terminates the for-await within 100ms. Without the C1 fix, this test times out. 3. DO eviction during long Jetstream outage: during 60s backoff sleeps the DO has no active WebSocket, so CF can evict it. Nothing was waking it back up. Switch the cron from 6h to */5min and have scheduled() ping the DO via its stub.fetch — the DO's constructor resumes the ingestor from the persisted cursor on reinstantiation. Important: 4. run() doc claimed queue.send failures bubble; the catch block actually absorbed them. Updated the doc to match reality and explain the choice (transient queue failures retry; the cron liveness pump recovers a wedged DO either way). 5. Fire-and-forget runPromise.catch silently masked ingestor death. Track `state: "running" | "crashed"` on the DO, set "crashed" on uncaught run() rejection, expose via fetch handler so external monitoring can see it. Worth fixing: 6. Cleaner JetstreamClient input typing. `wrapAtcuteSubscription` now takes a generic `RawJetstreamSubscription` — both the real JetstreamSubscription and test stubs satisfy it without casts. 7. computeBackoff: defensive Math.max floor so a future caller passing failures=0 doesn't fall below initialDelayMs. 8. madeProgress reset moved from run-loop iteration top to connectAndConsume() top — the flag's lifetime now matches one connection attempt exactly. Tests: 13 (was 8 in the previous PR 2 commit; +2 wrapper tests, +3 backoff/cursor regression tests already present). 0 lint, 0 typecheck. --- apps/aggregator/src/index.ts | 12 +- apps/aggregator/src/jetstream-client.ts | 54 +++++--- apps/aggregator/src/jetstream-ingestor.ts | 47 +++++-- apps/aggregator/src/records-do.ts | 22 ++-- apps/aggregator/test/jetstream-client.test.ts | 117 ++++++++++++++++++ apps/aggregator/wrangler.jsonc | 11 +- 6 files changed, 223 insertions(+), 40 deletions(-) create mode 100644 apps/aggregator/test/jetstream-client.test.ts diff --git a/apps/aggregator/src/index.ts b/apps/aggregator/src/index.ts index 20493e3e0..665bcb134 100644 --- a/apps/aggregator/src/index.ts +++ b/apps/aggregator/src/index.ts @@ -49,8 +49,16 @@ export default { // PDS-verified ingest will land here. }, - async scheduled(_event: ScheduledEvent, _env: Env, _ctx: ExecutionContext): Promise { - // Reconciliation pass against publisher PDSes will land here. + async scheduled(_event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise { + // DO liveness. The records DO is meant to hold an outbound WebSocket + // continuously, but during a Jetstream outage it spends time in + // backoff sleeps — that's when CF can evict it. Hitting the DO from + // the cron wakes it back up; constructor-time `ingestor.run()` + // resumes from the persisted cursor. Reconciliation work will share + // this trigger when it lands. + const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); + const stub = env.RECORDS_DO.get(id); + ctx.waitUntil(stub.fetch("https://do.internal/liveness")); }, }; diff --git a/apps/aggregator/src/jetstream-client.ts b/apps/aggregator/src/jetstream-client.ts index d88841e56..512f0c999 100644 --- a/apps/aggregator/src/jetstream-client.ts +++ b/apps/aggregator/src/jetstream-client.ts @@ -73,37 +73,63 @@ export class RealJetstreamClient implements JetstreamClient { } } -function wrapAtcuteSubscription(sub: JetstreamSubscription): JetstreamSubscriptionHandle { - let closed = false; +/** + * Minimum shape `wrapAtcuteSubscription` needs from its input: a `cursor` + * getter and an async iterable of events with a `kind` discriminator. Both + * `JetstreamSubscription` (production) and a stub-with-never-resolving-next + * (the C2 regression test) satisfy this without casts. + */ +export interface RawJetstreamSubscription extends AsyncIterable { + readonly cursor: number; +} + +/** + * Exported so tests can drive the wrapper against a stub subscription with a + * never-resolving `next()` and verify that `close()` actually cancels the + * pending await. Production callers should construct via `RealJetstreamClient`. + */ +export function wrapAtcuteSubscription( + sub: RawJetstreamSubscription, +): JetstreamSubscriptionHandle { + // Hoist the inner iterator so `close()` can call `inner.return()` from + // outside the iterator factory. `EventIterator.return()` is what wakes a + // wedged for-await — it destroys the WebSocket and resolves any pending + // `next()` to `{ done: true }`. Without the hoist, `close()` would have + // no way to reach the inner iterator, and a quiescent stream (no events + // arriving) would block the consumer indefinitely. + let inner: AsyncIterator | null = null; return { get cursor() { return sub.cursor; }, close: () => { - closed = true; - // `@atcute/jetstream`'s subscription doesn't expose an explicit - // close — closing the iterator drops the WebSocket. We rely on - // the iterator's `return()` being invoked when the consumer - // stops awaiting; nothing to do here. + void inner?.return?.(); }, [Symbol.asyncIterator](): AsyncIterator { - const inner = sub[Symbol.asyncIterator](); + inner ??= sub[Symbol.asyncIterator](); + const it = inner; return { async next(): Promise> { - while (!closed) { - const result = await inner.next(); + // Loop until either the inner iterator ends or we yield a + // commit. Shutdown is signalled exclusively via the inner + // iterator returning `done: true` (triggered by an + // external `close()` call or by Jetstream itself ending + // the stream). + for (;;) { + const result = await it.next(); if (result.done) return { value: undefined, done: true }; const event = result.value; if (event.kind === "commit") { - return { value: event as JetstreamCommitEvent, done: false }; + // Cast within the function: by `kind === "commit"` we + // know the event is a commit; the generic `E` is too + // wide for the compiler to narrow automatically. + return { value: event as unknown as JetstreamCommitEvent, done: false }; } // Skip identity/account events; loop until next commit. } - return { value: undefined, done: true }; }, async return(): Promise> { - closed = true; - await inner.return?.(); + await it.return?.(); return { value: undefined, done: true }; }, }; diff --git a/apps/aggregator/src/jetstream-ingestor.ts b/apps/aggregator/src/jetstream-ingestor.ts index 28f4eeece..7148c6594 100644 --- a/apps/aggregator/src/jetstream-ingestor.ts +++ b/apps/aggregator/src/jetstream-ingestor.ts @@ -119,16 +119,20 @@ export class JetstreamIngestor { /** * Run the connect-consume-reconnect loop until `stop()` is called. - * Resolves when `stop()` returns; rejects only if a non-recoverable - * error escapes the loop (today: queue.send failures bubble up, since a - * silently-dropped event would corrupt the index). + * + * Resolves when `stop()` is called and the current subscription drains. + * Does NOT reject for transient failures — connection drops, parse + * errors, queue.send rejections all increment the backoff counter and + * retry. The DO observes liveness via the `currentCursor` getter and + * the failure counter exposed on the ingestor. We could instead bubble + * queue failures and let the DO crash loud; current choice is to + * absorb them because Cloudflare Queues have transient failures and + * the cron liveness ping recovers the DO either way. */ async run(): Promise { this.cursor = (await this.storage.get(CURSOR_STORAGE_KEY)) ?? null; - let consecutiveFailures = 0; while (!this.stopped) { - this.madeProgress = false; try { await this.connectAndConsume(); // Subscription ended cleanly (Jetstream closed the socket @@ -136,27 +140,39 @@ export class JetstreamIngestor { // purposes — but if we successfully consumed events during // the connection, reset the counter first so the backoff // reflects the latest streak, not historical failures. - if (this.madeProgress) consecutiveFailures = 0; - consecutiveFailures += 1; + if (this.madeProgress) this._consecutiveFailures = 0; + this._consecutiveFailures += 1; } catch (err) { - if (this.madeProgress) consecutiveFailures = 0; - consecutiveFailures += 1; + if (this.madeProgress) this._consecutiveFailures = 0; + this._consecutiveFailures += 1; this.logger.warn?.("jetstream subscription failed", { error: err instanceof Error ? err.message : String(err), - consecutiveFailures, + consecutiveFailures: this._consecutiveFailures, }); } if (this.stopped) break; - await this.sleep(this.computeBackoff(consecutiveFailures)); + await this.sleep(this.computeBackoff(this._consecutiveFailures)); } } + /** Number of consecutive failed/empty connection attempts. Exposed for + * liveness probes; `0` means the most recent attempt produced events. */ + get consecutiveFailures(): number { + return this._consecutiveFailures; + } + private _consecutiveFailures = 0; + stop(): void { this.stopped = true; this.currentSub?.close(); } private async connectAndConsume(): Promise { + // Tied to one connection attempt: set true when we actually enqueue + // an event, read by the run loop to decide whether to reset + // backoff. Resetting per-attempt (rather than per-loop-iteration at + // the top of run()) keeps the flag's lifetime crisp. + this.madeProgress = false; const sub = this.client.subscribe({ wantedCollections: this.wantedCollections, ...(this.cursor !== null ? { cursor: this.cursor } : {}), @@ -199,8 +215,15 @@ export class JetstreamIngestor { } private computeBackoff(failures: number): number { + // Defensive: `failures` is always >= 1 when called from the run loop + // (the increment happens before computeBackoff), but a future caller + // passing 0 would give `initialDelayMs / multiplier`, which is below + // the floor. Clamp explicitly. const exp = Math.min( - this.backoff.initialDelayMs * this.backoff.multiplier ** (failures - 1), + Math.max( + this.backoff.initialDelayMs, + this.backoff.initialDelayMs * this.backoff.multiplier ** (failures - 1), + ), this.backoff.maxDelayMs, ); if (this.backoff.jitter <= 0) return exp; diff --git a/apps/aggregator/src/records-do.ts b/apps/aggregator/src/records-do.ts index 2e1880add..2c3f3b25e 100644 --- a/apps/aggregator/src/records-do.ts +++ b/apps/aggregator/src/records-do.ts @@ -22,11 +22,13 @@ import { JetstreamIngestor, type IngestorStorage } from "./jetstream-ingestor.js /** Singleton DO ID. There's exactly one ingestor per deployment. */ export const RECORDS_DO_NAME = "main"; +type IngestorState = "running" | "crashed"; + export class RecordsJetstreamDO extends DurableObject { private readonly ingestor: JetstreamIngestor; - /** Held so the run loop isn't garbage-collected. We never await it - * outside `stop()` — `run()` only resolves when `stop()` is called. */ + /** Held so the run loop isn't garbage-collected. */ private readonly runPromise: Promise; + private state: IngestorState = "running"; constructor(state: DurableObjectState, env: Env) { super(state, env); @@ -36,23 +38,25 @@ export class RecordsJetstreamDO extends DurableObject { storage: wrapDoStorage(state.storage), }); // Fire-and-forget. Run loop is meant to live for the DO's lifetime; - // errors that escape it indicate a non-recoverable bug we want to - // see in logs. + // rejection means the loop hit a non-recoverable error (queue + // failures are absorbed inside `run()`). Surface that via the + // fetch handler so external monitoring can see it. this.runPromise = this.ingestor.run().catch((err) => { + this.state = "crashed"; console.error("[aggregator] jetstream ingestor crashed", err); }); } /** - * Cron-driven liveness ping. The DO instance is created on first call - * (which kicks off the constructor and the run loop) and stays warm as - * long as the WebSocket is open. Subsequent pings are no-ops aside from - * exercising the ingestor's current cursor as health output. + * Status surface used by `/_admin/start` (post-deploy bootstrap) and the + * 5-minute cron liveness pump. Idempotent — calling it on an + * already-running DO just reports the current cursor + ingestor state. */ override async fetch(_request: Request): Promise { return Response.json({ - status: "running", + status: this.state, cursor: this.ingestor.currentCursor, + consecutiveFailures: this.ingestor.consecutiveFailures, }); } } diff --git a/apps/aggregator/test/jetstream-client.test.ts b/apps/aggregator/test/jetstream-client.test.ts new file mode 100644 index 000000000..9bddca5aa --- /dev/null +++ b/apps/aggregator/test/jetstream-client.test.ts @@ -0,0 +1,117 @@ +/** + * `wrapAtcuteSubscription` regression test. + * + * The wrapper's `close()` MUST cancel a pending `for await` even when the + * underlying subscription is quiescent (no events arriving). MockJetstream + * actively resolves pending awaiters on close, so it can't catch a + * misbehaving production wrapper — this test pairs the wrapper with a stub + * whose `next()` never resolves, and asserts the for-await terminates within + * a small grace window after `close()`. + * + * Without the fix, the wrapper's `close()` flipped a flag the iterator only + * checked AFTER `inner.next()` resolved, so a quiescent stream would hang + * `stop()` indefinitely. This test failing means we've regressed there. + */ + +import { describe, expect, it } from "vitest"; + +import { wrapAtcuteSubscription, type RawJetstreamSubscription } from "../src/jetstream-client.js"; + +interface QuiescentEvent { + kind: string; +} + +/** Subscription stub whose `next()` only resolves when `return()` is called. + * Models a real WebSocket where Jetstream isn't currently emitting events. + */ +function quiescentSubscription(): RawJetstreamSubscription { + let resolveNext: (() => void) | null = null; + const innerIter: AsyncIterator = { + async next() { + await new Promise((resolve) => { + resolveNext = resolve; + }); + return { value: undefined, done: true }; + }, + async return() { + if (resolveNext) { + const r = resolveNext; + resolveNext = null; + r(); + } + return { value: undefined, done: true }; + }, + }; + return { + cursor: 0, + [Symbol.asyncIterator]: () => innerIter, + }; +} + +describe("wrapAtcuteSubscription", () => { + it("close() unblocks a for-await waiting on a quiescent subscription", async () => { + const sub = quiescentSubscription(); + const handle = wrapAtcuteSubscription(sub); + + // Start consuming on a background promise. The for-await will block + // on the first iter.next() because the underlying inner iterator + // only resolves when return() is called. + const consumed: QuiescentEvent[] = []; + const consumePromise = (async () => { + for await (const event of handle) { + consumed.push(event); + } + })(); + + // Yield a few microtasks so the consumer reaches the awaiting state. + await Promise.resolve(); + await Promise.resolve(); + await new Promise((r) => setTimeout(r, 0)); + + // The consumer should be wedged, not done. Use Promise.race to + // detect — if it's wedged, the timeout wins. + const beforeClose = await Promise.race([ + consumePromise.then(() => "done" as const), + new Promise<"pending">((r) => setTimeout(r, 10, "pending")), + ]); + expect(beforeClose).toBe("pending"); + + // close() must cancel the pending await. + handle.close(); + + // Now the consumer should resolve quickly. + await expect( + Promise.race([ + consumePromise.then(() => "done" as const), + new Promise<"timeout">((r) => setTimeout(r, 100, "timeout")), + ]), + ).resolves.toBe("done"); + + expect(consumed).toEqual([]); + }); + + it("filters non-commit events", async () => { + const events: Array<{ kind: string; commit?: { collection: string } }> = [ + { kind: "identity" }, + { kind: "commit", commit: { collection: "x" } }, + { kind: "account" }, + { kind: "commit", commit: { collection: "y" } }, + ]; + let i = 0; + const sub: RawJetstreamSubscription<(typeof events)[number]> = { + cursor: 0, + [Symbol.asyncIterator]: () => ({ + async next() { + if (i >= events.length) return { value: undefined, done: true }; + const value = events[i++]; + return { value: value as (typeof events)[number], done: false }; + }, + }), + }; + const handle = wrapAtcuteSubscription(sub); + const out: unknown[] = []; + for await (const event of handle) out.push(event); + expect(out).toHaveLength(2); + expect(out.every((e) => (e as { kind: string }).kind === "commit")).toBe(true); + }); +}); diff --git a/apps/aggregator/wrangler.jsonc b/apps/aggregator/wrangler.jsonc index 199306285..ff3716395 100644 --- a/apps/aggregator/wrangler.jsonc +++ b/apps/aggregator/wrangler.jsonc @@ -51,9 +51,14 @@ }, ], "triggers": { - // Reconciliation pass — listRecords per known DID, fix anything - // Jetstream missed. Every 6 hours per the plan. - "crons": ["0 */6 * * *"], + // Liveness ping for the records DO. The DO holds Jetstream open and + // stays alive while the WebSocket is up, but during a Jetstream + // outage it spends time in backoff sleeps with no active connection + // — that's when CF can evict it. The 5-minute cron wakes it back up + // quickly; constructor-time `ingestor.run()` resumes from the + // persisted cursor. Reconciliation work will share this trigger when + // it lands. + "crons": ["*/5 * * * *"], }, "vars": { // Jetstream endpoint. Override per environment for self-hosted relays From 81ae4f468a598cc074d449d8043d680e74116907 Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sat, 9 May 2026 16:31:24 +0100 Subject: [PATCH 3/4] =?UTF-8?q?fix(aggregator):=20C1=20fix=20was=20incompl?= =?UTF-8?q?ete=20=E2=80=94=20switch=20to=20closed-signal=20race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second adversarial review pass caught that the previous fix didn't actually unblock production. `@mary-ext/event-iterator`'s `return()` drops the resolver reference WITHOUT invoking it (lib/index.ts:55-67), so calling `inner.return()` on a quiescent EventIterator does NOT resolve the pending `next()` Promise. The first-pass test passed only because its stub explicitly resolved the awaiter — the production path still wedged. Real fix: race `inner.next()` against a `closedSignal` Promise. When `close()` is called, resolve the signal to a synthetic `{done: true}`, which unblocks the for-await regardless of the inner iterator's shutdown behaviour. The orphaned `it.next()` Promise is either resolved later (when an event arrives, harmless) or leaks forever (if Jetstream stays quiescent, but no resources are held — only the Promise object itself). Updated the regression test's stub to mimic EventIterator's actual behaviour: `next()` returns a Promise that NEVER resolves, even after `return()` is called. Test now fails against the previous fix and passes against the race-based one. Also dropped `state: "running" | "crashed"` from the DO. The same review noted it was dead code: `run()` swallows every error path internally, so `runPromise.catch` never fires, so `state` never flips. Removed the field entirely. The DO's fetch handler now reports just `cursor` and `consecutiveFailures`, which IS the real liveness signal. If we ever introduce a non-recoverable rejection path, we can add the state back with the same shape. --- apps/aggregator/src/jetstream-client.ts | 41 +++++++++++++------ apps/aggregator/src/records-do.ts | 19 ++++----- apps/aggregator/test/jetstream-client.test.ts | 30 ++++++++------ 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/apps/aggregator/src/jetstream-client.ts b/apps/aggregator/src/jetstream-client.ts index 512f0c999..99fb04f78 100644 --- a/apps/aggregator/src/jetstream-client.ts +++ b/apps/aggregator/src/jetstream-client.ts @@ -91,18 +91,36 @@ export interface RawJetstreamSubscription extends As export function wrapAtcuteSubscription( sub: RawJetstreamSubscription, ): JetstreamSubscriptionHandle { - // Hoist the inner iterator so `close()` can call `inner.return()` from - // outside the iterator factory. `EventIterator.return()` is what wakes a - // wedged for-await — it destroys the WebSocket and resolves any pending - // `next()` to `{ done: true }`. Without the hoist, `close()` would have - // no way to reach the inner iterator, and a quiescent stream (no events - // arriving) would block the consumer indefinitely. + // Hoist the inner iterator so `close()` can reach it from outside the + // iterator factory. let inner: AsyncIterator | null = null; + // Shutdown signal raced against `inner.next()`. We can't rely on + // `inner.return()` to unblock a pending `next()` — `@mary-ext/event-iterator` + // drops its resolver on `return()` without invoking it (lib/index.ts:55-67), + // so a quiescent stream's pending `next()` Promise leaks. Racing against + // `closedSignal` lets the consumer wake regardless of the inner iterator's + // behaviour. The orphaned `it.next()` Promise is one of: + // - resolved later when an event arrives (harmless, garbage-collected). + // - leaked forever if Jetstream stays quiescent (no value held; only + // the Promise object is GC-rooted by the inner iterator's #resolve). + let resolveClosed: (() => void) | null = null; + const closedSignal = new Promise((resolve) => { + resolveClosed = resolve; + }); + const fireClosed = () => { + if (resolveClosed) { + const r = resolveClosed; + resolveClosed = null; + r(); + } + }; + return { get cursor() { return sub.cursor; }, close: () => { + fireClosed(); void inner?.return?.(); }, [Symbol.asyncIterator](): AsyncIterator { @@ -110,13 +128,11 @@ export function wrapAtcuteSubscription( const it = inner; return { async next(): Promise> { - // Loop until either the inner iterator ends or we yield a - // commit. Shutdown is signalled exclusively via the inner - // iterator returning `done: true` (triggered by an - // external `close()` call or by Jetstream itself ending - // the stream). for (;;) { - const result = await it.next(); + const result = await Promise.race([ + it.next(), + closedSignal.then((): IteratorResult => ({ value: undefined, done: true })), + ]); if (result.done) return { value: undefined, done: true }; const event = result.value; if (event.kind === "commit") { @@ -129,6 +145,7 @@ export function wrapAtcuteSubscription( } }, async return(): Promise> { + fireClosed(); await it.return?.(); return { value: undefined, done: true }; }, diff --git a/apps/aggregator/src/records-do.ts b/apps/aggregator/src/records-do.ts index 2c3f3b25e..b7cd773d7 100644 --- a/apps/aggregator/src/records-do.ts +++ b/apps/aggregator/src/records-do.ts @@ -22,13 +22,10 @@ import { JetstreamIngestor, type IngestorStorage } from "./jetstream-ingestor.js /** Singleton DO ID. There's exactly one ingestor per deployment. */ export const RECORDS_DO_NAME = "main"; -type IngestorState = "running" | "crashed"; - export class RecordsJetstreamDO extends DurableObject { private readonly ingestor: JetstreamIngestor; /** Held so the run loop isn't garbage-collected. */ private readonly runPromise: Promise; - private state: IngestorState = "running"; constructor(state: DurableObjectState, env: Env) { super(state, env); @@ -37,12 +34,12 @@ export class RecordsJetstreamDO extends DurableObject { queue: env.RECORDS_QUEUE, storage: wrapDoStorage(state.storage), }); - // Fire-and-forget. Run loop is meant to live for the DO's lifetime; - // rejection means the loop hit a non-recoverable error (queue - // failures are absorbed inside `run()`). Surface that via the - // fetch handler so external monitoring can see it. + // Fire-and-forget. The run loop absorbs every error path internally + // today (transient queue failures, connection drops, parse errors + // all retry with backoff). The catch is here defensively — if a + // future change introduces a non-recoverable rejection, we want it + // in the logs rather than as an unhandled promise. this.runPromise = this.ingestor.run().catch((err) => { - this.state = "crashed"; console.error("[aggregator] jetstream ingestor crashed", err); }); } @@ -50,11 +47,13 @@ export class RecordsJetstreamDO extends DurableObject { /** * Status surface used by `/_admin/start` (post-deploy bootstrap) and the * 5-minute cron liveness pump. Idempotent — calling it on an - * already-running DO just reports the current cursor + ingestor state. + * already-running DO just reports the current cursor and consecutive + * failure count, which is the real liveness signal: 0 means the most + * recent connection attempt produced events; a high value indicates + * Jetstream is unreachable or the wantedCollections filter is wrong. */ override async fetch(_request: Request): Promise { return Response.json({ - status: this.state, cursor: this.ingestor.currentCursor, consecutiveFailures: this.ingestor.consecutiveFailures, }); diff --git a/apps/aggregator/test/jetstream-client.test.ts b/apps/aggregator/test/jetstream-client.test.ts index 9bddca5aa..b54e81e60 100644 --- a/apps/aggregator/test/jetstream-client.test.ts +++ b/apps/aggregator/test/jetstream-client.test.ts @@ -21,24 +21,28 @@ interface QuiescentEvent { kind: string; } -/** Subscription stub whose `next()` only resolves when `return()` is called. - * Models a real WebSocket where Jetstream isn't currently emitting events. +/** + * Subscription stub whose `next()` returns a Promise that NEVER resolves, + * even after `return()` is called. This mirrors `@mary-ext/event-iterator`'s + * actual behaviour: `EventIterator.return()` drops its resolver reference + * without invoking it (`lib/index.ts:55-67`), so a pending `next()` Promise + * is orphaned. If the wrapper relies on `inner.return()` resolving the + * pending await (the C1 mistake), this stub catches it — the for-await + * never wakes from `inner.return()` alone, only the closed-signal race in + * the wrapper can unblock it. */ function quiescentSubscription(): RawJetstreamSubscription { - let resolveNext: (() => void) | null = null; + let returned = false; const innerIter: AsyncIterator = { - async next() { - await new Promise((resolve) => { - resolveNext = resolve; - }); - return { value: undefined, done: true }; + next() { + if (returned) return Promise.resolve({ value: undefined, done: true }); + // Return a Promise that never settles. Mirrors EventIterator's + // behaviour: it stashes the resolver in a private field and + // drops it on return() without ever calling it. + return new Promise>(() => {}); }, async return() { - if (resolveNext) { - const r = resolveNext; - resolveNext = null; - r(); - } + returned = true; return { value: undefined, done: true }; }, }; From fa8da0777ec19de33f9c5c70335986510d8b7a1e Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sat, 9 May 2026 17:34:27 +0100 Subject: [PATCH 4/4] fix(aggregator): copilot review fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four findings on PR #972; addressing all. 1. `consecutiveFailures` semantic mismatch (jetstream-ingestor.ts + records-do.ts). Docstring claimed "0 means the most recent attempt produced events", but the run loop unconditionally incremented after each connectAndConsume return. So the counter was always ≥ 1 after any disconnect, including ones that successfully streamed events. Fix: increment only when no progress was made; reset to 0 (without increment) when progress was. Also added a regression test that asserts the counter stays 0 across three connect-disconnect cycles that all produced events. 2. `/_admin/start` returned the DO's status body (cursor + failure count). Even an idempotent admin endpoint shouldn't leak operational data to anonymous callers. Fix: route now fires the DO fetch via `ctx.waitUntil` and returns a fixed 204 — caller learns nothing about whether the DO was already running, just woke up, or is mid-startup. The DO's fetch handler still returns the status body (used internally by the cron liveness pump, which doesn't proxy it either). 3. Unhandled rejection in `wrapAtcuteSubscription.close()`. `void inner?.return?.()` suppressed the value but did NOT catch rejections. If the inner iterator's cleanup ever rejects (today it shouldn't, but a future EventIterator change could), workerd would surface an unhandled-promise warning. Fix: chain `.catch(() => {})`. Tests: 14 (was 13; added counter-semantics regression). 0 lint, 0 typecheck. --- apps/aggregator/src/index.ts | 25 +++++++------- apps/aggregator/src/jetstream-client.ts | 6 +++- apps/aggregator/src/jetstream-ingestor.ts | 13 ++++---- apps/aggregator/src/records-do.ts | 17 ++++++---- .../test/jetstream-ingestor.test.ts | 33 +++++++++++++++++++ 5 files changed, 67 insertions(+), 27 deletions(-) diff --git a/apps/aggregator/src/index.ts b/apps/aggregator/src/index.ts index 665bcb134..32dc1f958 100644 --- a/apps/aggregator/src/index.ts +++ b/apps/aggregator/src/index.ts @@ -23,11 +23,12 @@ export { RecordsJetstreamDO } from "./records-do.js"; /** * Operational bootstrap route. Hitting `/_admin/start` once after deploy * spins up the Records DO, which opens its outbound WebSocket and starts - * ingesting. The DO's WebSocket keeps it alive thereafter; this route is - * idempotent — calling it on an already-running DO just returns its current - * status. Recommended deploy hook: + * ingesting. The DO's WebSocket keeps it alive thereafter. The route is + * unauthenticated but returns no operational detail — just a fixed 204 — + * so a probing caller learns nothing useful. The action is idempotent on + * an already-running DO. Recommended deploy hook: * - * wrangler deploy && curl https://api.emdashcms.com/_admin/start + * wrangler deploy && curl -X POST https://api.emdashcms.com/_admin/start */ const BOOTSTRAP_PATH = "/_admin/start"; @@ -35,10 +36,14 @@ export default { async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { const url = new URL(request.url); if (url.pathname === BOOTSTRAP_PATH) { - return bootstrapRecordsDo(env); + const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); + const stub = env.RECORDS_DO.get(id); + // Fire-and-forget so the response shape doesn't depend on the + // DO's status output. Caller gets the same 204 whether the DO + // was already running, just woke up, or is mid-startup. + ctx.waitUntil(stub.fetch("https://do.internal/bootstrap")); + return new Response(null, { status: 204 }); } - // Suppress unused-arg lint until the XRPC routes land. - void ctx; return new Response("emdash-aggregator: not yet implemented", { status: 503, headers: { "content-type": "text/plain" }, @@ -61,9 +66,3 @@ export default { ctx.waitUntil(stub.fetch("https://do.internal/liveness")); }, }; - -async function bootstrapRecordsDo(env: Env): Promise { - const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); - const stub = env.RECORDS_DO.get(id); - return stub.fetch("https://do.internal/bootstrap"); -} diff --git a/apps/aggregator/src/jetstream-client.ts b/apps/aggregator/src/jetstream-client.ts index 99fb04f78..78b606b38 100644 --- a/apps/aggregator/src/jetstream-client.ts +++ b/apps/aggregator/src/jetstream-client.ts @@ -121,7 +121,11 @@ export function wrapAtcuteSubscription( }, close: () => { fireClosed(); - void inner?.return?.(); + // `.catch` swallows rejections from the inner iterator's cleanup + // (an EventIterator's `return()` shouldn't reject, but a future + // implementation could). Without this, a rejection here would + // surface as an unhandled-promise warning in workerd. + inner?.return?.()?.catch(() => {}); }, [Symbol.asyncIterator](): AsyncIterator { inner ??= sub[Symbol.asyncIterator](); diff --git a/apps/aggregator/src/jetstream-ingestor.ts b/apps/aggregator/src/jetstream-ingestor.ts index 7148c6594..abb6daca5 100644 --- a/apps/aggregator/src/jetstream-ingestor.ts +++ b/apps/aggregator/src/jetstream-ingestor.ts @@ -135,16 +135,15 @@ export class JetstreamIngestor { while (!this.stopped) { try { await this.connectAndConsume(); - // Subscription ended cleanly (Jetstream closed the socket - // without error). Treat as a soft failure for backoff - // purposes — but if we successfully consumed events during - // the connection, reset the counter first so the backoff - // reflects the latest streak, not historical failures. + // Subscription ended cleanly. If we consumed at least one + // event, the connection was healthy — reset the counter and + // reconnect with the floor delay. Otherwise treat as a soft + // failure and grow the backoff. if (this.madeProgress) this._consecutiveFailures = 0; - this._consecutiveFailures += 1; + else this._consecutiveFailures += 1; } catch (err) { if (this.madeProgress) this._consecutiveFailures = 0; - this._consecutiveFailures += 1; + else this._consecutiveFailures += 1; this.logger.warn?.("jetstream subscription failed", { error: err instanceof Error ? err.message : String(err), consecutiveFailures: this._consecutiveFailures, diff --git a/apps/aggregator/src/records-do.ts b/apps/aggregator/src/records-do.ts index b7cd773d7..008c6d5c9 100644 --- a/apps/aggregator/src/records-do.ts +++ b/apps/aggregator/src/records-do.ts @@ -45,12 +45,17 @@ export class RecordsJetstreamDO extends DurableObject { } /** - * Status surface used by `/_admin/start` (post-deploy bootstrap) and the - * 5-minute cron liveness pump. Idempotent — calling it on an - * already-running DO just reports the current cursor and consecutive - * failure count, which is the real liveness signal: 0 means the most - * recent connection attempt produced events; a high value indicates - * Jetstream is unreachable or the wantedCollections filter is wrong. + * Status surface for the `/_admin/start` bootstrap and the 5-minute cron + * liveness pump. Idempotent — calling it on an already-running DO just + * reports the current cursor and consecutive-failure count. `0` means + * the most recent connection attempt produced at least one event; a + * non-zero value indicates the latest reconnect cycle hasn't yet + * delivered an event (Jetstream unreachable, wantedCollections + * mismatch, or queue backpressure). + * + * The bootstrap route in the worker doesn't proxy this body — it + * fires-and-forgets the DO fetch and returns 204 — so this surface is + * effectively internal to the DO + cron pump. */ override async fetch(_request: Request): Promise { return Response.json({ diff --git a/apps/aggregator/test/jetstream-ingestor.test.ts b/apps/aggregator/test/jetstream-ingestor.test.ts index 6358e0f20..e717053e8 100644 --- a/apps/aggregator/test/jetstream-ingestor.test.ts +++ b/apps/aggregator/test/jetstream-ingestor.test.ts @@ -256,6 +256,39 @@ describe("JetstreamIngestor", () => { await expect(h.runPromise).resolves.toBeUndefined(); }); + it("consecutiveFailures stays 0 across disconnect-with-events cycles", async () => { + // Per the documented contract: 0 means the most recent connection + // attempt produced at least one event. A connect → consume → close + // cycle must NOT bump the counter to 1. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 }, + sleep: () => Promise.resolve(), + }); + const runPromise = ingestor.run(); + + // Three full cycles of: connect → emit → close. After each, the + // counter should still be 0 because each attempt made progress. + for (let i = 0; i < 3; i++) { + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: `r${i}` }); + await waitFor(() => queue.jobs.length === i + 1, `event ${i}`); + stream.closeAll(); + // Yield enough microtasks for the run loop to process the close + // and complete its bookkeeping before we inspect. + await new Promise((r) => setTimeout(r, 5)); + expect(ingestor.consecutiveFailures).toBe(0); + } + + ingestor.stop(); + await runPromise; + }); + it("resets backoff after a successful event, even across reconnects", async () => { // Without a reset, a subscription that disconnects → reconnects → // processes an event → disconnects again would back off as if the