diff --git a/apps/aggregator/src/index.ts b/apps/aggregator/src/index.ts index 095bc92f1..32dc1f958 100644 --- a/apps/aggregator/src/index.ts +++ b/apps/aggregator/src/index.ts @@ -16,11 +16,34 @@ */ import type { RecordsJob } from "./env.js"; +import { RECORDS_DO_NAME } from "./records-do.js"; export { RecordsJetstreamDO } from "./records-do.js"; +/** + * Operational bootstrap route. Hitting `/_admin/start` once after deploy + * spins up the Records DO, which opens its outbound WebSocket and starts + * ingesting. The DO's WebSocket keeps it alive thereafter. The route is + * unauthenticated but returns no operational detail — just a fixed 204 — + * so a probing caller learns nothing useful. The action is idempotent on + * an already-running DO. Recommended deploy hook: + * + * wrangler deploy && curl -X POST https://api.emdashcms.com/_admin/start + */ +const BOOTSTRAP_PATH = "/_admin/start"; + export default { - async fetch(_request: Request, _env: Env, _ctx: ExecutionContext): Promise { + async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { + const url = new URL(request.url); + if (url.pathname === BOOTSTRAP_PATH) { + const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); + const stub = env.RECORDS_DO.get(id); + // Fire-and-forget so the response shape doesn't depend on the + // DO's status output. Caller gets the same 204 whether the DO + // was already running, just woke up, or is mid-startup. + ctx.waitUntil(stub.fetch("https://do.internal/bootstrap")); + return new Response(null, { status: 204 }); + } return new Response("emdash-aggregator: not yet implemented", { status: 503, headers: { "content-type": "text/plain" }, @@ -31,7 +54,15 @@ export default { // PDS-verified ingest will land here. }, - async scheduled(_event: ScheduledEvent, _env: Env, _ctx: ExecutionContext): Promise { - // 6h reconciliation pass will land here. + async scheduled(_event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise { + // DO liveness. 
The records DO is meant to hold an outbound WebSocket + // continuously, but during a Jetstream outage it spends time in + // backoff sleeps — that's when CF can evict it. Hitting the DO from + // the cron wakes it back up; constructor-time `ingestor.run()` + // resumes from the persisted cursor. Reconciliation work will share + // this trigger when it lands. + const id = env.RECORDS_DO.idFromName(RECORDS_DO_NAME); + const stub = env.RECORDS_DO.get(id); + ctx.waitUntil(stub.fetch("https://do.internal/liveness")); }, }; diff --git a/apps/aggregator/src/jetstream-client.ts b/apps/aggregator/src/jetstream-client.ts new file mode 100644 index 000000000..78b606b38 --- /dev/null +++ b/apps/aggregator/src/jetstream-client.ts @@ -0,0 +1,159 @@ +/** + * Jetstream client abstraction. + * + * Production wraps `@atcute/jetstream`'s `JetstreamSubscription`. Tests bind + * `MockJetstream` from `@emdash-cms/atproto-test-utils`. The ingestor only + * depends on this interface, so the same code path runs in both worlds. + * + * The shape mirrors the subset of `JetstreamSubscription` we actually use: + * - async-iterable of commit events (we don't process identity/account + * events today), + * - a `cursor` getter exposing the time_us of the most recent event the + * iterator has yielded — used to persist the cursor for reconnection, + * - an explicit close. + * + * Open question we may revisit: real Jetstream emits identity + account + * events alongside commits. The ingestor narrows to commits today; if we + * grow to care about identity events for handle changes, widen the event + * type here and update the consumer. 
import { JetstreamSubscription } from "@atcute/jetstream";

/**
 * One Jetstream commit event, limited to the fields the ingestor reads.
 * Create/update commits carry the record body and its CID; delete commits
 * carry neither.
 */
export interface JetstreamCommitEvent {
	did: `did:${string}:${string}`;
	/** Event timestamp in microseconds; doubles as the resume cursor. */
	time_us: number;
	kind: "commit";
	commit:
		| {
				rev: string;
				collection: string;
				rkey: string;
				operation: "create" | "update";
				cid: string;
				// NOTE(review): the type arguments were missing in the reviewed
				// text; `<string, unknown>` is reconstructed — confirm.
				record: Record<string, unknown>;
		  }
		| {
				rev: string;
				collection: string;
				rkey: string;
				operation: "delete";
		  };
}

/** Parameters for opening a subscription. */
export interface JetstreamSubscribeOptions {
	wantedCollections: readonly string[];
	/** Resume point (`time_us` of the last processed event), if any. */
	cursor?: number;
}

/** A live subscription: async-iterable of commit events plus cursor + close. */
export interface JetstreamSubscriptionHandle extends AsyncIterable<JetstreamCommitEvent> {
	readonly cursor: number;
	close(): void;
}

/** The only surface the ingestor depends on; production and tests both implement it. */
export interface JetstreamClient {
	subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle;
}

/**
 * Production client backed by `@atcute/jetstream`. Filters non-commit events
 * before yielding, so the ingestor doesn't have to switch on `kind` every
 * iteration.
 */
export class RealJetstreamClient implements JetstreamClient {
	constructor(private readonly url: string) {}

	subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle {
		const sub = new JetstreamSubscription({
			url: this.url,
			// Copy: the option is readonly here, the library gets its own array.
			wantedCollections: [...opts.wantedCollections],
			// Only pass `cursor` when we actually have one.
			...(opts.cursor !== undefined ? { cursor: opts.cursor } : {}),
		});
		return wrapAtcuteSubscription(sub);
	}
}

/**
 * Minimum shape `wrapAtcuteSubscription` needs from its input: a `cursor`
 * getter and an async iterable of events with a `kind` discriminator. Both
 * `JetstreamSubscription` (production) and a stub-with-never-resolving-next
 * (the C2 regression test) satisfy this without casts.
 */
export interface RawJetstreamSubscription<E extends { kind: string }> extends AsyncIterable<E> {
	readonly cursor: number;
}
/**
 * Wraps a raw Jetstream subscription into a `JetstreamSubscriptionHandle`:
 * yields only `kind === "commit"` events, forwards `cursor`, and makes
 * `close()` reliably end a consumer that is blocked in `next()`.
 *
 * Exported so tests can drive the wrapper against a stub subscription with a
 * never-resolving `next()` and verify that `close()` actually cancels the
 * pending await. Production callers should construct via `RealJetstreamClient`.
 *
 * @param sub - Raw subscription (cursor getter + async iterable of events).
 * @returns A handle whose iteration ends once `close()` has been called.
 */
export function wrapAtcuteSubscription<E extends { kind: string }>(
	sub: RawJetstreamSubscription<E>,
): JetstreamSubscriptionHandle {
	// Hoisted so `close()` can reach the inner iterator from outside the
	// iterator factory.
	let inner: AsyncIterator<E> | null = null;
	let closed = false;

	// We can't rely on `inner.return()` to unblock a pending `next()` — the
	// underlying event iterator may drop its resolver without invoking it —
	// so every in-flight `next()` registers a "waker" that `close()` fires.
	//
	// This deliberately avoids `Promise.race([it.next(), closedSignal.then(…)])`
	// against one long-lived promise: each race adds a reaction to that
	// promise which is only released when it settles, i.e. memory would grow
	// with every event for as long as the subscription stays open. Wakers
	// are removed as soon as their `next()` settles, so the set only ever
	// holds calls that are actually pending.
	const pendingWakers = new Set<() => void>();

	const fireClosed = (): void => {
		if (closed) return;
		closed = true;
		for (const wake of pendingWakers) wake();
		pendingWakers.clear();
	};

	/** Resolves with the inner result, or with `done` if `close()` wins. */
	const nextOrClosed = (it: AsyncIterator<E>): Promise<IteratorResult<E>> =>
		new Promise<IteratorResult<E>>((resolve, reject) => {
			// Call first: a synchronous throw rejects without leaving a waker behind.
			const pendingNext = it.next();
			const wake = (): void => resolve({ value: undefined, done: true });
			pendingWakers.add(wake);
			pendingNext.then(
				(result) => {
					pendingWakers.delete(wake);
					resolve(result);
				},
				(err: unknown) => {
					pendingWakers.delete(wake);
					reject(err);
				},
			);
		});

	return {
		get cursor() {
			return sub.cursor;
		},
		close: () => {
			fireClosed();
			// `.catch` swallows rejections from the inner iterator's cleanup so
			// they don't surface as unhandled-promise warnings.
			void inner?.return?.()?.catch(() => {});
		},
		[Symbol.asyncIterator](): AsyncIterator<JetstreamCommitEvent> {
			inner ??= sub[Symbol.asyncIterator]();
			const it = inner;
			return {
				async next(): Promise<IteratorResult<JetstreamCommitEvent>> {
					// Loop past identity/account events until a commit arrives,
					// the inner stream ends, or the handle is closed.
					while (!closed) {
						const result = await nextOrClosed(it);
						if (result.done) break;
						const event = result.value;
						if (event.kind === "commit") {
							// The runtime check establishes this is a commit; the
							// generic `E` is too wide for the compiler to narrow.
							return { value: event as unknown as JetstreamCommitEvent, done: false };
						}
					}
					return { value: undefined, done: true };
				},
				async return(): Promise<IteratorResult<JetstreamCommitEvent>> {
					fireClosed();
					await it.return?.();
					return { value: undefined, done: true };
				},
			};
		},
	};
}
/**
 * Jetstream ingestor. Subscribes to a Jetstream client, converts commit
 * events into `RecordsJob` messages, enqueues them, and persists the
 * cursor so reconnects resume cleanly.
 *
 * Owns:
 *   - Connection lifecycle (connect → consume → disconnect → backoff →
 *     reconnect, indefinitely until `stop()` is called).
 *   - Cursor persistence after each successful enqueue. Persisting AFTER
 *     enqueue means a crash can replay the most recent event; downstream
 *     ingest is idempotent (DO NOTHING on duplicate releases) so this is
 *     safe and strictly better than the alternative of skipping events.
 *   - Exponential backoff with jitter, capped, reset on each successful
 *     event.
 *
 * Pure constructor injection — no DO/D1/Queue infrastructure imports — so
 * unit tests instantiate it directly with `MockJetstream` + an in-memory
 * queue + a `Map`-backed storage.
 */

import { WANTED_COLLECTIONS } from "./constants.js";
import type { RecordsJob } from "./env.js";
import type {
	JetstreamClient,
	JetstreamCommitEvent,
	JetstreamSubscriptionHandle,
} from "./jetstream-client.js";

/** Storage key under which the resume cursor (`time_us`) is persisted. */
const CURSOR_STORAGE_KEY = "jetstream:cursor";

/**
 * Subset of `Queue.send` we use. Return type is loose because workerd's
 * `Queue.send` resolves to `QueueSendResponse` while a hand-rolled
 * in-memory test queue resolves to `void` — neither piece of metadata
 * matters to the ingestor.
 */
export interface JobQueue {
	send(job: RecordsJob): Promise<unknown>;
}

/** Subset of DurableObjectStorage we use. Tests pass a Map-backed shim. */
export interface IngestorStorage {
	get(key: string): Promise<number | undefined>;
	put(key: string, value: number): Promise<void>;
}

export interface IngestorBackoffConfig {
	/** Initial delay after the first disconnect (ms). Default 1s. */
	initialDelayMs?: number;
	/** Cap (ms). Default 60s. */
	maxDelayMs?: number;
	/** Multiplier per retry. Default 2. */
	multiplier?: number;
	/** ±jitter as a fraction of the delay. Default 0.2 (±20%). 0 disables. */
	jitter?: number;
}

/** Optional structured logger; every method may be omitted. */
export interface IngestorLogger {
	info?(msg: string, ctx?: Record<string, unknown>): void;
	warn?(msg: string, ctx?: Record<string, unknown>): void;
	error?(msg: string, ctx?: Record<string, unknown>): void;
}

export interface JetstreamIngestorOptions {
	client: JetstreamClient;
	queue: JobQueue;
	storage: IngestorStorage;
	/** Defaults to the protocol-level WANTED_COLLECTIONS constant. */
	wantedCollections?: readonly string[];
	backoff?: IngestorBackoffConfig;
	logger?: IngestorLogger;
	/** Sleep impl, swap in tests to skip real backoff waits. */
	sleep?: (ms: number) => Promise<void>;
	/** Random source for jitter, swap in tests for determinism. */
	random?: () => number;
}
const DEFAULT_BACKOFF: Required<IngestorBackoffConfig> = {
	initialDelayMs: 1_000,
	maxDelayMs: 60_000,
	multiplier: 2,
	jitter: 0.2,
};

/**
 * Merge caller overrides onto `DEFAULT_BACKOFF`. Unlike an object spread, an
 * explicitly-`undefined` field keeps its default instead of clobbering it
 * (which would turn every computed delay into `NaN`).
 */
function resolveBackoff(overrides: IngestorBackoffConfig = {}): Required<IngestorBackoffConfig> {
	return {
		initialDelayMs: overrides.initialDelayMs ?? DEFAULT_BACKOFF.initialDelayMs,
		maxDelayMs: overrides.maxDelayMs ?? DEFAULT_BACKOFF.maxDelayMs,
		multiplier: overrides.multiplier ?? DEFAULT_BACKOFF.multiplier,
		jitter: overrides.jitter ?? DEFAULT_BACKOFF.jitter,
	};
}

/**
 * Connect → consume → reconnect loop around a `JetstreamClient`. Each wanted
 * commit event becomes one `RecordsJob` on the queue, after which the
 * event's `time_us` is persisted as the resume cursor.
 */
export class JetstreamIngestor {
	private readonly client: JetstreamClient;
	private readonly queue: JobQueue;
	private readonly storage: IngestorStorage;
	private readonly wantedCollections: readonly string[];
	private readonly backoff: Required<IngestorBackoffConfig>;
	private readonly logger: IngestorLogger;
	private readonly sleep: (ms: number) => Promise<void>;
	private readonly random: () => number;

	private stopped = false;
	private currentSub: JetstreamSubscriptionHandle | null = null;
	private cursor: number | null = null;
	/** Set on every successful enqueue and cleared at the start of each
	 *  connection attempt. `run()` reads it once the attempt has ended: if
	 *  true, the failure counter resets, so a subscription that connects,
	 *  consumes events, and then drops doesn't spiral into ever-larger
	 *  backoffs. */
	private madeProgress = false;
	private _consecutiveFailures = 0;

	constructor(opts: JetstreamIngestorOptions) {
		this.client = opts.client;
		this.queue = opts.queue;
		this.storage = opts.storage;
		this.wantedCollections = opts.wantedCollections ?? WANTED_COLLECTIONS;
		this.backoff = resolveBackoff(opts.backoff);
		this.logger = opts.logger ?? {};
		this.sleep = opts.sleep ?? defaultSleep;
		this.random = opts.random ?? Math.random;
	}

	/** The cursor most recently enqueued + persisted. `null` until the first event. */
	get currentCursor(): number | null {
		return this.cursor;
	}

	/** Number of consecutive failed/empty connection attempts. Exposed for
	 *  liveness probes; `0` means the most recent attempt produced events. */
	get consecutiveFailures(): number {
		return this._consecutiveFailures;
	}

	/**
	 * Run the connect-consume-reconnect loop until `stop()` is called.
	 *
	 * Resolves when `stop()` is called and the current subscription drains.
	 * Failures inside a connection attempt (connection drops, `queue.send`
	 * or cursor-write rejections) do NOT reject: they are logged, counted,
	 * and retried after a backoff. Only the initial cursor read happens
	 * outside the loop, so a rejection there does propagate.
	 */
	async run(): Promise<void> {
		this.cursor = (await this.storage.get(CURSOR_STORAGE_KEY)) ?? null;

		while (!this.stopped) {
			try {
				await this.connectAndConsume();
				// Ended cleanly: healthy if it delivered events, otherwise a
				// soft failure that grows the backoff.
				this.recordAttemptOutcome();
			} catch (err) {
				this.recordAttemptOutcome();
				this.logger.warn?.("jetstream subscription failed", {
					error: err instanceof Error ? err.message : String(err),
					consecutiveFailures: this._consecutiveFailures,
				});
			}
			if (this.stopped) break;
			await this.sleep(this.computeBackoff(this._consecutiveFailures));
		}
	}

	/** Ask the loop to exit and close the live subscription, if any. */
	stop(): void {
		this.stopped = true;
		this.currentSub?.close();
	}

	/** Reset the failure counter if the attempt enqueued anything, else bump it. */
	private recordAttemptOutcome(): void {
		if (this.madeProgress) this._consecutiveFailures = 0;
		else this._consecutiveFailures += 1;
	}

	/** One connection attempt: subscribe from the current cursor and drain. */
	private async connectAndConsume(): Promise<void> {
		// Scoped to this attempt; see the field comment.
		this.madeProgress = false;
		const sub = this.client.subscribe({
			wantedCollections: this.wantedCollections,
			...(this.cursor !== null ? { cursor: this.cursor } : {}),
		});
		this.currentSub = sub;
		try {
			for await (const event of sub) {
				if (this.stopped) break;
				await this.handleEvent(event);
			}
		} finally {
			sub.close();
			// Don't clear a newer subscription installed in the meantime.
			if (this.currentSub === sub) this.currentSub = null;
		}
	}

	/** Enqueue one commit as a `RecordsJob`, then advance + persist the cursor. */
	private async handleEvent(event: JetstreamCommitEvent): Promise<void> {
		const { commit } = event;
		// Defence in depth: Jetstream filters server-side, but a future
		// subscription change or a malicious relay could deliver something
		// off-list. Trust nothing.
		if (!this.wantedCollections.includes(commit.collection)) return;

		const job: RecordsJob = {
			did: event.did,
			collection: commit.collection,
			rkey: commit.rkey,
			operation: commit.operation,
			// Deletes carry no CID or record body.
			cid: commit.operation === "delete" ? "" : commit.cid,
			...(commit.operation !== "delete" ? { jetstreamRecord: commit.record } : {}),
		};

		await this.queue.send(job);
		// Persist the cursor only after the queue has accepted the message: a
		// crash in between replays this event on recovery rather than
		// skipping it.
		this.cursor = event.time_us;
		await this.storage.put(CURSOR_STORAGE_KEY, event.time_us);
		this.madeProgress = true;
	}

	/**
	 * Delay before the next attempt: `initialDelayMs × multiplier^(failures-1)`,
	 * floored at `initialDelayMs`, capped at `maxDelayMs`, then jittered by
	 * ±`jitter`. The jittered value is clamped to `[0, maxDelayMs]` so the
	 * configured cap is a real upper bound.
	 */
	private computeBackoff(failures: number): number {
		const { initialDelayMs, maxDelayMs, multiplier, jitter } = this.backoff;
		// `failures` may be 0 (last attempt made progress); the floor keeps
		// that at `initialDelayMs` instead of `initialDelayMs / multiplier`.
		const exp = Math.min(
			Math.max(initialDelayMs, initialDelayMs * multiplier ** (failures - 1)),
			maxDelayMs,
		);
		if (jitter <= 0) return exp;
		const range = exp * jitter;
		const offset = (this.random() * 2 - 1) * range;
		return Math.min(maxDelayMs, Math.max(0, Math.round(exp + offset)));
	}
}

function defaultSleep(ms: number): Promise<void> {
	return new Promise((resolve) => setTimeout(resolve, ms));
}
import { DurableObject } from "cloudflare:workers";

import { RealJetstreamClient } from "./jetstream-client.js";
import { JetstreamIngestor, type IngestorStorage } from "./jetstream-ingestor.js";

/** Singleton DO ID. There's exactly one ingestor per deployment. */
export const RECORDS_DO_NAME = "main";

/**
 * Durable Object that hosts the Jetstream ingestor. It only wires real
 * bindings (Jetstream URL, Records queue, DO storage) into
 * `JetstreamIngestor` and exposes a small status endpoint; the loop, cursor
 * and backoff logic all live in the ingestor.
 */
// NOTE(review): the base-class type argument was missing in the reviewed text;
// `DurableObject<Env>` is reconstructed — confirm.
export class RecordsJetstreamDO extends DurableObject<Env> {
	private readonly ingestor: JetstreamIngestor;
	/** Held so the run loop isn't garbage-collected. */
	private readonly runPromise: Promise<void>;

	constructor(state: DurableObjectState, env: Env) {
		super(state, env);
		this.ingestor = new JetstreamIngestor({
			client: new RealJetstreamClient(env.JETSTREAM_URL),
			queue: env.RECORDS_QUEUE,
			storage: wrapDoStorage(state.storage),
		});
		// Fire-and-forget: constructing the DO is what starts ingestion. The
		// run loop retries failures inside a connection attempt on its own;
		// the catch is here so that anything which does reject ends up in
		// the logs rather than as an unhandled promise.
		this.runPromise = this.ingestor.run().catch((err: unknown) => {
			console.error("[aggregator] jetstream ingestor crashed", err);
		});
	}

	/**
	 * Status surface for the `/_admin/start` bootstrap and the cron liveness
	 * pump. Idempotent — the request itself is ignored; the response is a
	 * JSON body with the ingestor's current cursor and consecutive-failure
	 * count. `0` failures means the most recent connection attempt produced
	 * at least one event; a non-zero value means the latest reconnect cycle
	 * hasn't yet delivered one.
	 *
	 * The bootstrap route in the worker doesn't proxy this body — it
	 * fires-and-forgets the DO fetch and returns 204 — so this surface is
	 * effectively internal to the DO + cron pump.
	 */
	override async fetch(_request: Request): Promise<Response> {
		return Response.json({
			cursor: this.ingestor.currentCursor,
			consecutiveFailures: this.ingestor.consecutiveFailures,
		});
	}
}

/**
 * Adapt the workerd `DurableObjectStorage` (Promise-based key/value with
 * unknown values) to the narrow `IngestorStorage` shape (string→number).
 * Keeping the adaptation here means the ingestor stays free of workerd
 * imports and the DO is the only place that needs to know about storage's
 * type-erasure.
 */
function wrapDoStorage(storage: DurableObjectStorage): IngestorStorage {
	return {
		async get(key: string): Promise<number | undefined> {
			const value = await storage.get(key);
			// Anything that isn't a number is treated as "no cursor stored".
			return typeof value === "number" ? value : undefined;
		},
		async put(key: string, value: number): Promise<void> {
			await storage.put(key, value);
		},
	};
}
/**
 * `wrapAtcuteSubscription` regression test.
 *
 * The wrapper's `close()` MUST cancel a pending `for await` even when the
 * underlying subscription is quiescent (no events arriving). MockJetstream
 * actively resolves pending awaiters on close, so it can't catch a
 * misbehaving production wrapper — this test pairs the wrapper with a stub
 * whose `next()` never resolves, and asserts the for-await terminates within
 * a small grace window after `close()`.
 *
 * Without the fix, the wrapper's `close()` flipped a flag the iterator only
 * checked AFTER `inner.next()` resolved, so a quiescent stream would hang
 * `stop()` indefinitely. This test failing means we've regressed there.
 */

import { describe, expect, it } from "vitest";

import { wrapAtcuteSubscription, type RawJetstreamSubscription } from "../src/jetstream-client.js";

interface QuiescentEvent {
	kind: string;
}

/**
 * Subscription stub whose `next()` returns a Promise that NEVER resolves,
 * even after `return()` is called (only `next()` calls made after `return()`
 * report `done`). If the wrapper relies on `inner.return()` resolving the
 * pending await, this stub catches it — the for-await never wakes from
 * `inner.return()` alone; only the wrapper's own close signal can unblock it.
 */
function quiescentSubscription(): RawJetstreamSubscription<QuiescentEvent> {
	let returned = false;
	const innerIter: AsyncIterator<QuiescentEvent> = {
		next() {
			if (returned) return Promise.resolve({ value: undefined, done: true });
			// A Promise that never settles: nothing holds its resolver.
			return new Promise<IteratorResult<QuiescentEvent>>(() => {});
		},
		async return() {
			returned = true;
			return { value: undefined, done: true };
		},
	};
	return {
		cursor: 0,
		[Symbol.asyncIterator]: () => innerIter,
	};
}

describe("wrapAtcuteSubscription", () => {
	it("close() unblocks a for-await waiting on a quiescent subscription", async () => {
		const sub = quiescentSubscription();
		const handle = wrapAtcuteSubscription(sub);

		// Start consuming on a background promise. The for-await blocks on
		// the first iter.next() because the stub's pending next() never
		// settles on its own.
		const consumed: QuiescentEvent[] = [];
		const consumePromise = (async () => {
			for await (const event of handle) {
				consumed.push(event);
			}
		})();

		// Yield a few turns so the consumer reaches the awaiting state.
		await Promise.resolve();
		await Promise.resolve();
		await new Promise((r) => setTimeout(r, 0));

		// The consumer should be wedged, not done. Use Promise.race to
		// detect — if it's wedged, the timeout wins.
		const beforeClose = await Promise.race([
			consumePromise.then(() => "done" as const),
			new Promise<"pending">((r) => setTimeout(r, 10, "pending")),
		]);
		expect(beforeClose).toBe("pending");

		// close() must cancel the pending await.
		handle.close();

		// Now the consumer should resolve quickly.
		await expect(
			Promise.race([
				consumePromise.then(() => "done" as const),
				new Promise<"timeout">((r) => setTimeout(r, 100, "timeout")),
			]),
		).resolves.toBe("done");

		expect(consumed).toEqual([]);
	});

	it("filters non-commit events", async () => {
		const events: Array<{ kind: string; commit?: { collection: string } }> = [
			{ kind: "identity" },
			{ kind: "commit", commit: { collection: "x" } },
			{ kind: "account" },
			{ kind: "commit", commit: { collection: "y" } },
		];
		let i = 0;
		const sub: RawJetstreamSubscription<(typeof events)[number]> = {
			cursor: 0,
			[Symbol.asyncIterator]: () => ({
				async next() {
					if (i >= events.length) return { value: undefined, done: true };
					const value = events[i++];
					return { value: value as (typeof events)[number], done: false };
				},
			}),
		};
		const handle = wrapAtcuteSubscription(sub);
		const out: unknown[] = [];
		for await (const event of handle) out.push(event);
		expect(out).toHaveLength(2);
		expect(out.every((e) => (e as { kind: string }).kind === "commit")).toBe(true);
	});
});
/**
 * JetstreamIngestor unit tests.
 *
 * Drives the ingestor against MockJetstream, an in-memory queue, and a
 * Map-backed storage. No DO/D1/Queue runtime needed; the only Cloudflare
 * binding the ingestor depends on is "something with a send method", which
 * the mock queue trivially satisfies.
 *
 * Tests cover the ingestor's whole contract: event-to-job conversion,
 * cursor persistence, reconnect with backoff, stop semantics. Each one
 * pins a behaviour the production DO needs to honour; if a future change
 * regresses any of them the failure surfaces here, not in production.
 */

// Subpath imports avoid pulling in `@atproto/repo` (Node-crypto only) which
// the package's main entry transitively re-exports via FakeRepo. workerd
// can't load that, but we don't need it here — only MockJetstream + the
// NSID constants.
import { MockJetstream } from "@emdash-cms/atproto-test-utils/jetstream";
import { PROFILE_NSID, RELEASE_NSID } from "@emdash-cms/atproto-test-utils/nsid";
import { describe, expect, it } from "vitest";

import type { RecordsJob } from "../src/env.js";
import type {
	JetstreamClient,
	JetstreamSubscribeOptions,
	JetstreamSubscriptionHandle,
} from "../src/jetstream-client.js";
import {
	JetstreamIngestor,
	type IngestorStorage,
	type JobQueue,
} from "../src/jetstream-ingestor.js";

const TEST_DID = "did:plc:test00000000000000000000";

/** Queue double: records every job it is sent, in order. */
class InMemoryQueue implements JobQueue {
	readonly jobs: RecordsJob[] = [];
	send(job: RecordsJob): Promise<void> {
		this.jobs.push(job);
		return Promise.resolve();
	}
}

/** Storage double backed by a plain `Map`. */
class MapStorage implements IngestorStorage {
	private readonly map = new Map<string, number>();
	get(key: string): Promise<number | undefined> {
		return Promise.resolve(this.map.get(key));
	}
	put(key: string, value: number): Promise<void> {
		this.map.set(key, value);
		return Promise.resolve();
	}
}

/**
 * Adapter that turns a MockJetstream into the JetstreamClient interface
 * the ingestor uses. MockJetstream's subscribe() already returns an
 * AsyncIterable with a cursor + close, so this is a thin pass-through —
 * its only job is shaping the type.
 */
class MockJetstreamClient implements JetstreamClient {
	constructor(private readonly stream: MockJetstream) {}
	subscribe(opts: JetstreamSubscribeOptions): JetstreamSubscriptionHandle {
		return this.stream.subscribe({
			wantedCollections: [...opts.wantedCollections],
			...(opts.cursor !== undefined ? { cursor: opts.cursor } : {}),
		});
	}
}

/** Everything a test needs: the doubles, the ingestor, and its running loop. */
interface Harness {
	stream: MockJetstream;
	queue: InMemoryQueue;
	storage: MapStorage;
	ingestor: JetstreamIngestor;
	runPromise: Promise<void>;
}

/** Build the doubles, start `ingestor.run()`, and hand everything back. */
function buildHarness(opts: { wantedCollections?: readonly string[] } = {}): Harness {
	const stream = new MockJetstream();
	const queue = new InMemoryQueue();
	const storage = new MapStorage();
	const ingestor = new JetstreamIngestor({
		client: new MockJetstreamClient(stream),
		queue,
		storage,
		wantedCollections: opts.wantedCollections ?? [PROFILE_NSID, RELEASE_NSID],
		// Tight backoff so tests don't sit in real timers; jitter off for
		// deterministic assertions.
		backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 },
		sleep: () => Promise.resolve(),
	});
	const runPromise = ingestor.run();
	return { stream, queue, storage, ingestor, runPromise };
}
*/ +async function waitFor(predicate: () => boolean, label: string, attempts = 200): Promise { + for (let i = 0; i < attempts; i++) { + if (predicate()) return; + await Promise.resolve(); + await new Promise((r) => setTimeout(r, 0)); + } + throw new Error(`waitFor timed out: ${label}`); +} + +describe("JetstreamIngestor", () => { + it("converts a commit create event into a RecordsJob and enqueues it", async () => { + const h = buildHarness(); + const event = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + cid: "bafyrecord", + record: { slug: "p", license: "MIT" }, + }); + + await waitFor(() => h.queue.jobs.length === 1, "first job enqueued"); + + expect(h.queue.jobs[0]).toEqual({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + operation: "create", + cid: "bafyrecord", + jetstreamRecord: { slug: "p", license: "MIT" }, + }); + expect(h.ingestor.currentCursor).toBe(event.time_us); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("persists cursor to storage after each successful enqueue", async () => { + const h = buildHarness(); + const e1 = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "a", + }); + const e2 = h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "b", + }); + + await waitFor(() => h.queue.jobs.length === 2, "both jobs enqueued"); + + expect(await h.storage.get("jetstream:cursor")).toBe(e2.time_us); + expect(e2.time_us).toBeGreaterThan(e1.time_us); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("resumes from the persisted cursor on a fresh ingestor", async () => { + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const earlier = stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "earlier", + }); + const later = stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "later", + }); + + // Pretend a previous run consumed the earlier event. 
+ await storage.put("jetstream:cursor", earlier.time_us); + + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID, RELEASE_NSID], + backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 }, + sleep: () => Promise.resolve(), + }); + const runPromise = ingestor.run(); + + await waitFor(() => queue.jobs.length === 1, "later event enqueued"); + + expect(queue.jobs).toHaveLength(1); + expect(queue.jobs[0]?.rkey).toBe("later"); + expect(ingestor.currentCursor).toBe(later.time_us); + + ingestor.stop(); + await runPromise; + }); + + it("handles delete operations (no record body, empty cid)", async () => { + const h = buildHarness(); + h.stream.emit({ + did: TEST_DID, + time_us: Date.now() * 1000, + kind: "commit", + commit: { + rev: "rev-del", + collection: PROFILE_NSID, + rkey: "p", + operation: "delete", + }, + }); + + await waitFor(() => h.queue.jobs.length === 1, "delete job enqueued"); + + expect(h.queue.jobs[0]).toEqual({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "p", + operation: "delete", + cid: "", + }); + expect(h.queue.jobs[0]?.jetstreamRecord).toBeUndefined(); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("filters events outside wantedCollections (defence in depth)", async () => { + // Real Jetstream filters server-side, but a malicious or buggy relay + // could send something off-list. The ingestor must not enqueue it. 
+ const h = buildHarness({ wantedCollections: [PROFILE_NSID] }); + h.stream.emitCommit({ + did: TEST_DID, + collection: RELEASE_NSID, // not in wantedCollections + rkey: "ignored:1.0.0", + }); + h.stream.emitCommit({ + did: TEST_DID, + collection: PROFILE_NSID, + rkey: "kept", + }); + + await waitFor(() => h.queue.jobs.length === 1, "filtered job enqueued"); + + expect(h.queue.jobs).toHaveLength(1); + expect(h.queue.jobs[0]?.rkey).toBe("kept"); + + h.ingestor.stop(); + await h.runPromise; + }); + + it("stop() ends the run loop cleanly", async () => { + const h = buildHarness(); + h.stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "p" }); + await waitFor(() => h.queue.jobs.length === 1, "first job"); + + h.ingestor.stop(); + await expect(h.runPromise).resolves.toBeUndefined(); + }); + + it("consecutiveFailures stays 0 across disconnect-with-events cycles", async () => { + // Per the documented contract: 0 means the most recent connection + // attempt produced at least one event. A connect → consume → close + // cycle must NOT bump the counter to 1. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 1, maxDelayMs: 5, multiplier: 2, jitter: 0 }, + sleep: () => Promise.resolve(), + }); + const runPromise = ingestor.run(); + + // Three full cycles of: connect → emit → close. After each, the + // counter should still be 0 because each attempt made progress. + for (let i = 0; i < 3; i++) { + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: `r${i}` }); + await waitFor(() => queue.jobs.length === i + 1, `event ${i}`); + stream.closeAll(); + // Pause on a real timer (not just microtasks) so the run loop can + // process the close and finish its bookkeeping before we inspect. 
+ await new Promise((r) => setTimeout(r, 5)); + expect(ingestor.consecutiveFailures).toBe(0); + } + + ingestor.stop(); + await runPromise; + }); + + it("resets backoff after a successful event, even across reconnects", async () => { + // Without a reset, a subscription that disconnects → reconnects → + // processes an event → disconnects again would back off as if the + // failures were continuous. The "made progress" flag should reset + // the counter so each disconnect that consumed events starts fresh + // from the initial delay. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const sleeps: number[] = []; + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 10, maxDelayMs: 1000, multiplier: 10, jitter: 0 }, + sleep: (ms) => { + sleeps.push(ms); + return Promise.resolve(); + }, + }); + const runPromise = ingestor.run(); + + // Cycle 1: emit event, close. After backoff this should still be + // the initial delay because we made progress. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "a" }); + await waitFor(() => queue.jobs.length === 1, "first job"); + stream.closeAll(); + await waitFor(() => sleeps.length >= 1, "first backoff"); + + // Cycle 2: emit another event, close. Backoff should still be the + // initial delay (10ms), not 100ms (10×10), because progress in + // between resets the counter. 
+ stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "b" }); + await waitFor(() => queue.jobs.length === 2, "second job", 500); + stream.closeAll(); + await waitFor(() => sleeps.length >= 2, "second backoff", 500); + + expect(sleeps[0]).toBe(10); + expect(sleeps[1]).toBe(10); + + ingestor.stop(); + await runPromise; + }); + + it("computes exponential backoff with cap, no jitter for determinism", async () => { + // Smoke test of the reconnect path rather than a full check of the + // exponential curve: drive the stream, close the subscription from + // underneath the ingestor (simulating a Jetstream disconnect), and + // record the delays the ingestor passes to our injected `sleep`. + // The assertions at the end only pin that the first recorded delay + // lies within [initialDelayMs, maxDelayMs] and that the run loop + // keeps consuming after the forced disconnect. + const stream = new MockJetstream(); + const queue = new InMemoryQueue(); + const storage = new MapStorage(); + const sleeps: number[] = []; + const ingestor = new JetstreamIngestor({ + client: new MockJetstreamClient(stream), + queue, + storage, + wantedCollections: [PROFILE_NSID], + backoff: { initialDelayMs: 10, maxDelayMs: 80, multiplier: 2, jitter: 0 }, + sleep: (ms) => { + sleeps.push(ms); + return Promise.resolve(); + }, + }); + const runPromise = ingestor.run(); + + // Emit one event so the subscription has work, then close it from the + // MockJetstream side to simulate a server disconnect. The ingestor's + // loop sees the iterator end, sleeps with backoff, reconnects. + stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "a" }); + await waitFor(() => queue.jobs.length === 1, "first job"); + stream.closeAll(); + + // After the disconnect, post a second event so the new subscription + // has something to consume. Wait until it lands. 
+ stream.emitCommit({ did: TEST_DID, collection: PROFILE_NSID, rkey: "b" }); + await waitFor(() => queue.jobs.length === 2, "post-reconnect job", 500); + + // At least one backoff sleep happened between the disconnect and + // the next successful subscription. + expect(sleeps.length).toBeGreaterThanOrEqual(1); + expect(sleeps[0]).toBeGreaterThanOrEqual(10); + expect(sleeps[0]).toBeLessThanOrEqual(80); + + ingestor.stop(); + await runPromise; + }); +}); diff --git a/apps/aggregator/wrangler.jsonc b/apps/aggregator/wrangler.jsonc index 199306285..ff3716395 100644 --- a/apps/aggregator/wrangler.jsonc +++ b/apps/aggregator/wrangler.jsonc @@ -51,9 +51,14 @@ }, ], "triggers": { - // Reconciliation pass — listRecords per known DID, fix anything - // Jetstream missed. Every 6 hours per the plan. - "crons": ["0 */6 * * *"], + // Liveness ping for the records DO. The DO holds Jetstream open and + // stays alive while the WebSocket is up, but during a Jetstream + // outage it spends time in backoff sleeps with no active connection + // — that's when CF can evict it. The 5-minute cron wakes it back up + // quickly; constructor-time `ingestor.run()` resumes from the + // persisted cursor. Reconciliation work will share this trigger when + // it lands. + "crons": ["*/5 * * * *"], }, "vars": { // Jetstream endpoint. 
Override per environment for self-hosted relays diff --git a/packages/atproto-test-utils/package.json b/packages/atproto-test-utils/package.json index 4855e97cd..0ed0b882c 100644 --- a/packages/atproto-test-utils/package.json +++ b/packages/atproto-test-utils/package.json @@ -6,7 +6,9 @@ "type": "module", "main": "src/index.ts", "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./jetstream": "./src/mock-jetstream.ts", + "./nsid": "./src/nsid.ts" }, "scripts": { "typecheck": "tsgo --noEmit", diff --git a/packages/atproto-test-utils/src/fake-publisher.ts b/packages/atproto-test-utils/src/fake-publisher.ts index 562388e5c..656f0baf8 100644 --- a/packages/atproto-test-utils/src/fake-publisher.ts +++ b/packages/atproto-test-utils/src/fake-publisher.ts @@ -24,10 +24,10 @@ const DID_KEY_PREFIX_RE = /^did:key:/; import { MockDidResolver, buildDidDocument } from "./mock-did-resolver.js"; import { MockJetstream } from "./mock-jetstream.js"; import { MockPds } from "./mock-pds.js"; +import { PROFILE_NSID, RELEASE_NSID } from "./nsid.js"; import type { AtprotoDid } from "./types.js"; -export const PROFILE_NSID = "com.emdashcms.experimental.package.profile"; -export const RELEASE_NSID = "com.emdashcms.experimental.package.release"; +export { PROFILE_NSID, RELEASE_NSID } from "./nsid.js"; export interface FakePublisherOptions extends FakeRepoOptions { handle?: string; diff --git a/packages/atproto-test-utils/src/nsid.ts b/packages/atproto-test-utils/src/nsid.ts new file mode 100644 index 000000000..53c0ad34c --- /dev/null +++ b/packages/atproto-test-utils/src/nsid.ts @@ -0,0 +1,8 @@ +/** + * NSID constants. Split from `fake-publisher.ts` so they can be imported in + * Worker test contexts (workerd) without pulling in `@atproto/repo` and its + * Node-crypto dependencies. + */ + +export const PROFILE_NSID = "com.emdashcms.experimental.package.profile"; +export const RELEASE_NSID = "com.emdashcms.experimental.package.release";