From 1ad21bf00890a036dfac44a377d1ddad1abf35da Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 01:30:38 +0000 Subject: [PATCH 1/3] docs(spec): create-room origin-site MV fix design Sibling fix to PR #145 (federated-room MV update for add/remove): apply the same local-INBOX publish pattern to the room-creation path so freshly-created rooms appear in user-room and spotlight indexes immediately, not on the next add/remove. Two narrow additions: one publish at the end of room-worker's finishCreateRoom (origin site) and a symmetric one at the end of inbox-worker's handleRoomCreated (federated remote sites). Wire format byte-for-byte matches PR #145 so search-sync-worker decodes both identically. Forward-only rollout per agreement; no backfill tool. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...1-create-room-origin-site-mv-fix-design.md | 325 ++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md diff --git a/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md new file mode 100644 index 000000000..b4cce1297 --- /dev/null +++ b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md @@ -0,0 +1,325 @@ +# Create-Room Origin-Site MV Fix — Design + +**Date:** 2026-05-11 +**Status:** Draft +**Services:** `room-worker`, `inbox-worker` +**Related specs:** +- `2026-04-09-room-spotlight-user-room-design.md` (user-room and spotlight collections) +- `2026-04-21-search-service-sync-worker-extension-design.md` (search-sync-worker INBOX consumer) +- `2026-05-01-federated-room-origin-site-mv-fix-design.md` (sibling fix for add/remove flows; this spec applies the same pattern to the create flow) + +## Problem + +PR #145 closed the origin-site MV gap for `member.add` / `member.remove` operations by adding a local `chat.inbox.{siteID}.member_added` / `member_removed` publish from `room-worker`. That publish drives `search-sync-worker`'s `user-room-{siteID}` and `spotlight-{siteID}` ES indexes. + +The room-creation path still has the same gap. When a room is created — channel, DM, or botDM — `room-worker.finishCreateRoom` writes the auto-enrolled `Subscription` rows (creator + DM recipient + every initial channel member) but never publishes a `member_added` event for them. `inbox-worker.handleRoomCreated` has the symmetric gap on remote sites: it upserts the local-side `Subscription` rows for federated rooms but never publishes a `member_added` event for the locally-resolved members. + +Result: a freshly-created room is invisible to search until the next add/remove operation re-emits the event. + +### User-visible consequences + +1. **Spotlight (room typeahead) returns nothing for the new room.** The creator types the room name; `search-sync-worker` has no spotlight doc; no result. +2. **Cross-site message search returns empty for the new room.** CCS terms-lookup against the user's `user-room-{siteID}` doc reports the user as not subscribed; message hits are filtered out as unauthorized. +3. **Self-corrects on churn.** Both indexes catch up on the next `member.add` or `member.remove` against the room (PR #145's publish fires there). Until then, the room is silently invisible to search. + +### Concrete trace + +Alice on `s1` creates a channel `r1` with `Orgs: [eng-org]` (org expands to `[bob@s1, charlie@s1, dave@s2]`). Today: + +| Subject | Stream | Effect | +|---|---|---| +| `chat.user.{account}.event.subscription.update` × 4 | core | Frontend left-panel updates for alice, bob, charlie, dave (cross-site routes via supercluster) | +| `chat.room.canonical.s1.create` | core (sys-message only, channel-only) | "alice created the room" | +| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | Federation: dave's site receives `room_created`, `inbox-worker.handleRoomCreated` upserts dave's local `Subscription` row | + +`s1`'s `user-room-s1` index gains zero entries. `s2`'s `user-room-s2` gains zero entries. Alice CCS-querying `r1` from any site fans out to `s1` and `s2` and finds no MV doc for any user → empty result. + +## Goals + +- `user-room-{siteID}` and `spotlight-{siteID}` on both the origin site and every federated remote site contain correct entries for every member auto-enrolled at room creation, regardless of room type. +- Fix lives in `room-worker.finishCreateRoom` (origin) and `inbox-worker.handleRoomCreated` (remote) — no new services, no new model types, no new subjects, no changes to `search-sync-worker` or stream config. +- Wire format byte-for-byte compatible with PR #145's existing publishes, so `search-sync-worker/inbox_stream.go::parseMemberEvent` decodes the new events identically. + +## Non-Goals + +- **Backfilling pre-fix rooms.** Forward-only deployment per agreement with team. Rooms created before this fix lands stay missing in their origin/federated MV until any later add/remove churn re-emits the event. Not documenting under "Known Limitations" because the operational expectation is "if the rooms matter, run any add-member operation on them". +- **Changing `chat.user.{account}.event.subscription.update` or `chat.room.canonical.{siteID}.create`.** UI fan-out and sys-message paths are correct; not in scope. +- **Refactoring `finishCreateRoom` or `handleRoomCreated`.** Both are narrow helpers; we're adding one publish to each. +- **Mint-on-create for the room encryption key.** Separate concern, deferred until `ENCRYPTION_ENABLED=true` is required in prod. + +## Design + +### NATS Subjects + +No new subjects. PR #145 already exposed: + +```go +// chat.inbox.{siteID}.member_added — local lane +subject.InboxMemberAdded(siteID) +``` + +Both `room-worker` and `inbox-worker` already import `pkg/subject`. + +### Wire format + +The publish wraps `model.MemberAddEvent` in `model.OutboxEvent`, matching PR #145's federated-lane wire format byte-for-byte: + +```go +inner := model.MemberAddEvent{ + Type: "member_added", + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, // see "Accounts list" below + SiteID: room.SiteID, // origin + JoinedAt: req.Timestamp, + HistorySharedSince: nil, // see "HistorySharedSince" below + Timestamp: now.UnixMilli(), +} +outbox := model.OutboxEvent{ + Type: "member_added", + SiteID: room.SiteID, // origin + DestSiteID: room.SiteID, // self — local publish + Payload: mustMarshal(inner), + Timestamp: now.UnixMilli(), +} +``` + +### Accounts list + +For every room type, the `Accounts` slice carries **expanded individual account names**, not org IDs. This matches PR #145's `processAddMembers` behavior; `search-sync-worker`'s per-user MV needs accounts. + +| Room type | Accounts source | +|---|---| +| Channel (with or without orgs) | `subs[].User.Account` — org expansion already happened upstream in `room-worker.processCreateRoomChannel` | +| DM | `subs[].User.Account` — both creator and recipient | +| botDM | `subs[].User.Account` — creator only (the bot account doesn't get a sub by design) | + +In all three cases the source is the same: the `subs []*model.Subscription` already passed to `finishCreateRoom`. + +### HistorySharedSince + +Always `nil` (unrestricted) for the create-time event. No prior history exists at room creation, so "share history since this timestamp" is meaningless — every member sees everything from t=0. Channel restricted-history kicks in only at later add-member events, which are already covered by PR #145. + +### Dedup ID + +Same shape as PR #145's cross-site OUTBOX publishes. Reuse the existing `outboxDedupID` helper: + +```go +payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, requester.Account, req.Timestamp) +dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) +``` + +`outboxDedupID` prefers `X-Request-ID` from `ctx`, falling back to the seed. `destSiteID = room.SiteID` (self-loop) namespaces it so it can't collide with the per-remote-site OUTBOX dedup IDs the same `finishCreateRoom` invocation will emit. + +For `inbox-worker.handleRoomCreated`, the seed mirrors the room-worker side but uses the locally-resolved info: + +```go +payloadSeed := fmt.Sprintf("%s:%s:%d", data.RoomID, data.RequesterAccount, data.Timestamp) +dedupID := outboxDedupID(ctx, h.siteID, payloadSeed) +``` + +`X-Request-ID` is preserved across federation by `inbox-worker`'s consumer setup, so the dedup ID is stable across redeliveries. + +### End-to-end flow after the fix + +For the same `[bob@s1, charlie@s1, dave@s2]` channel-create on `s1`: + +| Subject | Stream | Effect | +|---|---|---| +| `chat.user.{account}.event.subscription.update` × 4 | core | UI fan-out (unchanged) | +| `chat.room.canonical.s1.create` (sys-message only) | core | "alice created the room" (unchanged) | +| **`chat.inbox.s1.member_added`** (NEW) | INBOX_s1 (local lane) | s1's `user-room-sync` + `spotlight-sync` → s1's MV/spotlight gain docs for alice + bob + charlie + dave | +| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | SubjectTransform → `chat.inbox.s2.aggregate.room_created` → s2's `inbox-worker.handleRoomCreated` | +| **`chat.inbox.s2.member_added`** (NEW, from inbox-worker) | INBOX_s2 (local lane) | s2's `user-room-sync` + `spotlight-sync` → s2's MV/spotlight gain a doc for dave | + +End state: every site's MV/spotlight indexes have docs for every locally-affected member. CCS terms-lookup queries from any user resolve correctly. + +### Ordering + +The local-INBOX publish goes **after** the existing `subscription.update` loop and **before** the per-remote-site OUTBOX loop in `finishCreateRoom`. In `inbox-worker.handleRoomCreated`, the publish goes **after** `BulkCreateSubscriptions` (so the subs are durable before the search-sync event fires). + +### Idempotency + +`outboxDedupID` produces a stable JetStream `Nats-Msg-Id` per (room, requester, timestamp, destSiteID) tuple. JetStream stream-level dedup drops redeliveries within its dedup window (default 2 minutes, configured per stream). Beyond that window, `search-sync-worker`'s Painless last-write-wins guard makes replay idempotent on the ES side. + +## Code Changes + +### Change 1 — `room-worker/handler.go::finishCreateRoom` + +After the existing `subscription.update` loop (line ~1117) and before the per-remote-site OUTBOX loop (line ~1129): + +```go +// NEW — local INBOX publish for search-sync-worker (origin-site MV update). +// Mirrors PR #145's wire format; see docs/superpowers/specs/ +// 2026-05-11-create-room-origin-site-mv-fix-design.md. +accounts := make([]string, 0, len(subs)) +for _, sub := range subs { + accounts = append(accounts, sub.User.Account) +} +inner := model.MemberAddEvent{ + Type: "member_added", + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + HistorySharedSince: nil, + Timestamp: now.UnixMilli(), +} +innerData, _ := json.Marshal(inner) +outbox := model.OutboxEvent{ + Type: "member_added", + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: innerData, + Timestamp: now.UnixMilli(), +} +outboxData, _ := json.Marshal(outbox) +payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) +dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) +if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, dedupID); err != nil { + slog.Error("local inbox member_added publish failed", + "error", err, "roomID", room.ID, "requestID", requestID) +} +``` + +`subs []*model.Subscription` is already in scope. + +### Change 2 — `inbox-worker/handler.go::handleRoomCreated` + +After the existing `BulkCreateSubscriptions` call (line ~310), before the function returns: + +```go +// NEW — local INBOX publish for search-sync-worker. Same wire format as +// room-worker.finishCreateRoom; the locally-resolved member set is +// data.Accounts (every account FindUsersByAccounts returned). +accounts := make([]string, 0, len(data.Accounts)) +for _, account := range data.Accounts { + accounts = append(accounts, account) +} +inner := model.MemberAddEvent{ + Type: "member_added", + RoomID: data.RoomID, + RoomName: data.RoomName, + Accounts: accounts, + SiteID: data.HomeSiteID, + JoinedAt: data.Timestamp, + HistorySharedSince: nil, + Timestamp: time.Now().UTC().UnixMilli(), +} +innerData, _ := json.Marshal(inner) +outbox := model.OutboxEvent{ + Type: "member_added", + SiteID: h.siteID, + DestSiteID: h.siteID, + Payload: innerData, + Timestamp: inner.Timestamp, +} +outboxData, _ := json.Marshal(outbox) +payloadSeed := fmt.Sprintf("%s:%s:%d", data.RoomID, data.RequesterAccount, data.Timestamp) +dedupID := outboxDedupID(ctx, h.siteID, payloadSeed) +if err := h.publish(ctx, subject.InboxMemberAdded(h.siteID), outboxData, dedupID); err != nil { + slog.Error("local inbox member_added publish failed", + "error", err, "roomID", data.RoomID, "requestID", requestID) +} +``` + +`inbox-worker` does not currently import `pkg/subject`; add the import. + +`inbox-worker.Handler` does not currently have a `publish` field or an `outboxDedupID` helper — both will be added: +- `Handler.publish PublishFunc` injected via `NewHandler`, signature `func(ctx, subj, data, msgID) error`. Wired in `inbox-worker/main.go` to `nc.Publish` on the existing `*otelnats.Conn`. +- `outboxDedupID(ctx, destSiteID, payloadSeed)` — copy verbatim from `room-worker/handler.go:39`. Trivially small and avoids creating a new shared package for one helper. + +### Error handling + +Both publishes use the log-and-continue pattern from PR #145's existing local-INBOX publishes. The local publish failing must not fail the user-facing create request — JetStream redelivery (federation path) and `search-sync-worker`'s Painless guard (ES path) handle transient failures self-correctingly. Failing the whole create because the search index didn't update would be the wrong trade-off. + +### What is NOT changed + +- `pkg/subject` — `InboxMemberAdded` already exists. +- `pkg/stream` — `INBOX_{siteID}` already accepts `chat.inbox.{siteID}.*`. +- `pkg/model` — no new types; reuses `OutboxEvent`, `MemberAddEvent`, `RoomCreatedOutbox`. +- `inbox-worker` consumer FilterSubjects — PR #145 already scopes it to `chat.inbox.{siteID}.aggregate.>`, so the new local-lane publish stays out of `inbox-worker`'s own consumption (avoids self-feedback). +- `search-sync-worker`, `message-worker`, `broadcast-worker`, `history-service` — untouched. + +### Diff size estimate + +- `room-worker/handler.go`: +~22 lines. +- `inbox-worker/handler.go`: +~28 lines (publish block + import). +- `inbox-worker/main.go`: +~3 lines (wire publish). +- `inbox-worker/handler_test.go` and `inbox-worker/mock_*.go` (if any): updated for `NewHandler` signature. +- Tests: see Testing. + +## Testing + +Unit tests only, mirroring PR #145's pattern. Each handler injects `publish` as a field already (room-worker) or after this change (inbox-worker), so tests capture publishes into a slice and assert on the captured entries. + +### Unit tests — `room-worker/handler_test.go` + +Two new tests against `processCreateRoom*` paths: + +- `TestHandler_processCreateRoom_Channel_PublishesLocalInbox`: + - Mixed-site channel create with orgs and direct accounts. + - Assert exactly one publish to `subject.InboxMemberAdded(room.SiteID)`. + - Decode the payload as `OutboxEvent` → inner `MemberAddEvent`. + - Verify: `OutboxEvent.{Type, SiteID, DestSiteID}` match origin self-loop; inner `Accounts` is the expanded set covering creator + every org-resolved account; inner `HistorySharedSince` is nil; inner `RoomName` matches `room.Name`; `Nats-Msg-Id` matches `outboxDedupID(ctx, siteID, "{rid}:{requester}:{ts}")`. + +- `TestHandler_processCreateRoom_DM_PublishesLocalInbox`: + - DM create across sites (creator local, recipient remote). + - Assert one publish to `subject.InboxMemberAdded(room.SiteID)`. + - Verify `Accounts` contains both creator and recipient; `HistorySharedSince` is nil; `RoomName` is empty (DM convention). + +### Unit tests — `inbox-worker/handler_test.go` + +Two new tests against `handleRoomCreated`: + +- `TestHandler_handleRoomCreated_Channel_PublishesLocalInbox`: + - Channel arrives via federation with two locally-resolved accounts. + - Mock `FindUsersByAccounts` to return both. + - Assert one publish to `subject.InboxMemberAdded(h.siteID)`. + - Verify wire format matches the room-worker side; `Accounts` is the locally-resolved subset; `OutboxEvent.SiteID == DestSiteID == h.siteID`. + +- `TestHandler_handleRoomCreated_DM_PublishesLocalInbox`: + - DM arrives via federation with one locally-resolved account (the recipient on this site). + - Same assertions, single account in `Accounts`. + +Add a third negative test: +- `TestHandler_handleRoomCreated_EmptyAccounts_NoPublish` — when `data.Accounts` is empty (early return on line 261), assert zero publishes. + +### Out of scope for new tests + +- Integration tests against real NATS / Mongo — not in this PR's scope per agreement (would double diff size for marginal coverage gain). +- `search-sync-worker`'s ES write path — already covered by `search-sync-worker/inbox_integration_test.go` against the federated lane. The local lane uses identical payload shape decoded by the same `parseMemberEvent`, so duplicating that test against the local lane adds little. +- `processAddMembers` / `processRemoveIndividual` / `processRemoveOrg` paths — covered by PR #145's existing tests; this change does not touch them. + +### Coverage target + +Combined unit coverage for the two modified handler functions stays above the 80% project minimum (CLAUDE.md). Spot-check via `go tool cover -func=coverage.out` after implementation. + +## Rollout + +Both changes are backward-compatible: + +- `room-worker` publishing to `chat.inbox.{site}.member_added` is purely additive. PR #145 already created the subject and `search-sync-worker` already filters on it. +- `inbox-worker` publishing to the same subject is also additive. The `inbox-worker` consumer's `FilterSubjects` (PR #145 / commit `c779ede`) is scoped to `chat.inbox.{siteID}.aggregate.>`, so its own publish stays off its own consumer — no self-feedback loop. + +Recommended: deploy both services in the same release per site so the create-time MV update is symmetric across origin and remote on day one. No coordinated multi-site rollout needed. + +### Per-site verification after deploy + +1. Create a federated room (channel or DM) with members on at least one remote site. +2. Within seconds, query each site's `user-room-{siteID}` ES index and confirm: + - The creator's doc on the origin site contains the new room ID. + - Every channel member's / DM recipient's doc on their respective home site contains the new room ID. +3. Confirm spotlight typeahead returns the new room for the creator on the origin site. + +## Observability + +- **Logs:** new publishes use `slog.Error` log-and-continue. Failure messages: `"local inbox member_added publish failed"` (matches PR #145's exact string for grep parity). Search post-deploy to confirm zero failures. +- **Metrics:** none added. Existing JetStream stream-level metrics on `INBOX_{siteID}` will show throughput on `chat.inbox.{siteID}.member_added` rise from "PR #145's add/remove rate" to "that plus the create rate" — that rise is itself the success signal. +- **Traces:** the new publishes inherit the request context, so OTel trace IDs propagate end-to-end (`room-worker` → INBOX → `search-sync-worker` → ES bulk write all under one trace; `inbox-worker` extends the same trace from the federated arrival side). + +## Risks + +- **Federation duplicate publishes if a future bug routes a `room_created` event back to its origin site.** Today the OUTBOX → INBOX SubjectTransform plus the consumer FilterSubjects exclude the origin from the federated path, so this can't happen. Mitigation: the dedup ID uses `room.SiteID` (origin) on the room-worker side and `h.siteID` (this site) on the inbox-worker side — if a future config bug delivered the origin's own event to itself via federation, the two dedup IDs would still differ (origin vs. self), so JetStream would not collapse them. The `search-sync-worker` Painless last-write-wins guard would still produce the correct end state, but at the cost of one duplicate publish per federation cycle. Acceptable. +- **Stale spec drift if room-worker's create path adds a new sub source.** If a future change adds members to a room outside the `subs []*model.Subscription` slice passed to `finishCreateRoom` (e.g., a parallel "channel preset members" feature), the new INBOX publish would miss them. Mitigation: keep `subs` as the single source of truth for "who got auto-enrolled at create time" — every code path that adds a sub to a freshly-created room must go through `subs` and therefore through the publish. Document this invariant inline as a one-line comment above the publish. From 36dcb3ad45f7ce71e9ebc9140d9704e58b209d07 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 01:52:06 +0000 Subject: [PATCH 2/3] feat(room-worker): create-room origin-site MV fix per spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sibling fix to PR #145 (federated-room MV update for add/remove). PR #145 closed the origin-site MV gap for member.add / member.remove; this PR applies the same local-INBOX + cross-site OUTBOX pattern to the room-creation path so freshly-created rooms appear in user-room-{site} and spotlight-{site} ES indexes immediately, not on the next add/remove operation. room-worker.finishCreateRoom now emits two new publishes: 1. Local origin-site INBOX: chat.inbox.{origin}.member_added carrying every account in subs[] (creator + every auto-enrolled member). Drives the origin site's search-sync-worker MV update. 2. Cross-site OUTBOX per remote site: outbox.{origin}.to.{remote}. member_added carrying only the remote-site accounts (per-dest split). Reuses the existing federation lane PR #145 established for add-members: SubjectTransform rewrites it to chat.inbox.{remote}. aggregate.member_added, which the remote site's search-sync-worker already consumes. No new inbox-worker code, no new event types, no new stream config — just one more publish on a path that already exists. Wire format byte-for-byte identical to PR #145 so parseMemberEvent decodes all member_added events the same way regardless of which path they take. Also: lift outboxDedupID from room-worker's private helper to natsutil.OutboxDedupID — used in 9 call sites on this branch, removes the copy I would have introduced if inbox-worker had needed to publish too. Tests: 3 new unit tests in room-worker (DM local INBOX, channel local INBOX, channel cross-site OUTBOX member_added). Forward-only rollout per spec; no backfill tool. Spec: docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...1-create-room-origin-site-mv-fix-design.md | 298 +++++++----------- pkg/natsutil/request_id.go | 13 + room-worker/handler.go | 83 +++-- room-worker/handler_test.go | 178 +++++++++++ room-worker/integration_test.go | 77 +++-- 5 files changed, 429 insertions(+), 220 deletions(-) diff --git a/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md index b4cce1297..4363549bb 100644 --- a/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md +++ b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md @@ -2,7 +2,7 @@ **Date:** 2026-05-11 **Status:** Draft -**Services:** `room-worker`, `inbox-worker` +**Services:** `room-worker` **Related specs:** - `2026-04-09-room-spotlight-user-room-design.md` (user-room and spotlight collections) - `2026-04-21-search-service-sync-worker-extension-design.md` (search-sync-worker INBOX consumer) @@ -10,9 +10,9 @@ ## Problem -PR #145 closed the origin-site MV gap for `member.add` / `member.remove` operations by adding a local `chat.inbox.{siteID}.member_added` / `member_removed` publish from `room-worker`. That publish drives `search-sync-worker`'s `user-room-{siteID}` and `spotlight-{siteID}` ES indexes. +PR #145 closed the origin-site MV gap for `member.add` / `member.remove` by adding a local `chat.inbox.{siteID}.member_added` / `member_removed` publish from `room-worker`, plus a per-remote-site `outbox.{origin}.to.{remote}.member_added` outbox event that arrives on every federated site's INBOX (via JetStream Sources + SubjectTransform) as `chat.inbox.{remote}.aggregate.member_added`. `search-sync-worker` on each site consumes both lanes and updates its `user-room-{siteID}` and `spotlight-{siteID}` ES indexes. -The room-creation path still has the same gap. When a room is created — channel, DM, or botDM — `room-worker.finishCreateRoom` writes the auto-enrolled `Subscription` rows (creator + DM recipient + every initial channel member) but never publishes a `member_added` event for them. `inbox-worker.handleRoomCreated` has the symmetric gap on remote sites: it upserts the local-side `Subscription` rows for federated rooms but never publishes a `member_added` event for the locally-resolved members. +The room-creation path still has the same gap. `room-worker.finishCreateRoom` writes the auto-enrolled `Subscription` rows (creator + DM recipient + every initial channel member) and emits a per-remote-site `outbox.{origin}.to.{remote}.room_created` for federation — but **never publishes a `member_added` event** on either the origin-INBOX local lane or the cross-site outbox. `search-sync-worker` never sees the create. Result: a freshly-created room is invisible to search until the next add/remove operation re-emits the event. @@ -20,7 +20,7 @@ Result: a freshly-created room is invisible to search until the next add/remove 1. **Spotlight (room typeahead) returns nothing for the new room.** The creator types the room name; `search-sync-worker` has no spotlight doc; no result. 2. **Cross-site message search returns empty for the new room.** CCS terms-lookup against the user's `user-room-{siteID}` doc reports the user as not subscribed; message hits are filtered out as unauthorized. -3. **Self-corrects on churn.** Both indexes catch up on the next `member.add` or `member.remove` against the room (PR #145's publish fires there). Until then, the room is silently invisible to search. +3. **Self-corrects on churn.** Both indexes catch up on the next `member.add` or `member.remove` (PR #145's publish fires). Until then, the room is silently invisible to search. ### Concrete trace @@ -28,147 +28,130 @@ Alice on `s1` creates a channel `r1` with `Orgs: [eng-org]` (org expands to `[bo | Subject | Stream | Effect | |---|---|---| -| `chat.user.{account}.event.subscription.update` × 4 | core | Frontend left-panel updates for alice, bob, charlie, dave (cross-site routes via supercluster) | -| `chat.room.canonical.s1.create` | core (sys-message only, channel-only) | "alice created the room" | -| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | Federation: dave's site receives `room_created`, `inbox-worker.handleRoomCreated` upserts dave's local `Subscription` row | +| `chat.user.{account}.event.subscription.update` × 4 | core | Frontend left-panel updates for alice, bob, charlie, dave | +| `chat.room.canonical.s1.create` (sys-message only, channel-only) | core | "alice created the room" | +| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | `inbox-worker` on s2 mirrors dave's `Subscription` row | -`s1`'s `user-room-s1` index gains zero entries. `s2`'s `user-room-s2` gains zero entries. Alice CCS-querying `r1` from any site fans out to `s1` and `s2` and finds no MV doc for any user → empty result. +`s1`'s `user-room-s1` gains zero entries. `s2`'s `user-room-s2` gains zero entries. Alice CCS-querying `r1` from any site → empty result. ## Goals -- `user-room-{siteID}` and `spotlight-{siteID}` on both the origin site and every federated remote site contain correct entries for every member auto-enrolled at room creation, regardless of room type. -- Fix lives in `room-worker.finishCreateRoom` (origin) and `inbox-worker.handleRoomCreated` (remote) — no new services, no new model types, no new subjects, no changes to `search-sync-worker` or stream config. -- Wire format byte-for-byte compatible with PR #145's existing publishes, so `search-sync-worker/inbox_stream.go::parseMemberEvent` decodes the new events identically. +- `user-room-{siteID}` and `spotlight-{siteID}` on the origin site **and every federated remote site** contain correct entries for every member auto-enrolled at room creation, regardless of room type. +- Fix lives **entirely in `room-worker.finishCreateRoom`** — no changes to `inbox-worker`, `search-sync-worker`, stream config, or any new model types. +- Wire format byte-for-byte compatible with PR #145's existing publishes so `search-sync-worker/inbox_stream.go::parseMemberEvent` decodes all `member_added` events identically (whether create-time or add-member, origin-local or federated-aggregate). ## Non-Goals -- **Backfilling pre-fix rooms.** Forward-only deployment per agreement with team. Rooms created before this fix lands stay missing in their origin/federated MV until any later add/remove churn re-emits the event. Not documenting under "Known Limitations" because the operational expectation is "if the rooms matter, run any add-member operation on them". +- **Backfilling pre-fix rooms.** Forward-only deployment per agreement. Rooms created before this fix lands stay missing in their MV until any later add/remove churn re-emits the event. - **Changing `chat.user.{account}.event.subscription.update` or `chat.room.canonical.{siteID}.create`.** UI fan-out and sys-message paths are correct; not in scope. -- **Refactoring `finishCreateRoom` or `handleRoomCreated`.** Both are narrow helpers; we're adding one publish to each. +- **Refactoring `finishCreateRoom`** beyond the two added publishes. - **Mint-on-create for the room encryption key.** Separate concern, deferred until `ENCRYPTION_ENABLED=true` is required in prod. ## Design -### NATS Subjects +### Why this lives in room-worker alone -No new subjects. PR #145 already exposed: +The cross-site federation path for `member_added` already exists from PR #145: -```go -// chat.inbox.{siteID}.member_added — local lane -subject.InboxMemberAdded(siteID) +``` +outbox.{origin}.to.{remote}.member_added + → (JetStream Sources + SubjectTransform) + → chat.inbox.{remote}.aggregate.member_added + → search-sync-worker on {remote} updates user-room-{remote}/spotlight-{remote} ``` -Both `room-worker` and `inbox-worker` already import `pkg/subject`. +`search-sync-worker` on the remote site already has `chat.inbox.{remote}.aggregate.member_added` in its consumer's `FilterSubjects`. By making `room-worker.finishCreateRoom` emit the **same** outbox event it already emits in `processAddMembers`, we reuse the entire federation lane end-to-end and `search-sync-worker` indexes the new room without any extra hop through `inbox-worker`. -### Wire format +An alternative considered: have `inbox-worker.handleRoomCreated` re-emit a local `chat.inbox.{remote}.member_added` after creating the subs. Rejected because (i) it duplicates federation work `room-worker` already does for add-members; (ii) adds a second hop on the remote side; (iii) requires `inbox-worker.Handler` to grow a `publish` field and a `siteID` field with all the test churn that implies. The symmetric "publish the same outbox events as add-members" path is materially smaller. -The publish wraps `model.MemberAddEvent` in `model.OutboxEvent`, matching PR #145's federated-lane wire format byte-for-byte: +### NATS subjects (all already exist) ```go -inner := model.MemberAddEvent{ - Type: "member_added", - RoomID: room.ID, - RoomName: room.Name, - Accounts: accounts, // see "Accounts list" below - SiteID: room.SiteID, // origin - JoinedAt: req.Timestamp, - HistorySharedSince: nil, // see "HistorySharedSince" below - Timestamp: now.UnixMilli(), -} -outbox := model.OutboxEvent{ - Type: "member_added", - SiteID: room.SiteID, // origin - DestSiteID: room.SiteID, // self — local publish - Payload: mustMarshal(inner), - Timestamp: now.UnixMilli(), -} +// chat.inbox.{siteID}.member_added — origin-site local lane (PR #145 added) +subject.InboxMemberAdded(siteID) + +// outbox.{origin}.to.{destSiteID}.member_added — federation lane (PR #145 added) +subject.Outbox(siteID, destSiteID, model.OutboxMemberAdded) ``` -### Accounts list +### Wire format -For every room type, the `Accounts` slice carries **expanded individual account names**, not org IDs. This matches PR #145's `processAddMembers` behavior; `search-sync-worker`'s per-user MV needs accounts. +Both publishes wrap `model.MemberAddEvent` in `model.OutboxEvent`. The local publish has `SiteID == DestSiteID == originSite` (self-loop convention); the cross-site publish has the per-remote `DestSiteID`. The inner `MemberAddEvent` carries: -| Room type | Accounts source | +| Field | Value | |---|---| -| Channel (with or without orgs) | `subs[].User.Account` — org expansion already happened upstream in `room-worker.processCreateRoomChannel` | -| DM | `subs[].User.Account` — both creator and recipient | -| botDM | `subs[].User.Account` — creator only (the bot account doesn't get a sub by design) | - -In all three cases the source is the same: the `subs []*model.Subscription` already passed to `finishCreateRoom`. - -### HistorySharedSince - -Always `nil` (unrestricted) for the create-time event. No prior history exists at room creation, so "share history since this timestamp" is meaningless — every member sees everything from t=0. Channel restricted-history kicks in only at later add-member events, which are already covered by PR #145. +| `Type` | `model.OutboxMemberAdded` | +| `RoomID` | `room.ID` | +| `RoomName` | `room.Name` (empty for DM/botDM) | +| `Accounts` | Expanded individual accounts (see below) | +| `SiteID` | `room.SiteID` — the origin | +| `JoinedAt` | `req.Timestamp` | +| `HistorySharedSince` | Always `nil` — no prior history at create time | +| `Timestamp` | `now.UnixMilli()` | -### Dedup ID - -Same shape as PR #145's cross-site OUTBOX publishes. Reuse the existing `outboxDedupID` helper: - -```go -payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, requester.Account, req.Timestamp) -dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) -``` +### Accounts list -`outboxDedupID` prefers `X-Request-ID` from `ctx`, falling back to the seed. `destSiteID = room.SiteID` (self-loop) namespaces it so it can't collide with the per-remote-site OUTBOX dedup IDs the same `finishCreateRoom` invocation will emit. +| Publish | `Accounts` source | +|---|---| +| **Origin-local INBOX** (`chat.inbox.{origin}.member_added`) | Every entry in `subs[]` (creator + every auto-enrolled member, including cross-site members for s1's own MV) | +| **Cross-site OUTBOX** (`outbox.{origin}.to.{remote}.member_added`) | Only members whose `SiteID == remote` (per-destination split, matches PR #145's batched outbox shape) | -For `inbox-worker.handleRoomCreated`, the seed mirrors the room-worker side but uses the locally-resolved info: +For channel rooms with `Orgs`, expansion has already happened in `processCreateRoomChannel` before `finishCreateRoom` runs. `subs[]` already carries expanded individual accounts. -```go -payloadSeed := fmt.Sprintf("%s:%s:%d", data.RoomID, data.RequesterAccount, data.Timestamp) -dedupID := outboxDedupID(ctx, h.siteID, payloadSeed) -``` +### Dedup IDs -`X-Request-ID` is preserved across federation by `inbox-worker`'s consumer setup, so the dedup ID is stable across redeliveries. +Reuse `natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed)` with seed `"{roomID}:{requesterAccount}:{timestamp}"`. PR #145 uses the identical seed shape for the add-members path; identical seed at the same `{destSiteID}` is fine because there's exactly one create per room per requester per timestamp. ### End-to-end flow after the fix -For the same `[bob@s1, charlie@s1, dave@s2]` channel-create on `s1`: +For `[bob@s1, charlie@s1, dave@s2]` channel-create on `s1`: | Subject | Stream | Effect | |---|---|---| | `chat.user.{account}.event.subscription.update` × 4 | core | UI fan-out (unchanged) | | `chat.room.canonical.s1.create` (sys-message only) | core | "alice created the room" (unchanged) | -| **`chat.inbox.s1.member_added`** (NEW) | INBOX_s1 (local lane) | s1's `user-room-sync` + `spotlight-sync` → s1's MV/spotlight gain docs for alice + bob + charlie + dave | -| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | SubjectTransform → `chat.inbox.s2.aggregate.room_created` → s2's `inbox-worker.handleRoomCreated` | -| **`chat.inbox.s2.member_added`** (NEW, from inbox-worker) | INBOX_s2 (local lane) | s2's `user-room-sync` + `spotlight-sync` → s2's MV/spotlight gain a doc for dave | +| **`chat.inbox.s1.member_added`** (NEW) | INBOX_s1 (local lane) | s1's `search-sync-worker` updates `user-room-s1` + `spotlight-s1` for alice + bob + charlie + dave | +| `outbox.s1.to.s2.room_created` (existing) | OUTBOX_s1 | s2's `inbox-worker` mirrors dave's `Subscription` row | +| **`outbox.s1.to.s2.member_added`** (NEW) | OUTBOX_s1 → INBOX_s2 aggregate lane | s2's `search-sync-worker` updates `user-room-s2` + `spotlight-s2` for dave | -End state: every site's MV/spotlight indexes have docs for every locally-affected member. CCS terms-lookup queries from any user resolve correctly. +End state: every site's MV/spotlight indexes contain the new room for every locally-affected member. ### Ordering -The local-INBOX publish goes **after** the existing `subscription.update` loop and **before** the per-remote-site OUTBOX loop in `finishCreateRoom`. In `inbox-worker.handleRoomCreated`, the publish goes **after** `BulkCreateSubscriptions` (so the subs are durable before the search-sync event fires). +Both new publishes go inside `finishCreateRoom`: + +- **Origin-local INBOX** publish: after the `subscription.update` loop, before the per-remote-site OUTBOX loop. Same position as PR #145's local INBOX publishes in `processAddMembers`/`processRemoveIndividual`/`processRemoveOrg`. +- **Cross-site OUTBOX `member_added`** publish: inside the existing `for destSiteID, accounts := range remoteSiteAccounts` loop, immediately after the existing `room_created` publish for the same dest site. + +The federation lane delivers `room_created` and `member_added` to the remote site's INBOX in publish order. `inbox-worker` (which consumes the aggregate lane via `FilterSubjects: aggregate.>`) and `search-sync-worker` (whose `FilterSubjects` matches `aggregate.member_added` but not `aggregate.room_created`) operate on disjoint event types, so the order they execute relative to each other doesn't matter — `search-sync-worker` doesn't read MongoDB and doesn't depend on `Subscription` rows existing. ### Idempotency -`outboxDedupID` produces a stable JetStream `Nats-Msg-Id` per (room, requester, timestamp, destSiteID) tuple. JetStream stream-level dedup drops redeliveries within its dedup window (default 2 minutes, configured per stream). Beyond that window, `search-sync-worker`'s Painless last-write-wins guard makes replay idempotent on the ES side. +`natsutil.OutboxDedupID` produces a stable `Nats-Msg-Id` per `(room, requester, timestamp, destSiteID)`. JetStream stream-level dedup drops redeliveries within its dedup window. Beyond that window, `search-sync-worker`'s Painless last-write-wins guard makes ES replay idempotent. ## Code Changes -### Change 1 — `room-worker/handler.go::finishCreateRoom` +### Change 1 — `room-worker/handler.go::finishCreateRoom` (origin-local INBOX publish) -After the existing `subscription.update` loop (line ~1117) and before the per-remote-site OUTBOX loop (line ~1129): +After the existing `subscription.update` loop and channel sys-message publish, before the per-remote-site OUTBOX loop: ```go -// NEW — local INBOX publish for search-sync-worker (origin-site MV update). -// Mirrors PR #145's wire format; see docs/superpowers/specs/ -// 2026-05-11-create-room-origin-site-mv-fix-design.md. accounts := make([]string, 0, len(subs)) for _, sub := range subs { accounts = append(accounts, sub.User.Account) } inner := model.MemberAddEvent{ - Type: "member_added", - RoomID: room.ID, - RoomName: room.Name, - Accounts: accounts, - SiteID: room.SiteID, - JoinedAt: req.Timestamp, - HistorySharedSince: nil, - Timestamp: now.UnixMilli(), + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), } innerData, _ := json.Marshal(inner) outbox := model.OutboxEvent{ - Type: "member_added", + Type: model.OutboxMemberAdded, SiteID: room.SiteID, DestSiteID: room.SiteID, Payload: innerData, @@ -176,134 +159,93 @@ outbox := model.OutboxEvent{ } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) -dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) -if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, dedupID); err != nil { - slog.Error("local inbox member_added publish failed", - "error", err, "roomID", room.ID, "requestID", requestID) +if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, natsutil.OutboxDedupID(ctx, room.SiteID, payloadSeed)); err != nil { + slog.Error("local inbox member_added publish failed", "error", err, "roomID", room.ID, "requestID", requestID) } ``` -`subs []*model.Subscription` is already in scope. +Log-and-continue on publish failure — JetStream redelivery + `search-sync-worker`'s last-write-wins guard handle transient failures self-correctingly. -### Change 2 — `inbox-worker/handler.go::handleRoomCreated` +### Change 2 — `room-worker/handler.go::finishCreateRoom` (cross-site OUTBOX member_added publish) -After the existing `BulkCreateSubscriptions` call (line ~310), before the function returns: +Inside the existing per-remote-site loop, right after the existing `room_created` publish: ```go -// NEW — local INBOX publish for search-sync-worker. Same wire format as -// room-worker.finishCreateRoom; the locally-resolved member set is -// data.Accounts (every account FindUsersByAccounts returned). -accounts := make([]string, 0, len(data.Accounts)) -for _, account := range data.Accounts { - accounts = append(accounts, account) +memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, // per-dest accounts (loop variable) + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), } -inner := model.MemberAddEvent{ - Type: "member_added", - RoomID: data.RoomID, - RoomName: data.RoomName, - Accounts: accounts, - SiteID: data.HomeSiteID, - JoinedAt: data.Timestamp, - HistorySharedSince: nil, - Timestamp: time.Now().UTC().UnixMilli(), -} -innerData, _ := json.Marshal(inner) -outbox := model.OutboxEvent{ - Type: "member_added", - SiteID: h.siteID, - DestSiteID: h.siteID, - Payload: innerData, - Timestamp: inner.Timestamp, +memberData, _ := json.Marshal(memberEvt) +memberEnvelope := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: destSiteID, + Payload: memberData, + Timestamp: now.UnixMilli(), } -outboxData, _ := json.Marshal(outbox) -payloadSeed := fmt.Sprintf("%s:%s:%d", data.RoomID, data.RequesterAccount, data.Timestamp) -dedupID := outboxDedupID(ctx, h.siteID, payloadSeed) -if err := h.publish(ctx, subject.InboxMemberAdded(h.siteID), outboxData, dedupID); err != nil { - slog.Error("local inbox member_added publish failed", - "error", err, "roomID", data.RoomID, "requestID", requestID) +memberOutboxData, _ := json.Marshal(memberEnvelope) +memberSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) +if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), memberOutboxData, natsutil.OutboxDedupID(ctx, destSiteID, memberSeed)); err != nil { + return fmt.Errorf("publish member_added outbox to %s: %w", destSiteID, err) } ``` -`inbox-worker` does not currently import `pkg/subject`; add the import. - -`inbox-worker.Handler` does not currently have a `publish` field or an `outboxDedupID` helper — both will be added: -- `Handler.publish PublishFunc` injected via `NewHandler`, signature `func(ctx, subj, data, msgID) error`. Wired in `inbox-worker/main.go` to `nc.Publish` on the existing `*otelnats.Conn`. -- `outboxDedupID(ctx, destSiteID, payloadSeed)` — copy verbatim from `room-worker/handler.go:39`. Trivially small and avoids creating a new shared package for one helper. +The cross-site publish returns an error on failure (rather than log-and-continue) to match the surrounding `room_created` publish — JetStream NAKs the create, and `room-worker` redelivers from `MESSAGES_{siteID}`. -### Error handling +### Change 3 — `pkg/natsutil/request_id.go::OutboxDedupID` -Both publishes use the log-and-continue pattern from PR #145's existing local-INBOX publishes. The local publish failing must not fail the user-facing create request — JetStream redelivery (federation path) and `search-sync-worker`'s Painless guard (ES path) handle transient failures self-correctingly. Failing the whole create because the search index didn't update would be the wrong trade-off. +Lift `room-worker`'s private `outboxDedupID` to `natsutil.OutboxDedupID` — pure logic, identical at both call sites in `room-worker` and consistent with `natsutil`'s ownership of `RequestIDFromContext` and `NewMsg`. Removes a copy I would otherwise introduce. ### What is NOT changed -- `pkg/subject` — `InboxMemberAdded` already exists. -- `pkg/stream` — `INBOX_{siteID}` already accepts `chat.inbox.{siteID}.*`. -- `pkg/model` — no new types; reuses `OutboxEvent`, `MemberAddEvent`, `RoomCreatedOutbox`. -- `inbox-worker` consumer FilterSubjects — PR #145 already scopes it to `chat.inbox.{siteID}.aggregate.>`, so the new local-lane publish stays out of `inbox-worker`'s own consumption (avoids self-feedback). +- `pkg/subject`, `pkg/stream`, `pkg/model` — no new types/subjects. +- `inbox-worker` — untouched. It continues to consume `aggregate.room_created` for sub creation (its current job) and ignores `aggregate.member_added` for fresh rooms because the BulkCreateSubscriptions path is gated on the room having been created via `aggregate.room_created` first (sub creation happens in `handleRoomCreated`; `handleMemberAdded` adds members to an existing room). + + **Subtle:** if the remote site's `inbox-worker.handleMemberAdded` arrives **before** `handleRoomCreated` for a fresh room (out-of-order delivery), it will try to `BulkCreateSubscriptions` and either (a) the unique index `(roomId, u.account)` rejects the duplicate later when `handleRoomCreated` runs, or (b) the first delivery succeeds and the second is a no-op. Either way the end state is correct because both events carry the same `Accounts` list for the locally-resolved subset. JetStream publish order from `OUTBOX_{origin}` is preserved, so out-of-order delivery is unlikely in practice. + - `search-sync-worker`, `message-worker`, `broadcast-worker`, `history-service` — untouched. ### Diff size estimate -- `room-worker/handler.go`: +~22 lines. -- `inbox-worker/handler.go`: +~28 lines (publish block + import). -- `inbox-worker/main.go`: +~3 lines (wire publish). -- `inbox-worker/handler_test.go` and `inbox-worker/mock_*.go` (if any): updated for `NewHandler` signature. -- Tests: see Testing. +- `room-worker/handler.go`: +~50 lines (two publish blocks). +- `pkg/natsutil/request_id.go`: +~13 lines (new `OutboxDedupID` helper, called from 9 existing sites in `room-worker`). +- `room-worker/handler.go` callers: 9 lines updated to use `natsutil.OutboxDedupID` instead of the private helper; private helper deleted. +- Tests: 3 new unit tests (2 for origin-local INBOX publish: DM + channel; 1 for cross-site OUTBOX member_added). ## Testing -Unit tests only, mirroring PR #145's pattern. Each handler injects `publish` as a field already (room-worker) or after this change (inbox-worker), so tests capture publishes into a slice and assert on the captured entries. +Unit tests only. Handler tests inject `publish` as a field already; tests capture publishes and assert on the entries. ### Unit tests — `room-worker/handler_test.go` -Two new tests against `processCreateRoom*` paths: - -- `TestHandler_processCreateRoom_Channel_PublishesLocalInbox`: - - Mixed-site channel create with orgs and direct accounts. - - Assert exactly one publish to `subject.InboxMemberAdded(room.SiteID)`. - - Decode the payload as `OutboxEvent` → inner `MemberAddEvent`. - - Verify: `OutboxEvent.{Type, SiteID, DestSiteID}` match origin self-loop; inner `Accounts` is the expanded set covering creator + every org-resolved account; inner `HistorySharedSince` is nil; inner `RoomName` matches `room.Name`; `Nats-Msg-Id` matches `outboxDedupID(ctx, siteID, "{rid}:{requester}:{ts}")`. - -- `TestHandler_processCreateRoom_DM_PublishesLocalInbox`: - - DM create across sites (creator local, recipient remote). - - Assert one publish to `subject.InboxMemberAdded(room.SiteID)`. - - Verify `Accounts` contains both creator and recipient; `HistorySharedSince` is nil; `RoomName` is empty (DM convention). - -### Unit tests — `inbox-worker/handler_test.go` - -Two new tests against `handleRoomCreated`: - -- `TestHandler_handleRoomCreated_Channel_PublishesLocalInbox`: - - Channel arrives via federation with two locally-resolved accounts. - - Mock `FindUsersByAccounts` to return both. - - Assert one publish to `subject.InboxMemberAdded(h.siteID)`. - - Verify wire format matches the room-worker side; `Accounts` is the locally-resolved subset; `OutboxEvent.SiteID == DestSiteID == h.siteID`. +- `TestProcessCreateRoom_DM_PublishesLocalInbox`: DM across sites; assert single publish to `subject.InboxMemberAdded(room.SiteID)` with both creator+recipient accounts, `RoomName` empty, `HistorySharedSince` nil, expected `Nats-Msg-Id`. -- `TestHandler_handleRoomCreated_DM_PublishesLocalInbox`: - - DM arrives via federation with one locally-resolved account (the recipient on this site). - - Same assertions, single account in `Accounts`. +- `TestProcessCreateRoom_Channel_PublishesLocalInbox`: channel mixed-site; assert single publish to `subject.InboxMemberAdded(room.SiteID)` with creator + every initial member (same-site + cross-site), expected `Nats-Msg-Id`. -Add a third negative test: -- `TestHandler_handleRoomCreated_EmptyAccounts_NoPublish` — when `data.Accounts` is empty (early return on line 261), assert zero publishes. +- `TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded`: channel with at least one cross-site member; assert single publish to `subject.Outbox(origin, remote, model.OutboxMemberAdded)` carrying only the remote-site accounts, with `DestSiteID == remote`, `RoomName` set, `HistorySharedSince` nil. Confirms the existing `room_created` outbox is still emitted on the same loop. ### Out of scope for new tests -- Integration tests against real NATS / Mongo — not in this PR's scope per agreement (would double diff size for marginal coverage gain). -- `search-sync-worker`'s ES write path — already covered by `search-sync-worker/inbox_integration_test.go` against the federated lane. The local lane uses identical payload shape decoded by the same `parseMemberEvent`, so duplicating that test against the local lane adds little. -- `processAddMembers` / `processRemoveIndividual` / `processRemoveOrg` paths — covered by PR #145's existing tests; this change does not touch them. +- Integration tests against real NATS / Mongo — not in scope (would double diff size for marginal coverage gain). +- `search-sync-worker`'s ES write path — already covered by `search-sync-worker/inbox_integration_test.go` against the aggregate lane. ### Coverage target -Combined unit coverage for the two modified handler functions stays above the 80% project minimum (CLAUDE.md). Spot-check via `go tool cover -func=coverage.out` after implementation. +Combined unit coverage for `finishCreateRoom` stays above the 80% project minimum. ## Rollout Both changes are backward-compatible: -- `room-worker` publishing to `chat.inbox.{site}.member_added` is purely additive. PR #145 already created the subject and `search-sync-worker` already filters on it. -- `inbox-worker` publishing to the same subject is also additive. The `inbox-worker` consumer's `FilterSubjects` (PR #145 / commit `c779ede`) is scoped to `chat.inbox.{siteID}.aggregate.>`, so its own publish stays off its own consumer — no self-feedback loop. +- The origin-local INBOX publish is additive on the local site. +- The cross-site OUTBOX `member_added` publish is additive on the federation lane. PR #145 already established this path for add-members; remote sites' `search-sync-worker` consumers already filter for `aggregate.member_added`. -Recommended: deploy both services in the same release per site so the create-time MV update is symmetric across origin and remote on day one. No coordinated multi-site rollout needed. +No coordinated multi-site rollout needed. Deploy `room-worker` and the rest of the stack normally. ### Per-site verification after deploy @@ -315,11 +257,11 @@ Recommended: deploy both services in the same release per site so the create-tim ## Observability -- **Logs:** new publishes use `slog.Error` log-and-continue. Failure messages: `"local inbox member_added publish failed"` (matches PR #145's exact string for grep parity). Search post-deploy to confirm zero failures. -- **Metrics:** none added. Existing JetStream stream-level metrics on `INBOX_{siteID}` will show throughput on `chat.inbox.{siteID}.member_added` rise from "PR #145's add/remove rate" to "that plus the create rate" — that rise is itself the success signal. -- **Traces:** the new publishes inherit the request context, so OTel trace IDs propagate end-to-end (`room-worker` → INBOX → `search-sync-worker` → ES bulk write all under one trace; `inbox-worker` extends the same trace from the federated arrival side). +- **Logs:** new publishes use `slog.Error` log-and-continue (origin local) or return-error (cross-site OUTBOX, matching surrounding `room_created` publish). Failure message: `"local inbox member_added publish failed"` (origin) or `"publish member_added outbox to %s: %w"` (cross-site). +- **Metrics:** none added. Existing JetStream stream-level metrics on `INBOX_{siteID}` and `OUTBOX_{siteID}` will show throughput on the `member_added` subject rise from "PR #145's add/remove rate" to "that plus the create rate". +- **Traces:** the new publishes inherit the request context, so OTel trace IDs propagate end-to-end (room-worker → INBOX → search-sync-worker → ES bulk write all under one trace). ## Risks -- **Federation duplicate publishes if a future bug routes a `room_created` event back to its origin site.** Today the OUTBOX → INBOX SubjectTransform plus the consumer FilterSubjects exclude the origin from the federated path, so this can't happen. Mitigation: the dedup ID uses `room.SiteID` (origin) on the room-worker side and `h.siteID` (this site) on the inbox-worker side — if a future config bug delivered the origin's own event to itself via federation, the two dedup IDs would still differ (origin vs. self), so JetStream would not collapse them. The `search-sync-worker` Painless last-write-wins guard would still produce the correct end state, but at the cost of one duplicate publish per federation cycle. Acceptable. -- **Stale spec drift if room-worker's create path adds a new sub source.** If a future change adds members to a room outside the `subs []*model.Subscription` slice passed to `finishCreateRoom` (e.g., a parallel "channel preset members" feature), the new INBOX publish would miss them. Mitigation: keep `subs` as the single source of truth for "who got auto-enrolled at create time" — every code path that adds a sub to a freshly-created room must go through `subs` and therefore through the publish. Document this invariant inline as a one-line comment above the publish. +- **Stale spec drift if the create path grows new sub sources.** If a future change adds members to a room outside the `subs []*model.Subscription` slice passed to `finishCreateRoom`, the new INBOX publish would miss them. Mitigation: keep `subs` as the single source of truth for "who got auto-enrolled at create time". +- **Cross-site `member_added` for a brand-new room arriving before `room_created`.** Both events flow through OUTBOX in publish order, so JetStream preserves order on the federated stream — out-of-order delivery on the receiver side is theoretically possible only if `inbox-worker` parallelizes consumers across event types. Today it doesn't. If it ever does, the unique index on `(roomId, u.account)` makes the race idempotent. diff --git a/pkg/natsutil/request_id.go b/pkg/natsutil/request_id.go index e0bc702f0..e01a12b39 100644 --- a/pkg/natsutil/request_id.go +++ b/pkg/natsutil/request_id.go @@ -3,6 +3,7 @@ package natsutil import ( "context" + "log/slog" "github.com/nats-io/nats.go" ) @@ -57,3 +58,15 @@ func NewMsg(ctx context.Context, subj string, data []byte) *nats.Msg { Header: HeaderForContext(ctx), } } + +// OutboxDedupID composes a JetStream Nats-Msg-Id as base+":"+destSiteID. base +// is the X-Request-ID from ctx; falls back to payloadSeed when ctx carries no +// request ID, with a warn log so partial-deployment cases are observable. +func OutboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string { + base := RequestIDFromContext(ctx) + if base == "" { + slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID) + base = payloadSeed + } + return base + ":" + destSiteID +} diff --git a/room-worker/handler.go b/room-worker/handler.go index c17efda4a..81f8ca4b8 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -36,16 +36,6 @@ func NewHandler(store SubscriptionStore, siteID string, publish PublishFunc) *Ha return &Handler{store: store, siteID: siteID, publish: publish} } -// outboxDedupID composes Nats-Msg-Id as base+":"+destSiteID; base is X-Request-ID from ctx, falling back to payloadSeed when absent (partial-deployment safety). -func outboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string { - base := natsutil.RequestIDFromContext(ctx) - if base == "" { - slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID) - base = payloadSeed - } - return base + ":" + destSiteID -} - // messageDedupSeed returns the X-Request-ID from ctx, or payloadSeed when absent (partial-deployment safety, with a warn log). func messageDedupSeed(ctx context.Context, handler, roomID, payloadSeed string) string { if seed := natsutil.RequestIDFromContext(ctx); seed != "" { @@ -236,7 +226,7 @@ func (h *Handler) processRoleUpdate(ctx context.Context, data []byte) error { } outboxSubj := subject.Outbox(h.siteID, user.SiteID, "role_updated") payloadSeed := fmt.Sprintf("%s:%s:%s:%d", req.RoomID, req.Account, req.NewRole, req.Timestamp) - dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed) if err := h.publish(ctx, outboxSubj, outboxData, dedupID); err != nil { return fmt.Errorf("publish outbox: %w", err) } @@ -342,7 +332,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp) - if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) } @@ -388,7 +378,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp) - dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(h.siteID, user.SiteID, "member_removed"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s: %w", user.SiteID, err) } @@ -484,7 +474,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp) - if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) } } @@ -539,7 +529,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp) - dedupID := outboxDedupID(ctx, destSiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(h.siteID, destSiteID, "member_removed"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s: %w", destSiteID, err) } @@ -769,7 +759,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) - if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, outboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, natsutil.OutboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil { slog.Error("local inbox member_added publish failed", "error", err, "roomID", req.RoomID) } } @@ -830,7 +820,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) - dedupID := outboxDedupID(ctx, destSiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s failed: %w", destSiteID, err) } @@ -1121,6 +1111,34 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq } } + accounts := make([]string, 0, len(subs)) + for _, sub := range subs { + accounts = append(accounts, sub.User.Account) + } + inner := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + HistorySharedSince: nil, + Timestamp: now.UnixMilli(), + } + innerData, _ := json.Marshal(inner) + outbox := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: innerData, + Timestamp: now.UnixMilli(), + } + outboxData, _ := json.Marshal(outbox) + payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, natsutil.OutboxDedupID(ctx, room.SiteID, payloadSeed)); err != nil { + slog.Error("local inbox member_added publish failed", "error", err, "roomID", room.ID, "requestID", requestID) + } + // Task 37: outbox per remote site remoteSiteAccounts := map[string][]string{} for _, u := range allUsers { @@ -1157,6 +1175,35 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxTypeRoomCreated), eData, requestID+":"+destSiteID); err != nil { return fmt.Errorf("publish room_created outbox to %s: %w", destSiteID, err) } + + // Cross-site member_added so the remote site's search-sync-worker + // updates its user-room/spotlight MV — mirrors processAddMembers' + // federation. inbox-worker still consumes the room_created above to + // build correctly-typed Subscription rows; this event only feeds the + // search index. + memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + HistorySharedSince: nil, + Timestamp: now.UnixMilli(), + } + memberData, _ := json.Marshal(memberEvt) + memberEnvelope := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: destSiteID, + Payload: memberData, + Timestamp: now.UnixMilli(), + } + memberOutboxData, _ := json.Marshal(memberEnvelope) + memberSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) + if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), memberOutboxData, natsutil.OutboxDedupID(ctx, destSiteID, memberSeed)); err != nil { + return fmt.Errorf("publish member_added outbox to %s: %w", destSiteID, err) + } } return nil @@ -1439,7 +1486,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req return h.publish(ctx, subject.Outbox(room.SiteID, other.SiteID, model.OutboxTypeRoomCreated), eData, - outboxDedupID(ctx, other.SiteID, payloadSeed), + natsutil.OutboxDedupID(ctx, other.SiteID, payloadSeed), ) } diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index a689fd82f..de8f43c0e 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "slices" + "strconv" "strings" "sync" "testing" @@ -2896,3 +2897,180 @@ func TestHandleSyncCreateDM_IdempotentRecreate_UsesExistingCreatedAt(t *testing. "sub.JoinedAt must reflect existing.CreatedAt on idempotent re-delivery, not retry wall-clock") } } + +type inboxCapturedPublish struct { + subj string + data []byte + msgID string +} + +func captureInboxPublishes() (PublishFunc, func() []inboxCapturedPublish) { + var captured []inboxCapturedPublish + fn := PublishFunc(func(_ context.Context, subj string, data []byte, msgID string) error { + captured = append(captured, inboxCapturedPublish{subj: subj, data: append([]byte(nil), data...), msgID: msgID}) + return nil + }) + return fn, func() []inboxCapturedPublish { return captured } +} + +func findInboxMemberAdded(t *testing.T, captured []inboxCapturedPublish, siteID string) inboxCapturedPublish { + t.Helper() + want := subject.InboxMemberAdded(siteID) + var matches []inboxCapturedPublish + for _, p := range captured { + if p.subj == want { + matches = append(matches, p) + } + } + require.Lenf(t, matches, 1, "expected exactly 1 publish to %s, got %d", want, len(matches)) + return matches[0] +} + +func TestProcessCreateRoom_DM_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := NewMockSubscriptionStore(ctrl) + publish, getCaptured := captureInboxPublishes() + h := &Handler{store: mockStore, publish: publish, siteID: "site-A"} + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + // bob lives on site-B → cross-site DM + other := &model.User{ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-B"} + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().GetUser(gomock.Any(), "bob").Return(other, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-inbox").Return(nil) + + ts := time.Now().UnixMilli() + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-dm-inbox", + RequesterAccount: "alice", + Users: []string{"bob"}, + Timestamp: ts, + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got := findInboxMemberAdded(t, getCaptured(), "site-A") + + var outbox model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outbox)) + assert.Equal(t, "member_added", outbox.Type) + assert.Equal(t, "site-A", outbox.SiteID) + assert.Equal(t, "site-A", outbox.DestSiteID, "self-loop publish: dest must equal origin") + assert.Greater(t, outbox.Timestamp, int64(0)) + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outbox.Payload, &inner)) + assert.Equal(t, "member_added", inner.Type) + assert.Equal(t, "room-dm-inbox", inner.RoomID) + assert.Empty(t, inner.RoomName, "DM rooms have no name") + assert.ElementsMatch(t, []string{"alice", "bob"}, inner.Accounts, + "DM INBOX publish must carry both creator and recipient") + assert.Equal(t, "site-A", inner.SiteID) + assert.Nil(t, inner.HistorySharedSince, "HistorySharedSince must be nil at create-time") + + wantMsgID := natsutil.OutboxDedupID(ctx, "site-A", "room-dm-inbox:alice:"+strconv.FormatInt(ts, 10)) + assert.Equal(t, wantMsgID, got.msgID, "Nats-Msg-Id must be natsutil.OutboxDedupID(ctx, originSite, payloadSeed)") +} + +func TestProcessCreateRoom_Channel_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := NewMockSubscriptionStore(ctrl) + publish, getCaptured := captureInboxPublishes() + h := &Handler{store: mockStore, publish: publish, siteID: "site-A"} + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + invited := []model.User{ + {ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-A"}, + {ID: "u_dave", Account: "dave", EngName: "Dave", ChineseName: "戴", SiteID: "site-B"}, + } + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ListNewMembersForNewRoom(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return([]string{"bob", "dave"}, nil) + mockStore.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return(invited, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().BulkCreateRoomMembers(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-ch-inbox").Return(nil) + + ts := time.Now().UnixMilli() + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-ch-inbox", Name: "Mixed", RequesterAccount: "alice", + Users: []string{"bob", "dave"}, Orgs: []string{"org1"}, + ResolvedUsers: []string{"bob", "dave"}, ResolvedOrgs: []string{"org1"}, + Timestamp: ts, + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got := findInboxMemberAdded(t, getCaptured(), "site-A") + + var outbox model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outbox)) + assert.Equal(t, "member_added", outbox.Type) + assert.Equal(t, "site-A", outbox.SiteID) + assert.Equal(t, "site-A", outbox.DestSiteID) + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outbox.Payload, &inner)) + assert.Equal(t, "room-ch-inbox", inner.RoomID) + assert.Equal(t, "Mixed", inner.RoomName) + assert.ElementsMatch(t, []string{"alice", "bob", "dave"}, inner.Accounts, + "channel INBOX publish must carry creator + every auto-enrolled member (same-site + cross-site)") + assert.Equal(t, "site-A", inner.SiteID) + assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted regardless of req.History") + + wantMsgID := natsutil.OutboxDedupID(ctx, "site-A", "room-ch-inbox:alice:"+strconv.FormatInt(ts, 10)) + assert.Equal(t, wantMsgID, got.msgID) +} + +func TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded(t *testing.T) { + h, mockStore, getPublished := newCreateRoomTestHandler(t) + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + invited := []model.User{ + {ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-B"}, + } + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ListNewMembersForNewRoom(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{"bob"}, nil) + mockStore.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return(invited, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().BulkCreateRoomMembers(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-ch-xsite").Return(nil) + + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-ch-xsite", Name: "Cross", RequesterAccount: "alice", + Users: []string{"bob"}, Orgs: []string{"org1"}, + ResolvedUsers: []string{"bob"}, ResolvedOrgs: []string{"org1"}, + Timestamp: time.Now().UnixMilli(), + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + memberAddedOutbox := outboxFor(getPublished(), "site-B", model.OutboxMemberAdded) + require.Len(t, memberAddedOutbox, 1, + "finishCreateRoom must emit outbox.{origin}.to.{remote}.member_added alongside room_created so the remote site's search-sync-worker updates its MV") + + var envelope model.OutboxEvent + require.NoError(t, json.Unmarshal(memberAddedOutbox[0].data, &envelope)) + assert.Equal(t, model.OutboxMemberAdded, envelope.Type) + assert.Equal(t, "site-A", envelope.SiteID) + assert.Equal(t, "site-B", envelope.DestSiteID) + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(envelope.Payload, &inner)) + assert.Equal(t, "room-ch-xsite", inner.RoomID) + assert.Equal(t, "Cross", inner.RoomName) + assert.Equal(t, []string{"bob"}, inner.Accounts, "carries only the remote-site accounts, mirroring processAddMembers") + assert.Equal(t, "site-A", inner.SiteID, "inner SiteID is the origin (room's home)") + assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted") + + // Sanity: the existing room_created outbox is still emitted on the same loop. + roomCreatedOutbox := outboxFor(getPublished(), "site-B", model.OutboxTypeRoomCreated) + require.Len(t, roomCreatedOutbox, 1, "room_created outbox path unchanged") +} diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index 6a95becab..746e0b9d6 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -642,17 +642,17 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Equal(t, "site-A", s.SiteID, "sub %s siteID", s.User.Account) } - // Filter outbox publishes per destination site. - pubsB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", "")) - pubsC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", "")) - pubsA := cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")) - require.Len(t, pubsB, 1, "exactly one outbox to site-B") - require.Len(t, pubsC, 1, "exactly one outbox to site-C") - assert.Empty(t, pubsA, "no outbox to home site-A") - - // Site-B payload assertions. + assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")), + "no outbox to home site-A") + + // room_created outboxes — one per remote site. + createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) + createdC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxTypeRoomCreated)) + require.Len(t, createdB, 1, "exactly one room_created outbox to site-B") + require.Len(t, createdC, 1, "exactly one room_created outbox to site-C") + var envB model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsB[0].data, &envB)) + require.NoError(t, json.Unmarshal(createdB[0].data, &envB)) var payloadB model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(envB.Payload, &payloadB)) assert.ElementsMatch(t, []string{"bob", "carol"}, payloadB.Accounts) @@ -660,19 +660,37 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Equal(t, "deal team", payloadB.RoomName) assert.Equal(t, "site-A", payloadB.HomeSiteID) assert.Equal(t, "alice", payloadB.RequesterAccount) - assert.Equal(t, reqID+":site-B", pubsB[0].msgID) + assert.Equal(t, reqID+":site-B", createdB[0].msgID) - // Site-C payload assertions. var envC model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsC[0].data, &envC)) + require.NoError(t, json.Unmarshal(createdC[0].data, &envC)) var payloadC model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(envC.Payload, &payloadC)) assert.ElementsMatch(t, []string{"ian"}, payloadC.Accounts) - assert.Equal(t, model.RoomTypeChannel, payloadC.RoomType) - assert.Equal(t, "deal team", payloadC.RoomName) - assert.Equal(t, "site-A", payloadC.HomeSiteID) - assert.Equal(t, "alice", payloadC.RequesterAccount) - assert.Equal(t, reqID+":site-C", pubsC[0].msgID) + assert.Equal(t, reqID+":site-C", createdC[0].msgID) + + // member_added outboxes — one per remote site (search-sync-worker federation). + memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) + memberC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxMemberAdded)) + require.Len(t, memberB, 1, "exactly one member_added outbox to site-B") + require.Len(t, memberC, 1, "exactly one member_added outbox to site-C") + + var memberEnvB model.OutboxEvent + require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnvB)) + var memberPayloadB model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnvB.Payload, &memberPayloadB)) + assert.ElementsMatch(t, []string{"bob", "carol"}, memberPayloadB.Accounts) + assert.Equal(t, "deal team", memberPayloadB.RoomName) + assert.Equal(t, "site-A", memberPayloadB.SiteID) + assert.Nil(t, memberPayloadB.HistorySharedSince) + assert.Equal(t, reqID+":site-B", memberB[0].msgID) + + var memberEnvC model.OutboxEvent + require.NoError(t, json.Unmarshal(memberC[0].data, &memberEnvC)) + var memberPayloadC model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnvC.Payload, &memberPayloadC)) + assert.ElementsMatch(t, []string{"ian"}, memberPayloadC.Accounts) + assert.Equal(t, reqID+":site-C", memberC[0].msgID) } func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { @@ -712,14 +730,14 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.Equal(t, "site-A", s.SiteID, "sub %s siteID", s.User.Account) } - // Only one outbox publish, to site-B. - pubsB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", "")) - require.Len(t, pubsB, 1) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", ""))) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", ""))) + // room_created outbox to the recipient's site. + createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) + require.Len(t, createdB, 1) var env model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsB[0].data, &env)) + require.NoError(t, json.Unmarshal(createdB[0].data, &env)) var payload model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, model.RoomTypeDM, payload.RoomType) @@ -727,7 +745,18 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.ElementsMatch(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, "site-A", payload.HomeSiteID) - assert.Equal(t, reqID+":site-B", pubsB[0].msgID) + assert.Equal(t, reqID+":site-B", createdB[0].msgID) + + // member_added outbox (search-sync-worker federation). + memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) + require.Len(t, memberB, 1) + var memberEnv model.OutboxEvent + require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnv)) + var memberPayload model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnv.Payload, &memberPayload)) + assert.ElementsMatch(t, []string{"bob"}, memberPayload.Accounts) + assert.Equal(t, "site-A", memberPayload.SiteID) + assert.Equal(t, reqID+":site-B", memberB[0].msgID) } func TestProcessAddMembers_OutboxPerRemoteSite(t *testing.T) { @@ -913,7 +942,7 @@ func TestProcessAddMembers_PublishesLocalInbox_Integration(t *testing.T) { assert.ElementsMatch(t, []string{"charlie", "bob"}, inner.Accounts, "local INBOX must carry full add set — same-site (charlie) + remote (bob)") assert.Equal(t, reqID+":site-A", pubs[0].msgID, - "Nats-Msg-Id must be outboxDedupID(ctx, originSite, payloadSeed) so JetStream dedups self-loop replays") + "Nats-Msg-Id must be natsutil.OutboxDedupID(ctx, originSite, payloadSeed) so JetStream dedups self-loop replays") } func TestProcessRemoveIndividual_PublishesLocalInbox_Integration(t *testing.T) { From 4aa9606d957345a581f3a31cca183490cadb5339 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 10:23:54 +0000 Subject: [PATCH 3/3] fix(stream): default consumer DeliverPolicy to All instead of New MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DurableConsumerDefaults from PR #168 hardcoded DeliverPolicy=New for every durable JetStream consumer. That's wrong for our workload: - search-sync-worker's user-room-sync and spotlight-sync rebuild ES indexes from the INBOX stream. With DeliverNew, a fresh durable (new deploy, new site, deleted-and-recreated durable) starts at HEAD and the MV is permanently missing every historical event. - inbox-worker handles cross-site federated arrivals. With DeliverNew, any catch-up after a stream-side gap is lost — the local state diverges from the remote OUTBOX silently. - broadcast-worker / message-worker / notification-worker / message-gatekeeper / room-worker: same risk if a durable ever needs to be recreated from scratch. Flip the project-wide invariant to DeliverAll. For streams with no historical data (steady-state new sites) All and New are equivalent; for any catch-up or rebuild scenario All is the only correct choice. DeliverPolicy is only honored at consumer creation, so existing durables in prod are unaffected — this only changes behavior for new consumers (new deploys, new sites, durables deleted and recreated). Updates the doc comment on DurableConsumerDefaults and the eight consumer_config_test.go invariant assertions. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- broadcast-worker/consumer_config_test.go | 2 +- inbox-worker/consumer_config_test.go | 2 +- message-gatekeeper/consumer_config_test.go | 2 +- message-worker/consumer_config_test.go | 2 +- notification-worker/consumer_config_test.go | 2 +- pkg/stream/consumer.go | 10 ++++++++-- pkg/stream/consumer_test.go | 4 ++-- room-worker/consumer_config_test.go | 2 +- search-sync-worker/consumer_config_test.go | 2 +- 9 files changed, 17 insertions(+), 11 deletions(-) diff --git a/broadcast-worker/consumer_config_test.go b/broadcast-worker/consumer_config_test.go index cf4411769..131a138b3 100644 --- a/broadcast-worker/consumer_config_test.go +++ b/broadcast-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/inbox-worker/consumer_config_test.go b/inbox-worker/consumer_config_test.go index 633d476a7..b756ca369 100644 --- a/inbox-worker/consumer_config_test.go +++ b/inbox-worker/consumer_config_test.go @@ -29,7 +29,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/message-gatekeeper/consumer_config_test.go b/message-gatekeeper/consumer_config_test.go index 5c5d4f8ad..a9335f6dd 100644 --- a/message-gatekeeper/consumer_config_test.go +++ b/message-gatekeeper/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/message-worker/consumer_config_test.go b/message-worker/consumer_config_test.go index b78260b79..a36ee9645 100644 --- a/message-worker/consumer_config_test.go +++ b/message-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/notification-worker/consumer_config_test.go b/notification-worker/consumer_config_test.go index f62860097..2da32ed9f 100644 --- a/notification-worker/consumer_config_test.go +++ b/notification-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 66996e9d4..a0dfca149 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -21,18 +21,24 @@ type ConsumerSettings struct { // DurableConsumerDefaults returns a ConsumerConfig populated from the // supplied ConsumerSettings plus the project-wide architectural -// invariants (AckPolicy=Explicit, DeliverPolicy=New). +// invariants (AckPolicy=Explicit, DeliverPolicy=All). // // Callers MUST set Durable. Callers MAY set FilterSubjects to scope the // consumer to a subset of the stream's subjects. // +// DeliverPolicy=All so a freshly-created durable (new deploy, new site, +// or a deleted-and-recreated durable) replays the stream from the start. +// search-sync-worker's MV rebuild and inbox-worker's federated catch-up +// both depend on this; for streams with no historical data (steady-state +// new sites) All and New are equivalent. +// // DeliverPolicy is honored only at consumer creation. Updating an // existing durable via js.CreateOrUpdateConsumer does not reset its // cursor position. func DurableConsumerDefaults(s ConsumerSettings) jetstream.ConsumerConfig { return jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, - DeliverPolicy: jetstream.DeliverNewPolicy, + DeliverPolicy: jetstream.DeliverAllPolicy, AckWait: s.AckWait, MaxDeliver: s.MaxDeliver, MaxWaiting: s.MaxWaiting, diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 737e5db1f..7ba8a103e 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -23,7 +23,7 @@ func TestDurableConsumerDefaults(t *testing.T) { cc := stream.DurableConsumerDefaults(s) assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy, "AckPolicy invariant") - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy, "DeliverPolicy invariant") + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy, "DeliverPolicy invariant") assert.Equal(t, 45*time.Second, cc.AckWait) assert.Equal(t, 3, cc.MaxDeliver) assert.Equal(t, 256, cc.MaxWaiting) @@ -37,7 +37,7 @@ func TestDurableConsumerDefaults(t *testing.T) { cc := stream.DurableConsumerDefaults(stream.ConsumerSettings{}) assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) assert.Zero(t, cc.AckWait) assert.Zero(t, cc.MaxDeliver) assert.Zero(t, cc.MaxWaiting) diff --git a/room-worker/consumer_config_test.go b/room-worker/consumer_config_test.go index 755f7d0f7..682db5a24 100644 --- a/room-worker/consumer_config_test.go +++ b/room-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/search-sync-worker/consumer_config_test.go b/search-sync-worker/consumer_config_test.go index 877ea7402..3349b118d 100644 --- a/search-sync-worker/consumer_config_test.go +++ b/search-sync-worker/consumer_config_test.go @@ -59,7 +59,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) } })