From 499a42ba253b6e789b46170af9c1983a912e6540 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 23:56:48 +0000 Subject: [PATCH 1/4] docs(spec): consolidate cross-site room-creation federation event Drop the redundant outbox.{origin}.to.{remote}.room_created event in favor of the existing outbox.{origin}.to.{remote}.member_added event (added in PR #169) doing double duty: drive sub creation in inbox-worker AND MV update in search-sync-worker, mirroring the add-members path which already works that way since PR #145. Extend MemberAddEvent with RoomType + RequesterAccount so inbox-worker.handleMemberAdded can build correctly-shaped DM/botDM subs via the existing helpers (subscriptionName / rolesForType / subscriptionIsSubscribed) instead of needing a separate handleRoomCreated path. Full removal of room_created event, model, handler, and tests. Incidental benefit: heals a latent search-sync-worker bug where the spotlight ES doc's roomType field has been empty since PR #145 because today's MemberAddEvent wire format doesn't carry RoomType. 
https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...nsolidate-room-create-federation-design.md | 263 ++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md diff --git a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md new file mode 100644 index 000000000..ed400ff9c --- /dev/null +++ b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md @@ -0,0 +1,263 @@ +# Consolidate Cross-Site Room-Creation Federation Event — Design + +**Date:** 2026-05-12 +**Status:** Draft +**Services:** `room-worker`, `inbox-worker`, `search-sync-worker` (test-only) +**Related specs:** +- `2026-05-01-federated-room-origin-site-mv-fix-design.md` (PR #145 — added cross-site `member_added` for add-members) +- `2026-05-11-create-room-origin-site-mv-fix-design.md` (PR #169 — added cross-site `member_added` for room creation, leaving `room_created` in place) + +## Problem + +Today every room-creation publishes two cross-site OUTBOX events per remote site: + +| Event | Consumer on remote site | Purpose | +|---|---|---| +| `outbox.{origin}.to.{remote}.room_created` | `inbox-worker.handleRoomCreated` | Build correctly-shaped `Subscription` rows (DM/botDM/channel) | +| `outbox.{origin}.to.{remote}.member_added` | `search-sync-worker` (MV update) and `inbox-worker.handleMemberAdded` (dup-key no-op for fresh rooms) | Search index update | + +The redundancy is a layering artifact: PR #142 (Vinayak, `feat(create-room): DM, botDM, and channel creation across sites`) introduced `room_created` as the cross-site federation event before the `member_added` lane existed end-to-end. PR #145 then added the `member_added` cross-site event for the add-members path, and PR #169 extended `member_added` to the create path. 
At that point `room_created` became redundant for everything except the per-room-type sub-shape logic inside `inbox-worker.handleMemberAdded`, which is hardcoded to `RoomType: channel`. + +### Two concrete consequences + +1. **2× outbound publish per remote site per room creation.** `room-worker.finishCreateRoom` emits both events for every remote member's site. +2. **Latent search bug in `search-sync-worker/spotlight.go`**: it writes `RoomType: string(evt.RoomType)` into the spotlight ES doc, but today's `MemberAddEvent` wire payload doesn't carry `RoomType` — so for every room indexed via `member_added` since PR #145, the spotlight doc's `roomType` field has been the empty string. Room typeahead UI loses type info. This consolidation incidentally heals the bug because room-worker will start populating `RoomType` on every `member_added` publish. + +## Goals + +- One cross-site federation event per room creation. `member_added` does double duty: drives sub creation in `inbox-worker` and MV updates in `search-sync-worker`. +- `inbox-worker.handleMemberAdded` builds the right `Subscription` shape for every room type (channel, DM, botDM), reusing the existing per-type helpers (`subscriptionName`, `rolesForType`, `subscriptionIsSubscribed`) that `handleRoomCreated` uses today. +- Full removal of the `room_created` event type, its model, its handler, and its tests. No shim, no transition flag. +- No subject or stream changes — same wire lanes, fewer wire formats. + +## Non-Goals + +- **Backward compatibility for cross-tree consumers of `RoomCreatedOutbox` / `OutboxTypeRoomCreated`.** Grep confirms no out-of-tree consumer exists today. +- **Bridging in-flight `room_created` events during the deploy window.** Both ends ship in the same release per site; any straggler federated event arriving at a new `inbox-worker` pod hits the existing `default` case in the event-type switch and logs `"unknown event type, skipping"`. 
The corresponding `member_added` for the same room arrives moments later and does the right thing. +- **Refactoring `MemberRemoveEvent`** the same way. Remove is already type-agnostic in `inbox-worker.handleMemberRemoved`; only `member_added` needs the per-type sub-shape logic. +- **Search-sync-worker code changes.** Zero structural change required. One regression test added to lock in the latent-bug fix. + +## Design + +### Wire format + +Extend `model.MemberAddEvent` with two `omitempty` fields: + +```go +type MemberAddEvent struct { + Type string `json:"type"` + RoomID string `json:"roomId"` + RoomName string `json:"roomName"` + Accounts []string `json:"accounts"` + SiteID string `json:"siteId"` + JoinedAt int64 `json:"joinedAt"` + HistorySharedSince *int64 `json:"historySharedSince,omitempty"` + Timestamp int64 `json:"timestamp"` + + // NEW + RoomType RoomType `json:"roomType,omitempty"` + RequesterAccount string `json:"requesterAccount,omitempty"` +} +``` + +- `RoomType`: drives `inbox-worker.handleMemberAdded`'s sub-shape branch and `search-sync-worker.spotlight.go`'s ES doc population. Zero value (`""`) is treated as `RoomTypeChannel` for backward compatibility with any in-flight publishes during deploy. +- `RequesterAccount`: only meaningful when `RoomType` is `DM` or `BotDM` — used by `subscriptionName` to compute the counterpart account for the recipient sub's `Name`. Empty/unused for channels. + +The two added fields are `omitempty` so wire payloads from older publishers stay unchanged, and any publish that leaves a field empty simply omits it from the payload. + +### Publisher side — `room-worker/handler.go` + +Three changes on the publisher side — two sets of publish call sites populate the new fields, and one publish is removed: + +1. **`finishCreateRoom`** — the per-remote-site `member_added` OUTBOX publish (added in #169) sets `RoomType: room.Type` and `RequesterAccount: requester.Account`. The origin-local INBOX publish does the same for consistency (search-sync-worker on the origin reads it). +2. 
**`finishCreateRoom`** — delete the per-remote-site `room_created` OUTBOX publish entirely (the block constructing `RoomCreatedOutbox`). +3. **`processAddMembers`** — both publish sites (local INBOX + per-remote-site OUTBOX) get `RoomType: room.Type` and `RequesterAccount: req.RequesterAccount`. Channels only, so `RequesterAccount` won't actually be read on the consumer side, but keeping it consistent across all `member_added` emissions reduces special-case surface in publisher code. + +### Consumer side — `inbox-worker/handler.go` + +`handleMemberAdded` dispatches on `event.RoomType` and uses the existing helpers, refactored to take primitives so they no longer depend on the `RoomCreatedOutbox` struct (which is deleted together with `handleRoomCreated`) and can be called from the new `handleMemberAdded` path: + +**Before** (lives in handler.go around line 227): +```go +func subscriptionName(d *model.RoomCreatedOutbox, u *model.User) string { ... } +func subscriptionIsSubscribed(d *model.RoomCreatedOutbox, u *model.User) bool { ... } +``` + +**After** (primitives, no struct dependency): +```go +func subscriptionName(roomType model.RoomType, roomName, requesterAccount string, u *model.User) string { ... } +func subscriptionIsSubscribed(roomType model.RoomType, u *model.User) bool { ... } +``` + +`rolesForType(t model.RoomType) []Role` already takes a primitive — unchanged. 
+ +`handleMemberAdded` post-refactor: + +```go +func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) error { + var event model.MemberAddEvent + if err := json.Unmarshal(evt.Payload, &event); err != nil { + return fmt.Errorf("unmarshal member_added payload: %w", err) + } + + roomType := event.RoomType + if roomType == "" { + roomType = model.RoomTypeChannel // backward-compat for older publishers + } + + users, err := h.store.FindUsersByAccounts(ctx, event.Accounts) + if err != nil { + return fmt.Errorf("find users by accounts: %w", err) + } + userMap := make(map[string]model.User, len(users)) + for i := range users { + userMap[users[i].Account] = users[i] + } + + joinedAt := time.UnixMilli(event.JoinedAt).UTC() + var historySharedSince *time.Time + if event.HistorySharedSince != nil && *event.HistorySharedSince > 0 { + t := time.UnixMilli(*event.HistorySharedSince).UTC() + historySharedSince = &t + } + + subs := make([]*model.Subscription, 0, len(event.Accounts)) + for _, account := range event.Accounts { + u, ok := userMap[account] + if !ok { + slog.Warn("user not found for account", "account", account) + continue + } + sub := &model.Subscription{ + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: u.ID, Account: u.Account}, + RoomID: event.RoomID, + RoomType: roomType, + SiteID: event.SiteID, + Roles: rolesForType(roomType), + Name: subscriptionName(roomType, event.RoomName, event.RequesterAccount, &u), + IsSubscribed: subscriptionIsSubscribed(roomType, &u), + HistorySharedSince: historySharedSince, + JoinedAt: joinedAt, + } + subs = append(subs, sub) + } + + if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + return fmt.Errorf("bulk create subscriptions: %w", err) + } + return nil +} +``` + +The old `handleRoomCreated` function is **deleted**. The `case model.MessageTypeRoomCreated:` arm in `HandleEvent`'s switch is also deleted. 
+ +### Removal sweep + +- `pkg/model/event.go`: + - Delete `type RoomCreatedOutbox struct`. + - Delete `OutboxTypeRoomCreated` constant (cross-site OUTBOX event type). + - **Do not delete** `MessageTypeRoomCreated` constant — it's a distinct const for the channel-create system-message type ("alice created the room"), used by `room-worker`'s `publishChannelSysMessages` and unrelated to federation. The two consts happen to share the string value `"room_created"` but live on different code paths. +- `inbox-worker/handler.go`: + - Delete `handleRoomCreated`. + - Delete the `case model.MessageTypeRoomCreated:` arm. + - Refactor `subscriptionName` / `subscriptionIsSubscribed` to take primitives. +- `inbox-worker/handler_test.go`: delete `TestHandleRoomCreated*` cases (5 of them per grep). Replace with new `TestHandleMemberAdded_DM*` / `TestHandleMemberAdded_BotDM*` cases. +- `inbox-worker/integration_test.go`: delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub` (2 cases). Replace with `TestHandleMemberAddedDM_PersistsCorrectShape_Integration`. +- `room-worker/handler.go::finishCreateRoom`: delete the per-remote-site `room_created` publish block. Add `RoomType` + `RequesterAccount` to the existing `member_added` publishes (both local-INBOX and cross-site OUTBOX). +- `room-worker/handler.go::processAddMembers`: add `RoomType` + `RequesterAccount` to all three publish sites (local INBOX, cross-site OUTBOX, the older add-members publish if separate). +- `room-worker/handler_test.go`: assert `RoomType` is populated on every `member_added` publish capture; delete any test that asserted on the `room_created` outbox publishes. +- `room-worker/integration_test.go`: `TestProcessCreateRoomChannel_OutboxPerRemoteSite` and `TestProcessCreateRoomDM_OutboxToCounterpartSite` updated to assert only one outbox per dest site (the `member_added`), drop the `room_created` assertions. 
+ +### Search-sync-worker impact + +Zero structural change. `model.InboxMemberEvent` (the struct `search-sync-worker.parseMemberEvent` decodes into) already has `RoomType` — it's the publisher side that didn't populate it. Once `room-worker` starts setting `RoomType` on every `member_added` publish, the existing `string(evt.RoomType)` write at `spotlight.go:120` produces the correct value for the first time. + +One regression test added to `search-sync-worker/spotlight_test.go`: + +- `TestSpotlightCollection_BuildAction_PopulatesRoomType` — builds an `InboxMemberEvent` with `RoomType: model.RoomTypeChannel`, runs `BuildAction`, asserts the resulting ES doc carries `roomType: "channel"`. Pure regression guard against the publishers reverting to empty `RoomType`. + +### Idempotency + +No change to the storage-level guarantee. The unique index on `subscriptions.(roomId, u.account)` already handles concurrent or redelivered creates idempotently. The dup-key fall-through fix from PR #169 (CodeRabbit's catch) must carry over — `mongo.IsDuplicateKeyError` is swallowed and execution continues so search-sync-worker's MV update still fires on replays. + +However, that fix lives in `handleRoomCreated`. After deleting `handleRoomCreated`, the dup-key handling needs to be present in `handleMemberAdded` too. Today `handleMemberAdded` returns the bulk-create error on any failure, including dup-key: + +```go +if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + return fmt.Errorf("bulk create subscriptions: %w", err) +} +``` + +Apply PR #169's fix here as well: treat `mongo.IsDuplicateKeyError` as idempotent and continue (no publish to fall through to in this handler, but the explicit nil return matches the intent and prevents JetStream nak-loops). + +```go +if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + if !mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("bulk create subscriptions: %w", err) + } +} +``` + +## Testing + +Unit only, per spec. 
+ +### `inbox-worker/handler_test.go` + +New tests covering per-room-type sub shape via `handleMemberAdded`: + +- `TestHandleMemberAdded_Channel_BuildsChannelSub` (regression — covers the previous behavior, with `RoomType` field set explicitly on the event). +- `TestHandleMemberAdded_Channel_DefaultsWhenRoomTypeEmpty` — backward-compat: event with empty `RoomType` is treated as channel (older publishers). +- `TestHandleMemberAdded_DM_BuildsRecipientSubWithCounterpartName` — asserts `Name = RequesterAccount`, `Roles = nil`, `IsSubscribed = false`. +- `TestHandleMemberAdded_BotDM_BuildsBotSub` — asserts `IsSubscribed` depends on `isBot(u.Account)`. +- `TestHandleMemberAdded_DuplicateKey_IsIdempotent` — bulk-create returns dup-key; handler returns nil. + +Delete `TestHandleRoomCreated*` cases (5 total per current `grep`). + +### `inbox-worker/integration_test.go` + +Delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub`. Replace with `TestHandleMemberAddedDM_PersistsCorrectShape_Integration` — exercises the full DM flow against real Mongo: `member_added` event with `RoomType=DM`, asserts the persisted Subscription row has `Name = RequesterAccount`, `Roles = nil`, `IsSubscribed = false`. + +### `room-worker/handler_test.go` + +Update existing PR #169 tests to assert `RoomType` and `RequesterAccount` are populated on the captured publishes. New test: + +- `TestProcessCreateRoom_DM_OutboxCarriesRoomTypeAndRequester` — DM create across sites, captures the cross-site `member_added` publish, asserts inner `RoomType = DM` and `RequesterAccount = alice`. + +### `room-worker/integration_test.go` + +- `TestProcessCreateRoomChannel_OutboxPerRemoteSite` — drop the `room_created` outbox assertions. Assert exactly one outbox per dest site (`member_added`), with `RoomType=channel` on the payload. +- `TestProcessCreateRoomDM_OutboxToCounterpartSite` — same shape. Assert `RoomType=DM`, `RequesterAccount=alice` on the payload. 
+ +### `search-sync-worker/spotlight_test.go` + +- `TestSpotlightCollection_BuildAction_PopulatesRoomType` (NEW): builds an `InboxMemberEvent` with `RoomType: model.RoomTypeChannel`, runs `BuildAction`, asserts the resulting ES doc carries `roomType: "channel"`. Locks in the consolidation's latent-bug fix. + +## Rollout + +Both publisher and consumer changes ship in the same release per site: + +1. `room-worker` stops publishing `room_created`, starts publishing `member_added` with `RoomType` + `RequesterAccount` populated. +2. `inbox-worker` deletes the `room_created` switch arm + handler; `handleMemberAdded` learns to build DM/botDM-shaped subs. + +In-flight `room_created` events at deploy boundary land at new `inbox-worker` pods → hit the existing `default` case in the event-type switch → `slog.Warn("unknown event type, skipping")` + ack. The matching `member_added` for the same room arrives moments later (same publisher, same OUTBOX, same federation lane) and creates the subs. No correctness loss; one warn log per straggler. + +### Per-site verification after deploy + +1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. +2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. +3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). +4. `nats stream info OUTBOX_{site}` should show member_added events flowing at room-creation rate; room_created subject should report zero new messages. + +## Observability + +- **Logs**: one new `slog.Warn` per straggler `room_created` event during the deploy window — expected, transient. After deploy completes, zero. 
+- **Metrics**: existing JetStream metrics on `OUTBOX_{site}` will show `member_added` subject throughput rise (it now carries create-time too), `room_created` subject throughput drop to zero. +- **Traces**: unchanged — `member_added` was already on the trace path for create via PR #169. + +## Risks + +- **Federated event missing `RoomType` during deploy window.** Older publishers (pre-deploy) emit `member_added` without `RoomType`. The `roomType == ""` → `RoomTypeChannel` defaulting in `handleMemberAdded` keeps channel-create-and-add-member paths correct. The only risk is a pre-deploy DM federation: an old publisher sends `room_created` (which the new consumer drops as unknown) but no `member_added` for DMs (because the old publisher doesn't emit `member_added` for DMs at all — that's net-new in this PR). Mitigation: deploy `room-worker` first so all DM creates after that point emit the new event. +- **Search-sync-worker reading old InboxMemberEvent payloads from before the publisher rolled out.** Spotlight docs indexed during the window keep their empty `roomType` until the next event touches the room. Acceptable — consistent with the pre-deploy state, and the next add/remove on the room re-emits with the correct type. +- **Removal of `RoomCreatedOutbox` is a one-way ratchet.** No out-of-tree consumer exists today (`grep` confirms). If we ever want to reintroduce the dedicated event, the cost is just adding the type back; the lane is unchanged. From 93e904eb943b0d18eb75892f5ab6f6799398a983 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 00:05:28 +0000 Subject: [PATCH 2/4] docs(spec): address CodeRabbit review on federation consolidation - Line 160: "consts" -> "constants" (style) - Line 208: "sub shape" -> "sub-shape" (hyphenation) - Rollout section: rewrite to honestly document the no-fully-safe-single-PR-deploy-order issue CodeRabbit flagged. Walks both deploy orders showing the malformed-DM window each produces. 
Presents three options (A: ship as-is, B: 2-PR split, C: single PR + follow-up cleanup) with a recommendation for option C. Marks this as an open question requiring user input before implementation begins. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...nsolidate-room-create-federation-design.md | 61 ++++++++++++++++--- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md index ed400ff9c..136150ba3 100644 --- a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md +++ b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md @@ -157,7 +157,7 @@ The old `handleRoomCreated` function is **deleted**. The `case model.MessageType - `pkg/model/event.go`: - Delete `type RoomCreatedOutbox struct`. - Delete `OutboxTypeRoomCreated` constant (cross-site OUTBOX event type). - - **Do not delete** `MessageTypeRoomCreated` constant — it's a distinct const for the channel-create system-message type ("alice created the room"), used by `room-worker`'s `publishChannelSysMessages` and unrelated to federation. The two consts happen to share the string value `"room_created"` but live on different code paths. + - **Do not delete** `MessageTypeRoomCreated` constant — it's a distinct constant for the channel-create system-message type ("alice created the room"), used by `room-worker`'s `publishChannelSysMessages` and unrelated to federation. The two constants happen to share the string value `"room_created"` but live on different code paths. - `inbox-worker/handler.go`: - Delete `handleRoomCreated`. - Delete the `case model.MessageTypeRoomCreated:` arm. @@ -205,7 +205,7 @@ Unit only, per spec. 
### `inbox-worker/handler_test.go` -New tests covering per-room-type sub shape via `handleMemberAdded`: +New tests covering per-room-type sub-shape via `handleMemberAdded`: - `TestHandleMemberAdded_Channel_BuildsChannelSub` (regression — covers the previous behavior, with `RoomType` field set explicitly on the event). - `TestHandleMemberAdded_Channel_DefaultsWhenRoomTypeEmpty` — backward-compat: event with empty `RoomType` is treated as channel (older publishers). @@ -236,19 +236,64 @@ Update existing PR #169 tests to assert `RoomType` and `RequesterAccount` are po ## Rollout -Both publisher and consumer changes ship in the same release per site: +⚠️ **Genuine tradeoff in this PR's scope** — see "Open rollout question" at the bottom of this section. -1. `room-worker` stops publishing `room_created`, starts publishing `member_added` with `RoomType` + `RequesterAccount` populated. -2. `inbox-worker` deletes the `room_created` switch arm + handler; `handleMemberAdded` learns to build DM/botDM-shaped subs. +Same-release deploy with no ordering is **unsafe**. Walking the two possible orders: -In-flight `room_created` events at deploy boundary land at new `inbox-worker` pods → hit the existing `default` case in the event-type switch → `slog.Warn("unknown event type, skipping")` + ack. The matching `member_added` for the same room arrives moments later (same publisher, same OUTBOX, same federation lane) and creates the subs. No correctness loss; one warn log per straggler. +| Deploy order | What happens during the rollout window | +|---|---| +| `room-worker` first | New room-worker stops publishing `room_created` and starts publishing `member_added` with `RoomType` + `RequesterAccount`. Old `inbox-worker` still has `handleMemberAdded` channel-hardcoded — when it receives a DM `member_added` from new room-worker, it builds the recipient sub with `Name=""`, `RoomType=channel`. 
**Malformed DM subs on the remote side.** | +| `inbox-worker` first | Old room-worker still publishes `room_created` for DM creations. New `inbox-worker` no longer handles `room_created` (default-arm warn-log). The corresponding `member_added` from old room-worker arrives without `RoomType` set; new `handleMemberAdded` defaults empty → channel. **Same malformed DM subs.** | -### Per-site verification after deploy +There is **no deploy order** that fully avoids the malformed-DM window in a single PR, because the wire-shape contract for DM federation changes between old and new schemas. The only correctness-preserving rollouts require keeping either the `room_created` publisher OR the `handleRoomCreated` handler alive through the transition — meaning a 2-PR split. + +### Severity of the malformed-DM window + +- **Defect**: cross-site DM created during the deploy window has `Name=""` (empty counterpart name) on the recipient site instead of the requester's account. +- **Impact**: frontend shows an unnamed DM in the recipient's left panel until manually fixed. +- **Scope**: bounded by the deploy duration (typically minutes per site). For a large org with multiple federated sites, the affected window per pair-of-sites is each site's individual deploy. +- **Recovery**: identifiable via `db.subscriptions.find({roomType: 'channel', name: ''})` on each remote site; fixable by either a one-shot script that re-derives `Name` from the room's home-site `Subscription` record, or by churn (any later add-member on the room re-emits with correct schema). + +### Open rollout question + +The Q1 design decision was "full removal in one PR". CodeRabbit (correctly) flagged that this isn't safe without one of these adjustments: + +| Option | Single PR? | DM malformation window? 
| Trade-off | +|---|---|---|---| +| **(A)** Ship as-is, accept the malformed-DM window, document recovery | Yes | Yes (bounded, recoverable) | Simplest; one operator burden during deploy | +| **(B)** Split: PR1 adds new fields + handleMemberAdded RoomType dispatch + populates publishes; PR2 removes `room_created` publish + handler + model | No (2 PRs) | No | Cleanest correctness; slower delivery | +| **(C)** Single PR but room-worker keeps publishing `room_created` AND adds the new fields to `member_added`; inbox-worker deletes `handleRoomCreated` and learns RoomType dispatch. Deploy order: room-worker first, then inbox-worker. Follow-up PR later deletes the `room_created` publish from room-worker. | Yes (this PR), with follow-up | No | One transition PR + one cleanup PR; intermediate state is wire-compatible | + +**Recommendation: option (C).** This PR removes the consumer's reliance on `room_created` and prepares the publisher to be removable in a follow-up, without any data-loss window. The follow-up PR to remove the publisher is mechanical (delete-only) once everyone's on the new inbox-worker. + +This requires the user to confirm whether to amend the spec to (C) before proceeding to implementation. The sections below describe option (A) for completeness; flip to (C) on user confirmation. + +### Step 1 — Deploy `room-worker` globally first (option A) + +Roll the new `room-worker` image to every replica on every site. After this step, every `member_added` publish from any healthy `room-worker` pod carries the new `RoomType` + `RequesterAccount` fields, and every room creation emits the cross-site `member_added` event (not just channels). + +Old `room-worker` pods are still publishing `room_created` for DM/botDM creations; that's fine because old `inbox-worker` pods (yet to be replaced) still handle it. 
+ +### Step 2 — Verify step 1 completion before starting step 3 + +Per site, all sites: + +- **Replica check**: confirm the new `room-worker` image tag is uniform across every replica. +- **Schema check**: `nats stream get OUTBOX_{site} --last-by-subject 'outbox.*.to.*.member_added'` — the inner `MemberAddEvent` payload must contain a non-empty `roomType` field, and for DM/botDM creations a non-empty `requesterAccount`. If `roomType` is empty, a pre-deploy `room-worker` pod is still publishing — block step 3 until resolved. +- **Lane check**: `nats stream info OUTBOX_{site}` shows `member_added` subject traffic at the room-creation rate. + +Step 3 MUST NOT begin until every site is green on the three checks above. This minimizes (but does not eliminate) the malformed-DM window. + +### Step 3 — Deploy `inbox-worker` globally second + +Roll the new `inbox-worker` image. This release deletes the `case model.MessageTypeRoomCreated:` switch arm and the `handleRoomCreated` function. After this step, new `inbox-worker` pods route every `member_added` through `handleMemberAdded` with RoomType dispatch. + +### Per-site verification after step 3 1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. 2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. 3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). -4. `nats stream info OUTBOX_{site}` should show member_added events flowing at room-creation rate; room_created subject should report zero new messages. +4. `nats stream info OUTBOX_{site}` should show `member_added` events flowing at room-creation rate; `room_created` subject should report zero new messages. 
## Observability From ee1d33ee6aae46eb864cf1ecd30fc48fe28d94a1 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 00:10:10 +0000 Subject: [PATCH 3/4] docs(spec): simplify rollout to single-PR per pre-prod context User confirmed cross-site federation is not yet integrated end-to-end, so the theoretical mixed-version DM-sub-malformation window has no real-world incidence today. Reverting the spec to option (A): single PR ships both publisher and consumer changes; deploy order (room-worker first) is defensive rather than strictly required. Trims the option-discussion text and rolls the deploy-window risk into the Risks section with explicit acknowledgment that it's theoretical under current operational reality. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...nsolidate-room-create-federation-design.md | 60 ++++--------------- 1 file changed, 11 insertions(+), 49 deletions(-) diff --git a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md index 136150ba3..2a3e6803d 100644 --- a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md +++ b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md @@ -236,59 +236,22 @@ Update existing PR #169 tests to assert `RoomType` and `RequesterAccount` are po ## Rollout -⚠️ **Genuine tradeoff in this PR's scope** — see "Open rollout question" at the bottom of this section. +Single PR, both publisher and consumer changes shipped together. -Same-release deploy with no ordering is **unsafe**. Walking the two possible orders: +**Pre-production context**: cross-site federation is not yet integrated end-to-end in any deployed environment. No live cross-site DM traffic exists today, so the theoretical mixed-version DM-sub-malformation window during a mid-deploy creation has no real-world incidence. 
This rollout plan is therefore straightforward — when federation does go live in prod, both this PR and its schema extension will already be in place everywhere. -| Deploy order | What happens during the rollout window | -|---|---| -| `room-worker` first | New room-worker stops publishing `room_created` and starts publishing `member_added` with `RoomType` + `RequesterAccount`. Old `inbox-worker` still has `handleMemberAdded` channel-hardcoded — when it receives a DM `member_added` from new room-worker, it builds the recipient sub with `Name=""`, `RoomType=channel`. **Malformed DM subs on the remote side.** | -| `inbox-worker` first | Old room-worker still publishes `room_created` for DM creations. New `inbox-worker` no longer handles `room_created` (default-arm warn-log). The corresponding `member_added` from old room-worker arrives without `RoomType` set; new `handleMemberAdded` defaults empty → channel. **Same malformed DM subs.** | +### Deploy order (defensive, not strictly required) -There is **no deploy order** that fully avoids the malformed-DM window in a single PR, because the wire-shape contract for DM federation changes between old and new schemas. The only correctness-preserving rollouts require keeping either the `room_created` publisher OR the `handleRoomCreated` handler alive through the transition — meaning a 2-PR split. +For belt-and-suspenders correctness in case federation traffic does materialize during a rolling deploy, prefer deploying `room-worker` ahead of `inbox-worker` per site. This ensures any new `member_added` events on the wire already carry `RoomType` + `RequesterAccount` before the consumer side starts dispatching on those fields. -### Severity of the malformed-DM window +In-flight `room_created` events that arrive at the new `inbox-worker` (because they were redelivered from before its deploy) hit the existing `default` case in the event-type switch → `slog.Warn("unknown event type, skipping")` + ack. 
No nak-loop, no behavior regression. -- **Defect**: cross-site DM created during the deploy window has `Name=""` (empty counterpart name) on the recipient site instead of the requester's account. -- **Impact**: frontend shows an unnamed DM in the recipient's left panel until manually fixed. -- **Scope**: bounded by the deploy duration (typically minutes per site). For a large org with multiple federated sites, the affected window per pair-of-sites is each site's individual deploy. -- **Recovery**: identifiable via `db.subscriptions.find({roomType: 'channel', name: ''})` on each remote site; fixable by either a one-shot script that re-derives `Name` from the room's home-site `Subscription` record, or by churn (any later add-member on the room re-emits with correct schema). +### Per-site verification after deploy -### Open rollout question - -The Q1 design decision was "full removal in one PR". CodeRabbit (correctly) flagged that this isn't safe without one of these adjustments: - -| Option | Single PR? | DM malformation window? | Trade-off | -|---|---|---|---| -| **(A)** Ship as-is, accept the malformed-DM window, document recovery | Yes | Yes (bounded, recoverable) | Simplest; one operator burden during deploy | -| **(B)** Split: PR1 adds new fields + handleMemberAdded RoomType dispatch + populates publishes; PR2 removes `room_created` publish + handler + model | No (2 PRs) | No | Cleanest correctness; slower delivery | -| **(C)** Single PR but room-worker keeps publishing `room_created` AND adds the new fields to `member_added`; inbox-worker deletes `handleRoomCreated` and learns RoomType dispatch. Deploy order: room-worker first, then inbox-worker. Follow-up PR later deletes the `room_created` publish from room-worker. 
| Yes (this PR), with follow-up | No | One transition PR + one cleanup PR; intermediate state is wire-compatible | - -**Recommendation: option (C).** This PR removes the consumer's reliance on `room_created` and prepares the publisher to be removable in a follow-up, without any data-loss window. The follow-up PR to remove the publisher is mechanical (delete-only) once everyone's on the new inbox-worker. - -This requires the user to confirm whether to amend the spec to (C) before proceeding to implementation. The sections below describe option (A) for completeness; flip to (C) on user confirmation. - -### Step 1 — Deploy `room-worker` globally first (option A) - -Roll the new `room-worker` image to every replica on every site. After this step, every `member_added` publish from any healthy `room-worker` pod carries the new `RoomType` + `RequesterAccount` fields, and every room creation emits the cross-site `member_added` event (not just channels). - -Old `room-worker` pods are still publishing `room_created` for DM/botDM creations; that's fine because old `inbox-worker` pods (yet to be replaced) still handle it. - -### Step 2 — Verify step 1 completion before starting step 3 - -Per site, all sites: - -- **Replica check**: confirm the new `room-worker` image tag is uniform across every replica. -- **Schema check**: `nats stream get OUTBOX_{site} --last-by-subject 'outbox.*.to.*.member_added'` — the inner `MemberAddEvent` payload must contain a non-empty `roomType` field, and for DM/botDM creations a non-empty `requesterAccount`. If `roomType` is empty, a pre-deploy `room-worker` pod is still publishing — block step 3 until resolved. -- **Lane check**: `nats stream info OUTBOX_{site}` shows `member_added` subject traffic at the room-creation rate. - -Step 3 MUST NOT begin until every site is green on the three checks above. This minimizes (but does not eliminate) the malformed-DM window. 
- -### Step 3 — Deploy `inbox-worker` globally second - -Roll the new `inbox-worker` image. This release deletes the `case model.MessageTypeRoomCreated:` switch arm and the `handleRoomCreated` function. After this step, new `inbox-worker` pods route every `member_added` through `handleMemberAdded` with RoomType dispatch. - -### Per-site verification after step 3 +1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. +2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. +3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). +4. `nats stream info OUTBOX_{site}` should show `member_added` events flowing at room-creation rate; `room_created` subject should report zero new messages. 1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. 2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. @@ -303,6 +266,5 @@ Roll the new `inbox-worker` image. This release deletes the `case model.MessageT ## Risks -- **Federated event missing `RoomType` during deploy window.** Older publishers (pre-deploy) emit `member_added` without `RoomType`. The `roomType == ""` → `RoomTypeChannel` defaulting in `handleMemberAdded` keeps channel-create-and-add-member paths correct. The only risk is a pre-deploy DM federation: an old publisher sends `room_created` (which the new consumer drops as unknown) but no `member_added` for DMs (because the old publisher doesn't emit `member_added` for DMs at all — that's net-new in this PR). 
Mitigation: deploy `room-worker` first so all DM creates after that point emit the new event. -- **Search-sync-worker reading old InboxMemberEvent payloads from before the publisher rolled out.** Spotlight docs indexed during the window keep their empty `roomType` until the next event touches the room. Acceptable — consistent with the pre-deploy state, and the next add/remove on the room re-emits with the correct type. +- **Mixed-version deploy window** (theoretical, no real-world incidence today). Cross-site federation is not yet integrated end-to-end, so no DM federation traffic exists during a rolling deploy of this PR. If federation traffic did flow during the window, a DM creation initiated by a new `room-worker` pod could be consumed by an old `inbox-worker` pod's channel-hardcoded `handleMemberAdded` and produce a recipient sub with `Name=""` (empty counterpart name). The defect would be bounded to the deploy duration and recoverable via either a manual script that re-derives `Name` from the room's home-site sub, or by churn (any later add-member re-emits with correct schema). Not blocking under current operational reality. - **Removal of `RoomCreatedOutbox` is a one-way ratchet.** No out-of-tree consumer exists today (`grep` confirms). If we ever want to reintroduce the dedicated event, the cost is just adding the type back; the lane is unchanged. From 88f7c66c8c35a5dc06d65b6ca19cc0d5fb761e55 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 03:03:14 +0000 Subject: [PATCH 4/4] feat(model,room-worker,inbox-worker): consolidate room-create federation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single cross-site event for room creation: outbox.{origin}.to.{remote}.member_added does double duty — drives sub creation in inbox-worker (with correct DM/botDM/channel shapes) AND MV update in search-sync-worker. Drops the redundant room_created event entirely.
Schema (pkg/model/event.go): - MemberAddEvent gains RoomType + RequesterAccount (both omitempty). - Delete RoomCreatedOutbox struct. - Delete OutboxTypeRoomCreated constant. - MessageTypeRoomCreated stays — distinct system-message-type constant used by room-worker's publishChannelSysMessages, unrelated to federation. Consumer (inbox-worker/handler.go): - handleMemberAdded dispatches on event.RoomType. Empty RoomType defaults to RoomTypeChannel for backward-compat with pre-deploy publishers that didn't set the field. - subscriptionName / subscriptionIsSubscribed helpers refactored to take primitives (roomType, roomName, requesterAccount, *user) instead of *RoomCreatedOutbox, so handleMemberAdded can call them. - Duplicate-key BulkCreateSubscriptions errors swallowed (replay after a crashed prior delivery is idempotent — matches PR #169 fix). - handleRoomCreated function deleted. - case model.MessageTypeRoomCreated arm in HandleEvent switch deleted. Publisher (room-worker/handler.go): - finishCreateRoom: delete the per-remote-site room_created OUTBOX publish. Cross-site member_added publish now carries RoomType + RequesterAccount. - finishCreateRoom local INBOX publish: same fields populated for consistency (search-sync-worker reads them). - processAddMembers: populate RoomType + RequesterAccount on all three member_added publishes (UI fan-out, local INBOX, cross-site OUTBOX). Channels-only path, but consistent shape avoids surprises. - publishSyncDMOutbox: switch from room_created to member_added with the full new schema. Tests: - inbox-worker/handler_test.go: replace 5 TestHandleRoomCreated* tests with TestHandleMemberAdded_DM/BotDM/Channel/EmptyRoomType/DuplicateKey cases. Helpers refactored to match new signatures. - inbox-worker/integration_test.go: replace 2 room_created integration tests with member_added equivalents going through HandleEvent.
- room-worker/handler_test.go + integration_test.go: assertions on cross-site outboxes now look for OutboxMemberAdded subjects with full RoomType + RequesterAccount payload. Incidental fix: search-sync-worker/spotlight.go has been writing an empty `roomType` field to the spotlight ES doc since PR #145 because MemberAddEvent's wire format didn't carry RoomType. Once room-worker starts populating RoomType, the spotlight doc gets correct roomType for the first time. No code change in search-sync-worker; existing TestSpotlightCollection_BuildAction_MemberAdded asserts the correct value. Spec: docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- ...nsolidate-room-create-federation-design.md | 43 +--- inbox-worker/handler.go | 105 ++------ inbox-worker/handler_test.go | 240 +++++++++--------- inbox-worker/integration_test.go | 55 ++-- pkg/model/event.go | 20 +- pkg/model/model_test.go | 19 -- room-worker/handler.go | 63 ++--- room-worker/handler_test.go | 23 +- room-worker/integration_test.go | 63 ++--- 9 files changed, 237 insertions(+), 394 deletions(-) diff --git a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md index 2a3e6803d..c5ae0031e 100644 --- a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md +++ b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md @@ -35,7 +35,7 @@ The redundancy is a layering artifact: PR #142 (Vinayak, `feat(create-room): DM, - **Backward compatibility for cross-tree consumers of `RoomCreatedOutbox` / `OutboxTypeRoomCreated`.** Grep confirms no out-of-tree consumer exists today.
- **Bridging in-flight `room_created` events during the deploy window.** Both ends ship in the same release per site; any straggler federated event arriving at a new `inbox-worker` pod hits the existing `default` case in the event-type switch and logs `"unknown event type, skipping"`. The corresponding `member_added` for the same room arrives moments later and does the right thing. - **Refactoring `MemberRemoveEvent`** the same way. Remove is already type-agnostic in `inbox-worker.handleMemberRemoved`; only `member_added` needs the per-type sub-shape logic. -- **Search-sync-worker code changes.** Zero structural change required. One regression test added to lock in the latent-bug fix. +- **Search-sync-worker code changes.** Zero structural change required. No new tests added; existing spotlight coverage already asserts `roomType` via `baseInboxMemberEvent`. ## Design @@ -144,7 +144,9 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) } if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subscriptions: %w", err) + if !mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("bulk create subscriptions: %w", err) + } } return nil } @@ -163,7 +165,7 @@ The old `handleRoomCreated` function is **deleted**. The `case model.MessageType - Delete the `case model.MessageTypeRoomCreated:` arm. - Refactor `subscriptionName` / `subscriptionIsSubscribed` to take primitives. - `inbox-worker/handler_test.go`: delete `TestHandleRoomCreated*` cases (5 of them per grep). Replace with new `TestHandleMemberAdded_DM*` / `TestHandleMemberAdded_BotDM*` cases. -- `inbox-worker/integration_test.go`: delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub` (2 cases). Replace with `TestHandleMemberAddedDM_PersistsCorrectShape_Integration`. 
+- `inbox-worker/integration_test.go`: delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub` (2 cases). Replace with `TestHandleMemberAdded_Channel_PersistsRemoteSubs` and `TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub` going through the public `HandleEvent` entry point. - `room-worker/handler.go::finishCreateRoom`: delete the per-remote-site `room_created` publish block. Add `RoomType` + `RequesterAccount` to the existing `member_added` publishes (both local-INBOX and cross-site OUTBOX). - `room-worker/handler.go::processAddMembers`: add `RoomType` + `RequesterAccount` to all three publish sites (local INBOX, cross-site OUTBOX, the older add-members publish if separate). - `room-worker/handler_test.go`: assert `RoomType` is populated on every `member_added` publish capture; delete any test that asserted on the `room_created` outbox publishes. @@ -173,35 +175,15 @@ The old `handleRoomCreated` function is **deleted**. The `case model.MessageType Zero structural change. `model.InboxMemberEvent` (the struct `search-sync-worker.parseMemberEvent` decodes into) already has `RoomType` — it's the publisher side that didn't populate it. Once `room-worker` starts setting `RoomType` on every `member_added` publish, the existing `string(evt.RoomType)` write at `spotlight.go:120` produces the correct value for the first time. -One regression test added to `search-sync-worker/spotlight_test.go`: - -- `TestSpotlightCollection_BuildAction_PopulatesRoomType` — builds an `InboxMemberEvent` with `RoomType: model.RoomTypeChannel`, runs `BuildAction`, asserts the resulting ES doc carries `roomType: "channel"`. Pure regression guard against the publishers reverting to empty `RoomType`. +No new search-sync-worker tests needed. 
The existing `TestSpotlightCollection_BuildAction_MemberAdded` already asserts `assert.Equal(t, "channel", doc["roomType"])` via `baseInboxMemberEvent()` (which sets `RoomType: model.RoomTypeChannel`), so the regression guard for the latent spotlight-roomType-empty bug is already in place. ### Idempotency -No change. The unique index on `subscriptions.(roomId, u.account)` already handles concurrent or redelivered creates idempotently. The dedup-key fall-through fix from PR #169 (CodeRabbit's catch) still applies — `mongo.IsDuplicateKeyError` is swallowed and execution continues so search-sync-worker's MV update still fires on replays. - -Wait — that fix is in `handleRoomCreated`. After deleting `handleRoomCreated`, the dup-key handling needs to be present in `handleMemberAdded` too. Today `handleMemberAdded` returns the bulk-create error on any failure, including dup-key: - -```go -if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subscriptions: %w", err) -} -``` - -Apply PR #169's fix here as well: treat `mongo.IsDuplicateKeyError` as idempotent and continue (no publish to fall through to in this handler, but the explicit nil return matches the intent and prevents JetStream nak-loops). - -```go -if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - if !mongo.IsDuplicateKeyError(err) { - return fmt.Errorf("bulk create subscriptions: %w", err) - } -} -``` +The unique index on `subscriptions.(roomId, u.account)` keeps concurrent and redelivered creates idempotent. `handleMemberAdded` swallows `mongo.IsDuplicateKeyError` (see the bulk-create branch in the code example above) and continues, so JetStream replays of `member_added` after a crashed prior delivery do not nak-loop. Non-duplicate errors propagate so JetStream retries until success or `MaxDeliver` is hit. ## Testing -Unit only, per spec. +Unit + integration coverage, per spec. 
### `inbox-worker/handler_test.go` @@ -217,7 +199,7 @@ Delete `TestHandleRoomCreated*` cases (5 total per current `grep`). ### `inbox-worker/integration_test.go` -Delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub`. Replace with `TestHandleMemberAddedDM_PersistsCorrectShape_Integration` — exercises the full DM flow against real Mongo: `member_added` event with `RoomType=DM`, asserts the persisted Subscription row has `Name = RequesterAccount`, `Roles = nil`, `IsSubscribed = false`. +Delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub`. Replace with `TestHandleMemberAdded_Channel_PersistsRemoteSubs` and `TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub` — both go through the public `HandleEvent` entry point with `member_added` payloads carrying the new `RoomType` + `RequesterAccount` fields, and assert the persisted Subscription rows have the right shape. ### `room-worker/handler_test.go` @@ -232,7 +214,7 @@ Update existing PR #169 tests to assert `RoomType` and `RequesterAccount` are po ### `search-sync-worker/spotlight_test.go` -- `TestSpotlightCollection_BuildAction_PopulatesRoomType` (NEW): builds an `InboxMemberEvent` with `RoomType: model.RoomTypeChannel`, runs `BuildAction`, asserts the resulting ES doc carries `roomType: "channel"`. Locks in the consolidation's latent-bug fix. +No new tests. The existing `TestSpotlightCollection_BuildAction_MemberAdded` already asserts `doc["roomType"] == "channel"` via `baseInboxMemberEvent()`, so the regression guard for the latent spotlight-roomType-empty bug is in place without code change. ## Rollout @@ -253,11 +235,6 @@ In-flight `room_created` events that arrive at the new `inbox-worker` (because t 3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). 4. 
`nats stream info OUTBOX_{site}` should show `member_added` events flowing at room-creation rate; `room_created` subject should report zero new messages. -1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. -2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. -3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). -4. `nats stream info OUTBOX_{site}` should show `member_added` events flowing at room-creation rate; `room_created` subject should report zero new messages. - ## Observability - **Logs**: one new `slog.Warn` per straggler `room_created` event during the deploy window — expected, transient. After deploy completes, zero. diff --git a/inbox-worker/handler.go b/inbox-worker/handler.go index defa38f1f..3fa5cf91c 100644 --- a/inbox-worker/handler.go +++ b/inbox-worker/handler.go @@ -3,15 +3,15 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "log/slog" "strings" "time" + "go.mongodb.org/mongo-driver/v2/mongo" + "github.com/hmchangw/chat/pkg/idgen" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" ) // InboxStore abstracts the data store operations needed by the inbox worker. 
@@ -61,8 +61,6 @@ func (h *Handler) HandleEvent(ctx context.Context, data []byte) error { return h.handleSubscriptionRead(ctx, &evt) case "thread_subscription_upserted": return h.handleThreadSubscriptionUpserted(ctx, &evt) - case model.MessageTypeRoomCreated: - return h.handleRoomCreated(ctx, &evt) default: slog.Warn("unknown event type, skipping", "type", evt.Type) return nil @@ -75,6 +73,11 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) return fmt.Errorf("unmarshal member_added payload: %w", err) } + roomType := event.RoomType + if roomType == "" { + roomType = model.RoomTypeChannel + } + users, err := h.store.FindUsersByAccounts(ctx, event.Accounts) if err != nil { return fmt.Errorf("find users by accounts: %w", err) @@ -98,25 +101,28 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) slog.Warn("user not found for account", "account", account) continue } - // RoomType is fixed to channel: cross-site member_added events only - // originate from rooms that support add-member (channel/discussion), - // never from DM/botDM. 
sub := &model.Subscription{ ID: idgen.GenerateUUIDv7(), User: model.SubscriptionUser{ID: user.ID, Account: user.Account}, RoomID: event.RoomID, - RoomType: model.RoomTypeChannel, + RoomType: roomType, SiteID: event.SiteID, - Roles: []model.Role{model.RoleMember}, - Name: event.RoomName, + Roles: rolesForType(roomType), + Name: subscriptionName(roomType, event.RoomName, event.RequesterAccount), + IsSubscribed: subscriptionIsSubscribed(roomType, &user), HistorySharedSince: historySharedSince, JoinedAt: joinedAt, } subs = append(subs, sub) } + if len(subs) == 0 { + return nil + } if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subscriptions: %w", err) + if !mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("bulk create subscriptions: %w", err) + } } // No SubscriptionUpdateEvent is published here — room-worker already publishes @@ -209,9 +215,6 @@ func (h *Handler) handleThreadSubscriptionUpserted(ctx context.Context, evt *mod return nil } -// errPermanent signals a non-retryable error; callers should Ack and move on. -var errPermanent = errors.New("permanent") - func rolesForType(t model.RoomType) []model.Role { if t == model.RoomTypeChannel { return []model.Role{model.RoleMember} @@ -219,13 +222,12 @@ func rolesForType(t model.RoomType) []model.Role { return nil } -func subscriptionName(d *model.RoomCreatedOutbox, u *model.User) string { - switch d.RoomType { +func subscriptionName(roomType model.RoomType, roomName, requesterAccount string) string { + switch roomType { case model.RoomTypeChannel, model.RoomTypeDiscussion: - return d.RoomName + return roomName case model.RoomTypeDM, model.RoomTypeBotDM: - // On the remote site, the "other party" relative to u is the requester. 
- return d.RequesterAccount + return requesterAccount } return "" } @@ -236,70 +238,9 @@ func isBot(account string) bool { return strings.HasSuffix(account, ".bot") || strings.HasPrefix(account, "p_") } -func subscriptionIsSubscribed(d *model.RoomCreatedOutbox, u *model.User) bool { - if d.RoomType != model.RoomTypeBotDM { +func subscriptionIsSubscribed(roomType model.RoomType, u *model.User) bool { + if roomType != model.RoomTypeBotDM { return false } return !isBot(u.Account) } - -func (h *Handler) handleRoomCreated(ctx context.Context, evt *model.OutboxEvent) error { - requestID := natsutil.RequestIDFromContext(ctx) - if requestID == "" { - return fmt.Errorf("missing X-Request-ID: %w", errPermanent) - } - - var data model.RoomCreatedOutbox - if err := json.Unmarshal(evt.Payload, &data); err != nil { - return fmt.Errorf("unmarshal room_created payload: %w: %w", err, errPermanent) - } - if len(data.Accounts) == 0 { - slog.Warn("room_created event with empty Accounts list", - "requestId", requestID, "roomId", data.RoomID) - return nil - } - - users, err := h.store.FindUsersByAccounts(ctx, data.Accounts) - if err != nil { - return fmt.Errorf("find users by accounts: %w", err) - } - // FindUsersByAccounts can return a subset; treat any account in - // data.Accounts that didn't come back as a hard failure rather than - // silently materializing partial remote-side state with no retry signal. 
- userByAccount := make(map[string]model.User, len(users)) - for i := range users { - userByAccount[users[i].Account] = users[i] - } - for _, account := range data.Accounts { - if _, ok := userByAccount[account]; !ok { - return fmt.Errorf("find users by accounts: missing account %q (room %s home %s)", - account, data.RoomID, data.HomeSiteID) - } - } - - acceptedAt := time.UnixMilli(data.Timestamp).UTC() - subs := make([]*model.Subscription, 0, len(data.Accounts)) - for _, account := range data.Accounts { - u := userByAccount[account] - sub := &model.Subscription{ - ID: idgen.GenerateUUIDv7(), - User: model.SubscriptionUser{ID: u.ID, Account: u.Account}, - RoomID: data.RoomID, - SiteID: data.HomeSiteID, - Roles: rolesForType(data.RoomType), - Name: subscriptionName(&data, &u), - RoomType: data.RoomType, - IsSubscribed: subscriptionIsSubscribed(&data, &u), - JoinedAt: acceptedAt, - } - subs = append(subs, sub) - } - - if len(subs) == 0 { - return nil - } - if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subs: %w", err) - } - return nil -} diff --git a/inbox-worker/handler_test.go b/inbox-worker/handler_test.go index f7c846813..7781b6ddc 100644 --- a/inbox-worker/handler_test.go +++ b/inbox-worker/handler_test.go @@ -10,10 +10,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/v2/mongo" "github.com/hmchangw/chat/pkg/idgen" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" ) // --- In-memory InboxStore stub --- @@ -35,6 +35,7 @@ type stubInboxStore struct { mu sync.Mutex subscriptions []model.Subscription bulkSubscriptions []*model.Subscription + bulkCreateErr error rooms []model.Room roleUpdates []roleUpdate users []model.User @@ -132,6 +133,9 @@ func (s *stubInboxStore) FindUsersByAccounts(_ context.Context, accounts []strin func (s *stubInboxStore) BulkCreateSubscriptions(_ context.Context, subs []*model.Subscription) 
error { s.mu.Lock() defer s.mu.Unlock() + if s.bulkCreateErr != nil { + return s.bulkCreateErr + } s.bulkSubscriptions = append(s.bulkSubscriptions, subs...) for _, sub := range subs { s.subscriptions = append(s.subscriptions, *sub) @@ -997,91 +1001,93 @@ func TestRolesForType(t *testing.T) { } func TestSubscriptionName(t *testing.T) { - d := model.RoomCreatedOutbox{ - RoomType: model.RoomTypeChannel, - RoomName: "deal team", - RequesterAccount: "alice", - } - assert.Equal(t, "deal team", subscriptionName(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeDM - assert.Equal(t, "alice", subscriptionName(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeBotDM - assert.Equal(t, "alice", subscriptionName(&d, &model.User{Account: "weather.bot"})) + assert.Equal(t, "deal team", subscriptionName(model.RoomTypeChannel, "deal team", "alice")) + assert.Equal(t, "alice", subscriptionName(model.RoomTypeDM, "", "alice")) + assert.Equal(t, "alice", subscriptionName(model.RoomTypeBotDM, "", "alice")) + assert.Equal(t, "", subscriptionName(model.RoomType(""), "ignored", "alice")) } func TestSubscriptionIsSubscribed(t *testing.T) { - d := model.RoomCreatedOutbox{RoomType: model.RoomTypeChannel} - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeDM - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeBotDM - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "weather.bot"})) - assert.True(t, subscriptionIsSubscribed(&d, &model.User{Account: "alice"})) - // p_ webhook bots: same as .bot — bot side gets IsSubscribed=false. 
- assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "p_webhook"})) -} - -func TestHandleRoomCreatedRequiresRequestID(t *testing.T) { - store := &stubInboxStore{} - h := NewHandler(store) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r1", RoomType: model.RoomTypeChannel, - Accounts: []string{"bob"}, - }) - err := h.handleRoomCreated(context.Background(), &model.OutboxEvent{Payload: payload}) - require.Error(t, err) - assert.Contains(t, err.Error(), "missing X-Request-ID") + assert.False(t, subscriptionIsSubscribed(model.RoomTypeChannel, &model.User{Account: "bob"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeDM, &model.User{Account: "bob"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "weather.bot"})) + assert.True(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "alice"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "p_webhook"})) } -func TestHandleRoomCreatedEmptyAccountsAcksWithWarn(t *testing.T) { - store := &stubInboxStore{} - h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r1", RoomType: model.RoomTypeChannel, Accounts: []string{}, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) -} - -func TestHandleRoomCreatedDMBuildsRemoteSub(t *testing.T) { +func TestHandleMemberAdded_DM_BuildsRecipientSubWithCounterpartName(t *testing.T) { store := &stubInboxStore{ users: []model.User{ {ID: "u_bob", Account: "bob", SiteID: "site-B"}, }, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "u_aliceu_bob", - RoomType: 
model.RoomTypeDM, RoomName: "", - HomeSiteID: "site-A", + RoomType: model.RoomTypeDM, Accounts: []string{"bob"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: 1740000000000, Timestamp: 1740000000000, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) subs := store.bulkSubscriptions require.Len(t, subs, 1) assert.True(t, idgen.IsValidUUIDv7(subs[0].ID)) assert.Equal(t, "u_aliceu_bob", subs[0].RoomID) assert.Equal(t, "site-A", subs[0].SiteID) - assert.Equal(t, "alice", subs[0].Name) - assert.Nil(t, subs[0].Roles) + assert.Equal(t, "alice", subs[0].Name, "DM recipient sub.Name is the requester (counterpart) account") + assert.Nil(t, subs[0].Roles, "DM has no roles") assert.False(t, subs[0].IsSubscribed) assert.Equal(t, model.RoomTypeDM, subs[0].RoomType) } -func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { +func TestHandleMemberAdded_BotDM_BuildsBotSub(t *testing.T) { + // Cross-site botDM: human (alice) is the requester on site-A; bot + // (weather.bot) lives on site-B. Bot's sub on site-B should have + // Name = human's account, IsSubscribed = false (bot side). 
+ store := &stubInboxStore{ + users: []model.User{ + {ID: "u_weather", Account: "weather.bot", SiteID: "site-B"}, + }, + } + h := NewHandler(store) + + change := model.MemberAddEvent{ + Type: "member_added", + RoomID: "u_aliceu_weather", + RoomName: "", + RoomType: model.RoomTypeBotDM, + Accounts: []string{"weather.bot"}, + SiteID: "site-A", + RequesterAccount: "alice", + JoinedAt: 1740000000000, + Timestamp: 1740000000000, + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) + + subs := store.bulkSubscriptions + require.Len(t, subs, 1) + assert.Equal(t, "alice", subs[0].Name) + assert.Nil(t, subs[0].Roles) + assert.False(t, subs[0].IsSubscribed, "bot account on the bot side never has IsSubscribed=true") + assert.Equal(t, model.RoomTypeBotDM, subs[0].RoomType) +} + +func TestHandleMemberAdded_Channel_BuildsChannelSub(t *testing.T) { store := &stubInboxStore{ users: []model.User{ {ID: "u_bob", Account: "bob", SiteID: "site-B"}, @@ -1089,19 +1095,23 @@ func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { }, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", - RoomType: model.RoomTypeChannel, RoomName: "deal team", - HomeSiteID: "site-A", + RoomType: model.RoomTypeChannel, Accounts: []string{"bob", "ian"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: 1, Timestamp: 1, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := 
json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) subs := store.bulkSubscriptions require.Len(t, subs, 2) @@ -1113,11 +1123,11 @@ func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { } } -func TestHandleMemberAddedSetsNameAndRoomType(t *testing.T) { +func TestHandleMemberAdded_EmptyRoomType_DefaultsToChannel(t *testing.T) { + // Backward-compat: events from older publishers don't carry RoomType. + // handleMemberAdded must treat empty as channel. store := &stubInboxStore{ - users: []model.User{ - {ID: "u_bob", Account: "bob", SiteID: "site-B"}, - }, + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, } h := NewHandler(store) @@ -1127,66 +1137,56 @@ func TestHandleMemberAddedSetsNameAndRoomType(t *testing.T) { RoomName: "deal team", Accounts: []string{"bob"}, SiteID: "site-A", - JoinedAt: 1740000000000, - Timestamp: 1740000000000, + JoinedAt: 1, + Timestamp: 1, + // RoomType intentionally left empty } - changeData, err := json.Marshal(change) - require.NoError(t, err) - - evt := model.OutboxEvent{ - Type: "member_added", - SiteID: "site-A", - DestSiteID: "site-B", - Payload: changeData, - } - evtData, err := json.Marshal(evt) - require.NoError(t, err) + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", Payload: changeData} + evtData, _ := json.Marshal(evt) require.NoError(t, h.HandleEvent(context.Background(), evtData)) - subs := store.getSubscriptions() + subs := store.bulkSubscriptions require.Len(t, subs, 1) - assert.Equal(t, "deal team", subs[0].Name) - assert.Equal(t, model.RoomTypeChannel, subs[0].RoomType) + assert.Equal(t, model.RoomTypeChannel, subs[0].RoomType, "empty RoomType must default to channel") + assert.Equal(t, []model.Role{model.RoleMember}, subs[0].Roles) } -func TestHandleRoomCreatedBotDMBuildsRemoteBotSub(t *testing.T) { - // Cross-site botDM: human (alice) is the requester on site-A; bot - // (weather.bot) lives on site-B. 
The outbox event lands at site-B's - // inbox-worker, which must materialize the bot's sub with: - // Name = human's account ("alice") - // IsSubscribed = false - // Roles = nil (no member role for botDM) - // SiteID = home site (site-A) +func TestHandleMemberAdded_DuplicateKey_IsIdempotent(t *testing.T) { store := &stubInboxStore{ - users: []model.User{ - {ID: "u_weather", Account: "weather.bot", SiteID: "site-B"}, - }, + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, + bulkCreateErr: mongo.WriteException{WriteErrors: []mongo.WriteError{{Code: 11000, Message: "duplicate key"}}}, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "u_aliceu_weather", - RoomType: model.RoomTypeBotDM, - RoomName: "", - HomeSiteID: "site-A", - Accounts: []string{"weather.bot"}, - RequesterAccount: "alice", - Timestamp: 1740000000000, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", RoomType: model.RoomTypeChannel, + Accounts: []string{"bob"}, SiteID: "site-A", JoinedAt: 1, Timestamp: 1, + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", Payload: changeData} + evtData, _ := json.Marshal(evt) - subs := store.bulkSubscriptions - require.Len(t, subs, 1, "exactly one remote sub for the bot") - assert.True(t, idgen.IsValidUUIDv7(subs[0].ID)) - assert.Equal(t, "u_aliceu_weather", subs[0].RoomID) - assert.Equal(t, "site-A", subs[0].SiteID, "bot's sub.siteID is the room's home site") - assert.Equal(t, "alice", subs[0].Name, "bot's sub.Name is the human account") - assert.Nil(t, subs[0].Roles) - assert.False(t, subs[0].IsSubscribed) - assert.Equal(t, model.RoomTypeBotDM, subs[0].RoomType) - assert.Equal(t, "u_weather", subs[0].User.ID) - assert.Equal(t, "weather.bot", 
subs[0].User.Account) + require.NoError(t, h.HandleEvent(context.Background(), evtData), + "duplicate-key on bulk-create must be swallowed (replay after prior delivery is idempotent)") +} + +func TestHandleMemberAdded_BulkCreate_NonDuplicateError_ReturnsError(t *testing.T) { + store := &stubInboxStore{ + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, + bulkCreateErr: fmt.Errorf("connection refused"), + } + h := NewHandler(store) + + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", RoomType: model.RoomTypeChannel, + Accounts: []string{"bob"}, SiteID: "site-A", JoinedAt: 1, Timestamp: 1, + } + changeData, _ := json.Marshal(change) + evtData, _ := json.Marshal(model.OutboxEvent{Type: "member_added", Payload: changeData}) + + err := h.HandleEvent(context.Background(), evtData) + require.Error(t, err) + assert.Contains(t, err.Error(), "bulk create subscriptions") } diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index dd35fe22d..8eb8599e6 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -18,7 +18,6 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/stream" "github.com/hmchangw/chat/pkg/subject" "github.com/hmchangw/chat/pkg/testutil" @@ -490,7 +489,7 @@ func newIntegrationHandler(t *testing.T, db *mongo.Database) *Handler { return NewHandler(store) } -func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { +func TestHandleMemberAdded_Channel_PersistsRemoteSubs(t *testing.T) { ctx := context.Background() db := setupMongo(t) mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", @@ -499,27 +498,32 @@ func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { SiteID: "site-B", EngName: "Ian", ChineseName: "伊恩"}) h := newIntegrationHandler(t, db) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx = natsutil.WithRequestID(ctx, 
reqID) - payload, err := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r_xyz", RoomType: model.RoomTypeChannel, - RoomName: "deal team", HomeSiteID: "site-A", + payload, err := json.Marshal(model.MemberAddEvent{ + Type: "member_added", + RoomID: "r_xyz", + RoomName: "deal team", + RoomType: model.RoomTypeChannel, Accounts: []string{"bob", "ian"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: time.Now().UTC().UnixMilli(), Timestamp: time.Now().UTC().UnixMilli(), }) require.NoError(t, err) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + evt, err := json.Marshal(model.OutboxEvent{ + Type: "member_added", + SiteID: "site-A", + DestSiteID: "site-B", + Payload: payload, + }) + require.NoError(t, err) + require.NoError(t, h.HandleEvent(ctx, evt)) subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": "r_xyz"}) require.NoError(t, err) assert.Equal(t, int64(2), subCount) - roomCount, err := db.Collection("rooms").CountDocuments(ctx, bson.M{"_id": "r_xyz"}) - require.NoError(t, err) - assert.Equal(t, int64(0), roomCount, "inbox-worker must not create room mirror") - var bobSub model.Subscription require.NoError(t, db.Collection("subscriptions").FindOne(ctx, bson.M{"roomId": "r_xyz", "u.account": "bob"}).Decode(&bobSub)) @@ -528,42 +532,45 @@ func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { assert.Equal(t, model.RoomTypeChannel, bobSub.RoomType) } -func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) { +func TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub(t *testing.T) { ctx := context.Background() db := setupMongo(t) mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", SiteID: "site-B", EngName: "Bob", ChineseName: "鲍勃"}) h := newIntegrationHandler(t, db) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx = natsutil.WithRequestID(ctx, reqID) const roomID = "u_aliceu_bob" - payload, err := json.Marshal(model.RoomCreatedOutbox{ + 
payload, err := json.Marshal(model.MemberAddEvent{ + Type: "member_added", RoomID: roomID, - RoomType: model.RoomTypeDM, RoomName: "", - HomeSiteID: "site-A", + RoomType: model.RoomTypeDM, Accounts: []string{"bob"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: time.Now().UTC().UnixMilli(), Timestamp: time.Now().UTC().UnixMilli(), }) require.NoError(t, err) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + evt, err := json.Marshal(model.OutboxEvent{ + Type: "member_added", + SiteID: "site-A", + DestSiteID: "site-B", + Payload: payload, + }) + require.NoError(t, err) + require.NoError(t, h.HandleEvent(ctx, evt)) subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": roomID}) require.NoError(t, err) assert.Equal(t, int64(1), subCount) - roomCount, err := db.Collection("rooms").CountDocuments(ctx, bson.M{"_id": roomID}) - require.NoError(t, err) - assert.Equal(t, int64(0), roomCount, "inbox-worker must not create room mirror") - var bobSub model.Subscription require.NoError(t, db.Collection("subscriptions").FindOne(ctx, bson.M{"roomId": roomID, "u.account": "bob"}).Decode(&bobSub)) assert.Equal(t, "bob", bobSub.User.Account) - assert.Equal(t, "alice", bobSub.Name, "DM Subscription.Name = counterpart account") + assert.Equal(t, "alice", bobSub.Name, "DM Subscription.Name = counterpart account (the requester)") assert.Equal(t, "site-A", bobSub.SiteID, "sub SiteID is room's home, not this site") assert.Equal(t, model.RoomTypeDM, bobSub.RoomType) assert.Nil(t, bobSub.Roles, "DMs have no roles") diff --git a/pkg/model/event.go b/pkg/model/event.go index 0e48f924c..afea3ae8d 100644 --- a/pkg/model/event.go +++ b/pkg/model/event.go @@ -110,8 +110,10 @@ type MemberAddEvent struct { Type string `json:"type" bson:"type"` RoomID string `json:"roomId" bson:"roomId"` RoomName string `json:"roomName" bson:"roomName"` + RoomType RoomType `json:"roomType,omitempty" bson:"roomType,omitempty"` Accounts 
[]string `json:"accounts" bson:"accounts"` SiteID string `json:"siteId" bson:"siteId"` + RequesterAccount string `json:"requesterAccount,omitempty" bson:"requesterAccount,omitempty"` JoinedAt int64 `json:"joinedAt" bson:"joinedAt"` HistorySharedSince *int64 `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"` Timestamp int64 `json:"timestamp" bson:"timestamp"` @@ -202,13 +204,6 @@ const ( MessageTypeMembersAdded = "members_added" ) -const ( - // OutboxTypeRoomCreated is the cross-site outbox event type emitted when a room is created. - // Distinct from MessageTypeRoomCreated (system-message type) so destination sites can - // route on event semantics without collision. - OutboxTypeRoomCreated = "room_created" -) - const ( // AsyncJobStatusOK indicates a successful async job result. AsyncJobStatusOK = "ok" @@ -225,14 +220,3 @@ type CreateRoomReply struct { // CreateRoomReplyAccepted means validated + queued; persistence happens later in room-worker. const CreateRoomReplyAccepted = "accepted" - -// RoomCreatedOutbox is the cross-site payload (wrapped in OutboxEvent) when a remote member exists. 
-type RoomCreatedOutbox struct { - RoomID string `json:"roomId"` - RoomType RoomType `json:"roomType"` - RoomName string `json:"roomName"` - HomeSiteID string `json:"homeSiteId"` - Accounts []string `json:"accounts"` - RequesterAccount string `json:"requesterAccount"` - Timestamp int64 `json:"timestamp"` -} diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index 550a5bf99..0afdd78c6 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -1910,25 +1910,6 @@ func TestCreateRoomRequestRoundtrip(t *testing.T) { assert.Equal(t, int64(1740000000000), dst.Timestamp) } -func TestRoomCreatedOutboxRoundtrip(t *testing.T) { - out := model.RoomCreatedOutbox{ - RoomID: "r1", - RoomType: model.RoomTypeChannel, - RoomName: "deal team", - HomeSiteID: "site-A", - Accounts: []string{"bob", "ian"}, - RequesterAccount: "alice", - Timestamp: 1740000000000, - } - data, err := json.Marshal(&out) - require.NoError(t, err) - var dst model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(data, &dst)) - assert.Equal(t, model.RoomTypeChannel, dst.RoomType) - assert.Equal(t, []string{"bob", "ian"}, dst.Accounts) - assert.NotContains(t, string(data), "appName") -} - func TestErrorResponseRoomIDOmitempty(t *testing.T) { er := model.ErrorResponse{Error: "internal"} body, err := json.Marshal(er) diff --git a/room-worker/handler.go b/room-worker/handler.go index f956d9add..e0fa8040e 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -842,8 +842,11 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error memberAddEvt := model.MemberAddEvent{ Type: "member_added", RoomID: req.RoomID, + RoomName: room.Name, + RoomType: room.Type, Accounts: actualAccounts, SiteID: room.SiteID, + RequesterAccount: req.RequesterAccount, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), @@ -911,8 +914,10 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error Type: 
"member_added", RoomID: req.RoomID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: req.RequesterAccount, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), @@ -1233,8 +1238,10 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq Type: model.OutboxMemberAdded, RoomID: room.ID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: requester.Account, JoinedAt: req.Timestamp, HistorySharedSince: nil, Timestamp: now.UnixMilli(), @@ -1263,45 +1270,14 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq remoteSiteAccounts[u.SiteID] = append(remoteSiteAccounts[u.SiteID], u.Account) } for destSiteID, accounts := range remoteSiteAccounts { - payload := model.RoomCreatedOutbox{ - RoomID: room.ID, - RoomType: room.Type, - RoomName: room.Name, - HomeSiteID: room.SiteID, - Accounts: accounts, - RequesterAccount: requester.Account, - Timestamp: req.Timestamp, - } - pData, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("marshal room_created outbox payload: %w", err) - } - envelope := model.OutboxEvent{ - Type: model.OutboxTypeRoomCreated, - SiteID: room.SiteID, - DestSiteID: destSiteID, - Payload: pData, - Timestamp: now.UnixMilli(), - } - eData, err := json.Marshal(envelope) - if err != nil { - return fmt.Errorf("marshal outbox envelope: %w", err) - } - if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxTypeRoomCreated), eData, requestID+":"+destSiteID); err != nil { - return fmt.Errorf("publish room_created outbox to %s: %w", destSiteID, err) - } - - // Cross-site member_added so the remote site's search-sync-worker - // updates its user-room/spotlight MV — mirrors processAddMembers' - // federation. 
inbox-worker still consumes the room_created above to - // build correctly-typed Subscription rows; this event only feeds the - // search index. memberEvt := model.MemberAddEvent{ Type: model.OutboxMemberAdded, RoomID: room.ID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: requester.Account, JoinedAt: req.Timestamp, HistorySharedSince: nil, Timestamp: now.UnixMilli(), @@ -1581,25 +1557,28 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req return nil } - payload := model.RoomCreatedOutbox{ + now := time.Now().UTC().UnixMilli() + memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, RoomID: room.ID, - RoomType: room.Type, RoomName: "", - HomeSiteID: room.SiteID, + RoomType: room.Type, Accounts: []string{other.Account}, + SiteID: room.SiteID, RequesterAccount: requester.Account, - Timestamp: acceptedAt.UnixMilli(), + JoinedAt: acceptedAt.UnixMilli(), + Timestamp: now, } - pData, err := json.Marshal(payload) + pData, err := json.Marshal(memberEvt) if err != nil { - return fmt.Errorf("marshal room_created outbox payload: %w", err) + return fmt.Errorf("marshal member_added outbox payload: %w", err) } envelope := model.OutboxEvent{ - Type: model.OutboxTypeRoomCreated, + Type: model.OutboxMemberAdded, SiteID: room.SiteID, DestSiteID: other.SiteID, Payload: pData, - Timestamp: time.Now().UTC().UnixMilli(), + Timestamp: now, } eData, err := json.Marshal(envelope) if err != nil { @@ -1607,7 +1586,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req } payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, acceptedAt.UnixMilli()) return h.publish(ctx, - subject.Outbox(room.SiteID, other.SiteID, model.OutboxTypeRoomCreated), + subject.Outbox(room.SiteID, other.SiteID, model.OutboxMemberAdded), eData, natsutil.OutboxDedupID(ctx, other.SiteID, payloadSeed), ) diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go 
index 8d8a53c02..8cf62ffee 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -2322,18 +2322,19 @@ func TestProcessCreateRoom_Channel_OutboxPerRemoteSite(t *testing.T) { }) require.NoError(t, h.processCreateRoom(ctx, body)) - outboxMsgs := outboxFor(getPublished(), "site-B", model.MessageTypeRoomCreated) + outboxMsgs := outboxFor(getPublished(), "site-B", model.OutboxMemberAdded) require.Len(t, outboxMsgs, 1) var envelope model.OutboxEvent require.NoError(t, json.Unmarshal(outboxMsgs[0].data, &envelope)) - assert.Equal(t, model.MessageTypeRoomCreated, envelope.Type) + assert.Equal(t, model.OutboxMemberAdded, envelope.Type) assert.Equal(t, "site-A", envelope.SiteID) assert.Equal(t, "site-B", envelope.DestSiteID) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent require.NoError(t, json.Unmarshal(envelope.Payload, &payload)) assert.Equal(t, "room-ch-5", payload.RoomID) + assert.Equal(t, model.RoomTypeChannel, payload.RoomType) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) } @@ -2826,24 +2827,24 @@ func TestHandleSyncCreateDM_CrossSite_EmitsOutbox(t *testing.T) { var outbox *dmCapturedPublish for i := range capture.captured { - if capture.captured[i].subject == subject.Outbox("site-a", "site-b", model.OutboxTypeRoomCreated) { + if capture.captured[i].subject == subject.Outbox("site-a", "site-b", model.OutboxMemberAdded) { outbox = &capture.captured[i] break } } - require.NotNil(t, outbox, "expected an outbox publish to site-b") + require.NotNil(t, outbox, "expected a member_added outbox publish to site-b") var env model.OutboxEvent require.NoError(t, json.Unmarshal(outbox.data, &env)) - assert.Equal(t, model.OutboxEventType(model.OutboxTypeRoomCreated), env.Type) + assert.Equal(t, model.OutboxMemberAdded, env.Type) assert.Equal(t, "site-a", env.SiteID) assert.Equal(t, "site-b", env.DestSiteID) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent 
require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, model.RoomTypeDM, payload.RoomType) assert.Equal(t, "", payload.RoomName) - assert.Equal(t, "site-a", payload.HomeSiteID) + assert.Equal(t, "site-a", payload.SiteID) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, "01970a4f-8c2d-7c9a-abcd-e0123456789f:site-b", outbox.msgID) @@ -3139,13 +3140,11 @@ func TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded(t *testing.T) { require.NoError(t, json.Unmarshal(envelope.Payload, &inner)) assert.Equal(t, "room-ch-xsite", inner.RoomID) assert.Equal(t, "Cross", inner.RoomName) + assert.Equal(t, model.RoomTypeChannel, inner.RoomType, "create-time member_added carries RoomType for inbox-worker dispatch") assert.Equal(t, []string{"bob"}, inner.Accounts, "carries only the remote-site accounts, mirroring processAddMembers") assert.Equal(t, "site-A", inner.SiteID, "inner SiteID is the origin (room's home)") + assert.Equal(t, "alice", inner.RequesterAccount, "create-time member_added carries RequesterAccount for DM/botDM counterpart resolution") assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted") - - // Sanity: the existing room_created outbox is still emitted on the same loop. - roomCreatedOutbox := outboxFor(getPublished(), "site-B", model.OutboxTypeRoomCreated) - require.Len(t, roomCreatedOutbox, 1, "room_created outbox path unchanged") } // ---- Task 10: key-gate and fan-out tests ---- diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index f734fba40..0680888ba 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -653,31 +653,8 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")), "no outbox to home site-A") - // room_created outboxes — one per remote site. 
- createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) - createdC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxTypeRoomCreated)) - require.Len(t, createdB, 1, "exactly one room_created outbox to site-B") - require.Len(t, createdC, 1, "exactly one room_created outbox to site-C") - - var envB model.OutboxEvent - require.NoError(t, json.Unmarshal(createdB[0].data, &envB)) - var payloadB model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(envB.Payload, &payloadB)) - assert.ElementsMatch(t, []string{"bob", "carol"}, payloadB.Accounts) - assert.Equal(t, model.RoomTypeChannel, payloadB.RoomType) - assert.Equal(t, "deal team", payloadB.RoomName) - assert.Equal(t, "site-A", payloadB.HomeSiteID) - assert.Equal(t, "alice", payloadB.RequesterAccount) - assert.Equal(t, reqID+":site-B", createdB[0].msgID) - - var envC model.OutboxEvent - require.NoError(t, json.Unmarshal(createdC[0].data, &envC)) - var payloadC model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(envC.Payload, &payloadC)) - assert.ElementsMatch(t, []string{"ian"}, payloadC.Accounts) - assert.Equal(t, reqID+":site-C", createdC[0].msgID) - - // member_added outboxes — one per remote site (search-sync-worker federation). + // One member_added outbox per remote site — carries all the info + // inbox-worker needs for sub creation AND search-sync-worker for MV update. 
memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) memberC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxMemberAdded)) require.Len(t, memberB, 1, "exactly one member_added outbox to site-B") @@ -688,8 +665,10 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { var memberPayloadB model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnvB.Payload, &memberPayloadB)) assert.ElementsMatch(t, []string{"bob", "carol"}, memberPayloadB.Accounts) + assert.Equal(t, model.RoomTypeChannel, memberPayloadB.RoomType) assert.Equal(t, "deal team", memberPayloadB.RoomName) assert.Equal(t, "site-A", memberPayloadB.SiteID) + assert.Equal(t, "alice", memberPayloadB.RequesterAccount) assert.Nil(t, memberPayloadB.HistorySharedSince) assert.Equal(t, reqID+":site-B", memberB[0].msgID) @@ -698,6 +677,11 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { var memberPayloadC model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnvC.Payload, &memberPayloadC)) assert.ElementsMatch(t, []string{"ian"}, memberPayloadC.Accounts) + assert.Equal(t, model.RoomTypeChannel, memberPayloadC.RoomType) + assert.Equal(t, "deal team", memberPayloadC.RoomName) + assert.Equal(t, "site-A", memberPayloadC.SiteID) + assert.Equal(t, "alice", memberPayloadC.RequesterAccount) + assert.Nil(t, memberPayloadC.HistorySharedSince) assert.Equal(t, reqID+":site-C", memberC[0].msgID) } @@ -741,28 +725,18 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", ""))) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", ""))) - // room_created outbox to the recipient's site. 
- createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) - require.Len(t, createdB, 1) - var env model.OutboxEvent - require.NoError(t, json.Unmarshal(createdB[0].data, &env)) - var payload model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(env.Payload, &payload)) - assert.Equal(t, model.RoomTypeDM, payload.RoomType) - assert.Equal(t, "", payload.RoomName, "DM RoomName empty per v2 cleanup") - assert.ElementsMatch(t, []string{"bob"}, payload.Accounts) - assert.Equal(t, "alice", payload.RequesterAccount) - assert.Equal(t, "site-A", payload.HomeSiteID) - assert.Equal(t, reqID+":site-B", createdB[0].msgID) - - // member_added outbox (search-sync-worker federation). + // One member_added outbox to the recipient's site — carries everything + // inbox-worker needs to build the DM recipient's sub with the right shape. memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) require.Len(t, memberB, 1) var memberEnv model.OutboxEvent require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnv)) var memberPayload model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnv.Payload, &memberPayload)) + assert.Equal(t, model.RoomTypeDM, memberPayload.RoomType) + assert.Equal(t, "", memberPayload.RoomName, "DM RoomName empty per v2 cleanup") assert.ElementsMatch(t, []string{"bob"}, memberPayload.Accounts) + assert.Equal(t, "alice", memberPayload.RequesterAccount, "DM counterpart resolution depends on this") assert.Equal(t, "site-A", memberPayload.SiteID) assert.Equal(t, reqID+":site-B", memberB[0].msgID) } @@ -1076,7 +1050,7 @@ func TestSyncCreateDM_BotDM_CrossSiteOutbox(t *testing.T) { _, err := handler.handleSyncCreateDM(newIntegSyncDMCtx(), data) require.NoError(t, err) - pubs := cap.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs := cap.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) assert.Len(t, pubs, 1, "exactly one 
outbox to site-B") } @@ -1142,14 +1116,15 @@ func TestSyncCreateDM_CrossSite_OutboxPayloadConverges(t *testing.T) { assert.Equal(t, wantRoomID, persisted.ID) // 2. OUTBOX payload carries the same RoomID + the dedup key includes destSiteID. - pubs := cap1.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs := cap1.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) require.Len(t, pubs, 1) var env model.OutboxEvent require.NoError(t, json.Unmarshal(pubs[0].data, &env)) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, wantRoomID, payload.RoomID, "outbox RoomID must match local room.ID so remote site converges") + assert.Equal(t, model.RoomTypeDM, payload.RoomType) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Contains(t, pubs[0].msgID, "site-B", @@ -1161,7 +1136,7 @@ func TestSyncCreateDM_CrossSite_OutboxPayloadConverges(t *testing.T) { handler2 := NewHandler(store, siteID, cap2.fn(), testKeyStore, testKeySender) _, err = handler2.handleSyncCreateDM(ctx, data) require.NoError(t, err) - pubs2 := cap2.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs2 := cap2.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) require.Len(t, pubs2, 1) assert.Equal(t, pubs[0].msgID, pubs2[0].msgID, "replay must produce identical Nats-Msg-Id so broker dedup blocks duplicate cross-site events")