Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions apps/aggregator/migrations/0001_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
-- EmDash plugin registry aggregator: initial schema.
--
-- Lands every v1 table at once on purpose. Slices 2 and 3 add features that
-- read these tables but do not need new ones, so this migration is the only
-- DDL we expect to ship before Slice 4 (NSID stabilisation), at which point
-- the table contents — not their shape — get rewritten.

------------------------------------------------------------------------------
-- Records: package profiles + releases
------------------------------------------------------------------------------

-- One row per package profile record, identified by (did, slug).
CREATE TABLE packages (
    -- Natural key. Declared UNIQUE (see bottom of the table) rather than
    -- PRIMARY KEY so that `id` can be the rowid alias; both columns are NOT NULL,
    -- so the pair is still a valid FOREIGN KEY parent and ON CONFLICT target.
    did                TEXT NOT NULL,
    slug               TEXT NOT NULL,
    type               TEXT NOT NULL,   -- 'emdash-plugin'
    name               TEXT,
    description        TEXT,
    license            TEXT NOT NULL,
    authors            TEXT NOT NULL,   -- JSON array
    security           TEXT NOT NULL,   -- JSON array
    keywords           TEXT,            -- JSON array
    sections           TEXT,            -- JSON map
    last_updated       TEXT,
    -- Denormalised from latest release for query convenience. Updated on every
    -- new release insert; readers never compute "latest" by sorting.
    latest_version     TEXT,
    capabilities       TEXT,            -- JSON array
    -- Raw signed record bytes for verification + envelope passthrough. Clients
    -- re-verify the MST signature against the publisher's DID document at
    -- install time.
    record_blob        BLOB NOT NULL,
    signature_metadata TEXT,            -- JSON: head CID, signing key id
    verified_at        TEXT NOT NULL,
    -- Explicit rowid alias. packages_fts is an external-content FTS5 table keyed
    -- by this table's rowid; an implicit rowid is not persistent (VACUUM or a
    -- dump/restore may renumber it), which would silently detach the search
    -- index from its rows. INTEGER PRIMARY KEY pins the value. Writers omit this
    -- column and let SQLite assign it.
    id                 INTEGER PRIMARY KEY,
    UNIQUE (did, slug)
);

-- One row per release record, keyed by (did, package, version) and tied to
-- its parent package profile by a foreign key.
CREATE TABLE releases (
    did                TEXT NOT NULL,
    package            TEXT NOT NULL,   -- parent profile's rkey/slug (record.package field)
    version            TEXT NOT NULL,   -- canonical (un-percent-encoded) semver from record.version
    rkey               TEXT NOT NULL,   -- exact rkey, shaped `<package>:<encoded-version>`

    -- Sort key written by application code, because SQLite has no native
    -- semver ordering. Packs zero-padded major.minor.patch, with prerelease
    -- tags compared per semver precedence rules, so a plain ORDER BY on this
    -- column yields semver precedence order.
    version_sort       TEXT NOT NULL,

    artifacts          TEXT NOT NULL,   -- JSON
    requires           TEXT,            -- JSON
    suggests           TEXT,            -- JSON

    -- Contents of com.emdashcms.experimental.package.releaseExtension:
    -- { declaredAccess }. The capabilities-shaped projection of this lives in
    -- packages.capabilities for query convenience.
    emdash_extension   TEXT NOT NULL,

    repo_url           TEXT,
    cts                TEXT NOT NULL,   -- creation timestamp from the record
    record_blob        BLOB NOT NULL,
    signature_metadata TEXT,
    verified_at        TEXT NOT NULL,
    tombstoned_at      TEXT,            -- soft delete (publisher deleted record)

    PRIMARY KEY (did, package, version),
    FOREIGN KEY (did, package) REFERENCES packages(did, slug)
);

-- Orders a package's releases by version_sort, highest first. Partial: rows
-- with a tombstoned_at value are left out of the index.
CREATE INDEX idx_releases_latest
    ON releases (did, package, version_sort DESC)
    WHERE tombstoned_at IS NULL;

-- Lookup/ordering by the record's creation timestamp.
CREATE INDEX idx_releases_cts
    ON releases (cts);

-- Forensic log of rejected duplicate-version attempts. Versions are immutable
-- (FAIR PR #77): a second record at an existing (did, package, version) is
-- rejected at the SQL layer, and the attempt is recorded here. The table has
-- no primary key, so repeated attempts against the same version each get a row.
CREATE TABLE release_duplicate_attempts (
    did                   TEXT NOT NULL,
    package               TEXT NOT NULL,
    version               TEXT NOT NULL,
    rejected_at           TEXT NOT NULL,
    reason                TEXT NOT NULL,
    attempted_record_blob BLOB NOT NULL
);

-- Non-unique lookup of logged attempts by the release they targeted.
CREATE INDEX idx_release_duplicates
    ON release_duplicate_attempts (did, package, version);

------------------------------------------------------------------------------
-- Mirror tracking (Slice 3 populates; schema lands now to avoid churn)
------------------------------------------------------------------------------

-- One row per mirrored artifact of a release, keyed by
-- (did, slug, version, artifact_id).
CREATE TABLE mirrored_artifacts (
    did          TEXT    NOT NULL,
    slug         TEXT    NOT NULL,
    version      TEXT    NOT NULL,
    artifact_id  TEXT    NOT NULL,   -- 'package', 'icon', etc.
    r2_key       TEXT    NOT NULL,
    bytes        INTEGER NOT NULL,
    content_type TEXT    NOT NULL,
    mirrored_at  TEXT    NOT NULL,

    PRIMARY KEY (did, slug, version, artifact_id)
);

------------------------------------------------------------------------------
-- Labels (Slice 2 populates; schema lands now)
------------------------------------------------------------------------------

-- Label history, append-only: every label received lands here, negations
-- included. The current state is the row with the latest cts for each
-- (src, uri, val); that state is projected into label_state below so hot-path
-- lookups do not have to scan history.
CREATE TABLE labels (
    src         TEXT    NOT NULL,             -- labeller DID
    uri         TEXT    NOT NULL,             -- AT URI of subject
    cid         TEXT,                         -- optional version-specific CID
    val         TEXT    NOT NULL,             -- e.g. 'security:yanked', '!takedown'
    neg         INTEGER NOT NULL DEFAULT 0,
    cts         TEXT    NOT NULL,
    exp         TEXT,                         -- optional expiry (RFC 3339)
    sig         BLOB    NOT NULL,             -- raw signature for client re-verification
    ver         INTEGER NOT NULL DEFAULT 1,
    trusted     INTEGER NOT NULL DEFAULT 0,
    received_at TEXT    NOT NULL,

    PRIMARY KEY (src, uri, val, cts)
);

-- All label history rows for a given subject URI, regardless of labeller.
CREATE INDEX idx_labels_subject
    ON labels (uri);

-- History of one (src, uri, val) with the most recent cts first.
-- NOTE(review): same columns as the primary key, differing only in the sort
-- direction of cts; confirm a query actually needs the mixed ordering before
-- paying for a second index on an append-only table.
CREATE INDEX idx_labels_latest
    ON labels (src, uri, val, cts DESC);

-- Latest-state projection: one row per (src, uri, val) holding the most recent
-- cts seen, including the neg flag and exp timestamp. Updated on every label
-- write within the same transaction. Query-time filters apply
-- `neg = 0 AND (exp IS NULL OR exp > now())` to determine whether a label is
-- currently in force.
--
-- Why retain rows for negated/expired labels rather than deleting them: an
-- out-of-order delivery (a positive label arriving after its negation) could
-- otherwise reinsert a row we'd already retracted. Keeping the row with its
-- `cts` lets the upsert reject the older positive.
CREATE TABLE label_state (
    -- Projection key: exactly one row per labeller / subject / label value.
    src     TEXT    NOT NULL,
    uri     TEXT    NOT NULL,
    val     TEXT    NOT NULL,

    -- Fields carried over from the most recent label seen for that key.
    cid     TEXT,
    neg     INTEGER NOT NULL DEFAULT 0,
    cts     TEXT    NOT NULL,
    exp     TEXT,
    trusted INTEGER NOT NULL DEFAULT 0,

    PRIMARY KEY (src, uri, val)
);

-- Hot path for hard filters (yanked, takedown, etc.) from trusted issuers.
-- Partial index: only non-negated rows with trusted = 1 are stored, which
-- keeps it small. Expiry is not part of the predicate, so rows whose exp has
-- passed remain indexed and must still be filtered out at query time.
CREATE INDEX idx_label_state_enforce
    ON label_state (uri, val, trusted)
    WHERE neg = 0
      AND trusted = 1;

-- Registry of known labellers, one row per labeller DID. This is operator
-- configuration and is edited via deployment.
CREATE TABLE labellers (
    did              TEXT    PRIMARY KEY,
    endpoint         TEXT    NOT NULL,             -- subscribeLabels URL
    signing_key      TEXT    NOT NULL,             -- cached #atproto_label key
    signing_key_id   TEXT    NOT NULL,
    trusted          INTEGER NOT NULL DEFAULT 0,
    added_at         TEXT    NOT NULL,
    last_resolved_at TEXT    NOT NULL,
    notes            TEXT
);

------------------------------------------------------------------------------
-- Search: FTS5 over packages
------------------------------------------------------------------------------

-- External-content FTS5 index over the text columns of `packages`. The index
-- holds no copy of the text: it points back at `packages` rows by rowid and is
-- kept in step by the packages_ai / packages_au / packages_ad triggers.
-- Tokens go through unicode61 (remove_diacritics 2) and then the porter stemmer.
CREATE VIRTUAL TABLE packages_fts USING fts5(
    name,
    description,
    keywords,
    authors,
    sections,
    content='packages',
    content_rowid='rowid',
    tokenize='porter unicode61 remove_diacritics 2'
);

-- After a package row is inserted, add its searchable columns to the FTS
-- index under the same rowid.
CREATE TRIGGER packages_ai
AFTER INSERT ON packages
BEGIN
    INSERT INTO packages_fts (rowid, name, description, keywords, authors, sections)
    VALUES (new.rowid, new.name, new.description, new.keywords, new.authors, new.sections);
END;

-- Keep the FTS index in step when a package's searchable text changes.
--
-- Scoped with UPDATE OF to the five indexed columns: `packages` is also
-- updated on every release insert (latest_version and friends), and those
-- writes cannot change what the index contains, so they should not pay for an
-- FTS delete + re-insert. The trigger fires whenever one of the listed columns
-- appears in the UPDATE's SET clause, including upserts that rewrite them.
--
-- External-content FTS5 requires the 'delete' command to be given the values
-- that were originally indexed, hence old.* first and new.* second.
CREATE TRIGGER packages_au
AFTER UPDATE OF name, description, keywords, authors, sections ON packages
BEGIN
    INSERT INTO packages_fts (packages_fts, rowid, name, description, keywords, authors, sections)
    VALUES ('delete', old.rowid, old.name, old.description, old.keywords, old.authors, old.sections);

    INSERT INTO packages_fts (rowid, name, description, keywords, authors, sections)
    VALUES (new.rowid, new.name, new.description, new.keywords, new.authors, new.sections);
END;

-- After a package row is deleted, remove its entry from the FTS index by
-- issuing the FTS5 'delete' command with the row's previous values.
CREATE TRIGGER packages_ad
AFTER DELETE ON packages
BEGIN
    INSERT INTO packages_fts (packages_fts, rowid, name, description, keywords, authors, sections)
    VALUES ('delete', old.rowid, old.name, old.description, old.keywords, old.authors, old.sections);
END;

------------------------------------------------------------------------------
-- Ingest cursor state
------------------------------------------------------------------------------

-- Resume positions for ingest sources, one row per source: the Jetstream
-- microsecond timestamp, the subscribeLabels seq cursor of each labeller, etc.
-- The cursor is stored as text whatever its native form.
CREATE TABLE ingest_state (
    source     TEXT PRIMARY KEY,   -- 'jetstream', 'labeller:did:web:labels.example.com', etc.
    cursor     TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

-- Publisher DIDs the aggregator has encountered through Jetstream or
-- Constellation. Reconciliation walks this table, and cold-start backfill
-- seeds it from Constellation.
CREATE TABLE known_publishers (
    did             TEXT PRIMARY KEY,
    pds             TEXT,            -- cached PDS endpoint from DID document
    pds_resolved_at TEXT,
    first_seen_at   TEXT NOT NULL,
    last_seen_at    TEXT NOT NULL
);
46 changes: 46 additions & 0 deletions apps/aggregator/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"name": "@emdash-cms/aggregator",
"version": "0.0.0",
"private": true,
"description": "EmDash plugin registry aggregator. Indexes com.emdashcms.experimental.package.* records via Jetstream + PDS-verified ingest, exposes XRPC reads.",
"type": "module",
"scripts": {
"dev": "vite dev",
"build": "vite build",
"preview": "vite preview",
"deploy": "vite build && wrangler deploy",
"typecheck": "tsgo --noEmit",
"test": "vitest run",
"db:migrate:local": "wrangler d1 migrations apply emdash-aggregator --local",
"db:migrate": "wrangler d1 migrations apply emdash-aggregator --remote"
},
"dependencies": {
"@atcute/atproto": "catalog:",
"@atcute/car": "catalog:",
"@atcute/cbor": "catalog:",
"@atcute/cid": "catalog:",
"@atcute/client": "catalog:",
"@atcute/crypto": "catalog:",
"@atcute/firehose": "catalog:",
"@atcute/identity-resolver": "catalog:",
"@atcute/jetstream": "catalog:",
"@atcute/lexicons": "catalog:",
"@atcute/mst": "catalog:",
"@atcute/multibase": "catalog:",
"@atcute/repo": "catalog:",
"@atcute/xrpc-server": "catalog:",
"@atcute/xrpc-server-cloudflare": "catalog:",
"@emdash-cms/registry-lexicons": "workspace:*"
},
"devDependencies": {
"@cloudflare/vite-plugin": "catalog:",
"@cloudflare/vitest-pool-workers": "catalog:",
"@cloudflare/workers-types": "catalog:",
"@emdash-cms/atproto-test-utils": "workspace:*",
"@types/node": "catalog:",
"typescript": "catalog:",
"vite": "catalog:",
"vitest": "catalog:",
"wrangler": "catalog:"
}
}
37 changes: 37 additions & 0 deletions apps/aggregator/src/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Worker environment bindings.
*
* Production wires real Cloudflare resources via wrangler.jsonc. Tests use
* the same Env type but bind in-memory fakes from `@emdash-cms/atproto-test-utils`.
*
* The Env type is the dependency-injection seam for the whole aggregator:
* any external service the code calls is reached through a binding here, and
* tests substitute fakes by populating the test pool's miniflare bindings.
*/

import type { D1Database, DurableObjectNamespace, Queue } from "@cloudflare/workers-types";

import type { RecordsJetstreamDO } from "./records-do.js";

/**
 * Message payload carried on the records queue (`Env.RECORDS_QUEUE` is typed
 * `Queue<RecordsJob>`, and the Worker's `queue` handler receives
 * `MessageBatch<RecordsJob>`). Each job names a single record and the
 * operation observed for it.
 */
export interface RecordsJob {
// Together these three fields address the record: presumably the repo DID,
// the collection NSID and the record key — confirm against the producer.
did: string;
collection: string;
rkey: string;
// Which kind of change was observed for the record.
operation: "create" | "update" | "delete";
// NOTE(review): required for every operation, including "delete" — confirm
// what the producer supplies here when the event carries no CID.
cid: string;
/**
* The Jetstream-supplied (unverified) record bytes. Compared against the
* verified PDS copy after fetch as a Jetstream-correctness signal; the
* verified copy always wins.
*/
jetstreamRecord?: unknown;
}

/**
 * Bindings available to the Worker and its Durable Object. The same shape is
 * used in production and in tests; only what is bound behind each name differs.
 */
export interface Env {
// D1 database binding.
DB: D1Database;
// Queue whose messages are `RecordsJob` payloads.
RECORDS_QUEUE: Queue<RecordsJob>;
// Durable Object namespace for the `RecordsJetstreamDO` class.
RECORDS_DO: DurableObjectNamespace<RecordsJetstreamDO>;
// Plain string configuration values.
JETSTREAM_URL: string;
CONSTELLATION_URL: string;
// NOTE(review): a single string — presumably a delimited list of collection
// NSIDs; confirm the expected format where it is parsed.
WANTED_COLLECTIONS: string;
}
42 changes: 42 additions & 0 deletions apps/aggregator/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* EmDash plugin registry aggregator: Worker entrypoint.
*
* Slice 1 ships ingest + read API. Subsequent slices add the labeller (Slice 2),
* the artifact mirror + web directory (Slice 3), and NSID stabilisation (Slice 4).
*
* Reading order for someone learning this code:
* 1. `env.ts` — the dependency-injection seam. Every external service the
* aggregator talks to is bound here.
* 2. `records-do.ts` — Jetstream connection (PR 2 of Slice 1).
* 3. `records-consumer.ts` — PDS-verified ingest (PR 3 of Slice 1).
* 4. `routes/*.ts` — XRPC read endpoints (PR 5 of Slice 1).
*
* The fetch/queue/scheduled handlers below are wired but no-op at PR 1.
* Tests against an empty database round-trip cleanly through them; the smoke
* test in `test/smoke.test.ts` proves migrations apply and the Worker boots.
*/

import type { ExecutionContext, MessageBatch, ScheduledEvent } from "@cloudflare/workers-types";

import type { Env, RecordsJob } from "./env.js";

export { RecordsJetstreamDO } from "./records-do.js";

/**
 * Worker entrypoint object. All three handlers are placeholders at this
 * stage: `fetch` answers every request with a fixed 503 plain-text body, and
 * `queue` / `scheduled` resolve without doing any work.
 */
export default {
	/** Placeholder until Slice 1 PR 5 wires the XRPC routes. */
	async fetch(_request: Request, _env: Env, _ctx: ExecutionContext): Promise<Response> {
		// Until the routes land, the only callable surface is the smoke
		// test's database probes, so every request gets the same reply.
		const body = "emdash-aggregator: not yet implemented";
		const headers = { "content-type": "text/plain" };
		return new Response(body, { status: 503, headers });
	},

	/** Placeholder until Slice 1 PR 3 implements PDS-verified ingest. */
	async queue(_batch: MessageBatch<RecordsJob>, _env: Env, _ctx: ExecutionContext): Promise<void> {
		return;
	},

	/** Placeholder until Slice 1 PR 4 implements the 6h reconciliation pass. */
	async scheduled(_event: ScheduledEvent, _env: Env, _ctx: ExecutionContext): Promise<void> {
		return;
	},
};
28 changes: 28 additions & 0 deletions apps/aggregator/src/records-do.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Records Jetstream DO: holds a long-lived outbound WebSocket to Jetstream,
* filters our experimental package collections, and enqueues verification
* jobs onto the Records Queue.
*
* Slice 1 PR 1 (this commit) ships the DO class skeleton only — no WebSocket
* connection, no event handling. The wrangler.jsonc binding refers to this
* class; PR 2 fills in the connection + cursor persistence + reconnect
* backoff logic.
*
* Why a DO at all: outbound WebSockets stay open across requests, but a Worker
* isolate doesn't. A single DO instance keeps the Jetstream connection alive
* continuously. The Hibernation API doesn't apply here — it's server-side
* only, and our connection is outbound.
*/

import { DurableObject } from "cloudflare:workers";

import type { Env } from "./env.js";

/**
 * Durable Object skeleton for the Jetstream connection. It currently exposes
 * no behaviour beyond a fixed 501 reply to any request it receives.
 */
export class RecordsJetstreamDO extends DurableObject<Env> {
	/**
	 * Answers every request with `501 not implemented`.
	 *
	 * PR 2 turns this into the internal admin/debug surface (e.g. status,
	 * force-reconnect) and starts the Jetstream connection from
	 * `ctx.blockConcurrencyWhile`.
	 */
	override async fetch(_request: Request): Promise<Response> {
		const notImplemented = { status: 501 };
		return new Response("not implemented", notImplemented);
	}
}
1 change: 1 addition & 0 deletions apps/aggregator/test/env.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/// <reference types="@cloudflare/vitest-pool-workers/types" />
Loading
Loading