diff --git a/broadcast-worker/consumer_config_test.go b/broadcast-worker/consumer_config_test.go index cf4411769..131a138b3 100644 --- a/broadcast-worker/consumer_config_test.go +++ b/broadcast-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md new file mode 100644 index 000000000..4363549bb --- /dev/null +++ b/docs/superpowers/specs/2026-05-11-create-room-origin-site-mv-fix-design.md @@ -0,0 +1,267 @@ +# Create-Room Origin-Site MV Fix — Design + +**Date:** 2026-05-11 +**Status:** Draft +**Services:** `room-worker` +**Related specs:** +- `2026-04-09-room-spotlight-user-room-design.md` (user-room and spotlight collections) +- `2026-04-21-search-service-sync-worker-extension-design.md` (search-sync-worker INBOX consumer) +- `2026-05-01-federated-room-origin-site-mv-fix-design.md` (sibling fix for add/remove flows; this spec applies the same pattern to the create flow) + +## Problem + +PR #145 closed the origin-site MV gap for `member.add` / `member.remove` by adding a local `chat.inbox.{siteID}.member_added` / `member_removed` publish from `room-worker`, plus a per-remote-site `outbox.{origin}.to.{remote}.member_added` outbox event that arrives on every federated site's INBOX (via JetStream Sources + SubjectTransform) as `chat.inbox.{remote}.aggregate.member_added`. `search-sync-worker` on each site consumes both lanes and updates its `user-room-{siteID}` and `spotlight-{siteID}` ES indexes. + +The room-creation path still has the same gap. 
`room-worker.finishCreateRoom` writes the auto-enrolled `Subscription` rows (creator + DM recipient + every initial channel member) and emits a per-remote-site `outbox.{origin}.to.{remote}.room_created` for federation — but **never publishes a `member_added` event** on either the origin-INBOX local lane or the cross-site outbox. `search-sync-worker` never sees the create. + +Result: a freshly-created room is invisible to search until the next add/remove operation re-emits the event. + +### User-visible consequences + +1. **Spotlight (room typeahead) returns nothing for the new room.** The creator types the room name; `search-sync-worker` has no spotlight doc; no result. +2. **Cross-site message search returns empty for the new room.** CCS terms-lookup against the user's `user-room-{siteID}` doc reports the user as not subscribed; message hits are filtered out as unauthorized. +3. **Self-corrects on churn.** Both indexes catch up on the next `member.add` or `member.remove` (PR #145's publish fires). Until then, the room is silently invisible to search. + +### Concrete trace + +Alice on `s1` creates a channel `r1` with `Orgs: [eng-org]` (org expands to `[bob@s1, charlie@s1, dave@s2]`). Today: + +| Subject | Stream | Effect | +|---|---|---| +| `chat.user.{account}.event.subscription.update` × 4 | core | Frontend left-panel updates for alice, bob, charlie, dave | +| `chat.room.canonical.s1.create` (sys-message only, channel-only) | core | "alice created the room" | +| `outbox.s1.to.s2.room_created` | OUTBOX_s1 | `inbox-worker` on s2 mirrors dave's `Subscription` row | + +`s1`'s `user-room-s1` gains zero entries. `s2`'s `user-room-s2` gains zero entries. Alice CCS-querying `r1` from any site → empty result. + +## Goals + +- `user-room-{siteID}` and `spotlight-{siteID}` on the origin site **and every federated remote site** contain correct entries for every member auto-enrolled at room creation, regardless of room type. 
+- Fix lives **entirely in `room-worker.finishCreateRoom`** — no changes to `inbox-worker`, `search-sync-worker`, stream config, or any new model types. +- Wire format byte-for-byte compatible with PR #145's existing publishes so `search-sync-worker/inbox_stream.go::parseMemberEvent` decodes all `member_added` events identically (whether create-time or add-member, origin-local or federated-aggregate). + +## Non-Goals + +- **Backfilling pre-fix rooms.** Forward-only deployment per agreement. Rooms created before this fix lands stay missing in their MV until any later add/remove churn re-emits the event. +- **Changing `chat.user.{account}.event.subscription.update` or `chat.room.canonical.{siteID}.create`.** UI fan-out and sys-message paths are correct; not in scope. +- **Refactoring `finishCreateRoom`** beyond the two added publishes. +- **Mint-on-create for the room encryption key.** Separate concern, deferred until `ENCRYPTION_ENABLED=true` is required in prod. + +## Design + +### Why this lives in room-worker alone + +The cross-site federation path for `member_added` already exists from PR #145: + +``` +outbox.{origin}.to.{remote}.member_added + → (JetStream Sources + SubjectTransform) + → chat.inbox.{remote}.aggregate.member_added + → search-sync-worker on {remote} updates user-room-{remote}/spotlight-{remote} +``` + +`search-sync-worker` on the remote site already has `chat.inbox.{remote}.aggregate.member_added` in its consumer's `FilterSubjects`. By making `room-worker.finishCreateRoom` emit the **same** outbox event it already emits in `processAddMembers`, we reuse the entire federation lane end-to-end and `search-sync-worker` indexes the new room without any extra hop through `inbox-worker`. + +An alternative considered: have `inbox-worker.handleRoomCreated` re-emit a local `chat.inbox.{remote}.member_added` after creating the subs. 
Rejected because (i) it duplicates federation work `room-worker` already does for add-members; (ii) adds a second hop on the remote side; (iii) requires `inbox-worker.Handler` to grow a `publish` field and a `siteID` field with all the test churn that implies. The symmetric "publish the same outbox events as add-members" path is materially smaller. + +### NATS subjects (all already exist) + +```go +// chat.inbox.{siteID}.member_added — origin-site local lane (PR #145 added) +subject.InboxMemberAdded(siteID) + +// outbox.{origin}.to.{destSiteID}.member_added — federation lane (PR #145 added) +subject.Outbox(siteID, destSiteID, model.OutboxMemberAdded) +``` + +### Wire format + +Both publishes wrap `model.MemberAddEvent` in `model.OutboxEvent`. The local publish has `SiteID == DestSiteID == originSite` (self-loop convention); the cross-site publish has the per-remote `DestSiteID`. The inner `MemberAddEvent` carries: + +| Field | Value | +|---|---| +| `Type` | `model.OutboxMemberAdded` | +| `RoomID` | `room.ID` | +| `RoomName` | `room.Name` (empty for DM/botDM) | +| `Accounts` | Expanded individual accounts (see below) | +| `SiteID` | `room.SiteID` — the origin | +| `JoinedAt` | `req.Timestamp` | +| `HistorySharedSince` | Always `nil` — no prior history at create time | +| `Timestamp` | `now.UnixMilli()` | + +### Accounts list + +| Publish | `Accounts` source | +|---|---| +| **Origin-local INBOX** (`chat.inbox.{origin}.member_added`) | Every entry in `subs[]` (creator + every auto-enrolled member, including cross-site members for s1's own MV) | +| **Cross-site OUTBOX** (`outbox.{origin}.to.{remote}.member_added`) | Only members whose `SiteID == remote` (per-destination split, matches PR #145's batched outbox shape) | + +For channel rooms with `Orgs`, expansion has already happened in `processCreateRoomChannel` before `finishCreateRoom` runs. `subs[]` already carries expanded individual accounts. 
+ +### Dedup IDs + +Reuse `natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed)` with seed `"{roomID}:{requesterAccount}:{timestamp}"`. PR #145 uses the identical seed shape for the add-members path; identical seed at the same `{destSiteID}` is fine because there's exactly one create per room per requester per timestamp. Caveat: JetStream deduplicates on `Nats-Msg-Id` per stream, not per subject, and the existing `room_created` publish in the same loop uses `requestID+":"+destSiteID` as its ID. If `requestID` is the same X-Request-ID that `OutboxDedupID` reads from ctx, the `member_added` publish to `OUTBOX_{origin}` carries an identical ID and is dropped as a duplicate inside the dedup window — the two IDs must be made distinct (e.g. by suffixing the event type) or confirmed to differ before this ships. + +### End-to-end flow after the fix + +For `[bob@s1, charlie@s1, dave@s2]` channel-create on `s1`: + +| Subject | Stream | Effect | +|---|---|---| +| `chat.user.{account}.event.subscription.update` × 4 | core | UI fan-out (unchanged) | +| `chat.room.canonical.s1.create` (sys-message only) | core | "alice created the room" (unchanged) | +| **`chat.inbox.s1.member_added`** (NEW) | INBOX_s1 (local lane) | s1's `search-sync-worker` updates `user-room-s1` + `spotlight-s1` for alice + bob + charlie + dave | +| `outbox.s1.to.s2.room_created` (existing) | OUTBOX_s1 | s2's `inbox-worker` mirrors dave's `Subscription` row | +| **`outbox.s1.to.s2.member_added`** (NEW) | OUTBOX_s1 → INBOX_s2 aggregate lane | s2's `search-sync-worker` updates `user-room-s2` + `spotlight-s2` for dave | + +End state: every site's MV/spotlight indexes contain the new room for every locally-affected member. + +### Ordering + +Both new publishes go inside `finishCreateRoom`: + +- **Origin-local INBOX** publish: after the `subscription.update` loop, before the per-remote-site OUTBOX loop. Same position as PR #145's local INBOX publishes in `processAddMembers`/`processRemoveIndividual`/`processRemoveOrg`. +- **Cross-site OUTBOX `member_added`** publish: inside the existing `for destSiteID, accounts := range remoteSiteAccounts` loop, immediately after the existing `room_created` publish for the same dest site. + +The federation lane delivers `room_created` and `member_added` to the remote site's INBOX in publish order. 
`inbox-worker` (which consumes the aggregate lane via `FilterSubjects: aggregate.>`) and `search-sync-worker` (whose `FilterSubjects` matches `aggregate.member_added` but not `aggregate.room_created`) operate on disjoint event types, so the order they execute relative to each other doesn't matter — `search-sync-worker` doesn't read MongoDB and doesn't depend on `Subscription` rows existing. + +### Idempotency + +`natsutil.OutboxDedupID` produces a stable `Nats-Msg-Id` per `(room, requester, timestamp, destSiteID)`. JetStream stream-level dedup drops redeliveries within its dedup window. Beyond that window, `search-sync-worker`'s Painless last-write-wins guard makes ES replay idempotent. + +## Code Changes + +### Change 1 — `room-worker/handler.go::finishCreateRoom` (origin-local INBOX publish) + +After the existing `subscription.update` loop and channel sys-message publish, before the per-remote-site OUTBOX loop: + +```go +accounts := make([]string, 0, len(subs)) +for _, sub := range subs { + accounts = append(accounts, sub.User.Account) +} +inner := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), +} +innerData, _ := json.Marshal(inner) +outbox := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: innerData, + Timestamp: now.UnixMilli(), +} +outboxData, _ := json.Marshal(outbox) +payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) +if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, natsutil.OutboxDedupID(ctx, room.SiteID, payloadSeed)); err != nil { + slog.Error("local inbox member_added publish failed", "error", err, "roomID", room.ID, "requestID", requestID) +} +``` + +Log-and-continue on publish failure — JetStream redelivery + `search-sync-worker`'s last-write-wins guard handle transient failures 
self-correctingly. + +### Change 2 — `room-worker/handler.go::finishCreateRoom` (cross-site OUTBOX member_added publish) + +Inside the existing per-remote-site loop, right after the existing `room_created` publish: + +```go +memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, // per-dest accounts (loop variable) + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), +} +memberData, _ := json.Marshal(memberEvt) +memberEnvelope := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: destSiteID, + Payload: memberData, + Timestamp: now.UnixMilli(), +} +memberOutboxData, _ := json.Marshal(memberEnvelope) +memberSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) +if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), memberOutboxData, natsutil.OutboxDedupID(ctx, destSiteID, memberSeed)); err != nil { + return fmt.Errorf("publish member_added outbox to %s: %w", destSiteID, err) +} +``` + +The cross-site publish returns an error on failure (rather than log-and-continue) to match the surrounding `room_created` publish — `room-worker` NAKs the create message, and JetStream redelivers it from `MESSAGES_{siteID}`. + +### Change 3 — `pkg/natsutil/request_id.go::OutboxDedupID` + +Promote `room-worker`'s private `outboxDedupID` helper to the exported `natsutil.OutboxDedupID`. It is pure logic that behaves identically at every `room-worker` call site, and the move is consistent with `natsutil` already owning `RequestIDFromContext` and `NewMsg`; exporting it avoids introducing a second copy of the helper. + +### What is NOT changed + +- `pkg/subject`, `pkg/stream`, `pkg/model` — no new types/subjects. +- `inbox-worker` — untouched. 
It continues to consume `aggregate.room_created` for sub creation (its current job) and ignores `aggregate.member_added` for fresh rooms because the BulkCreateSubscriptions path is gated on the room having been created via `aggregate.room_created` first (sub creation happens in `handleRoomCreated`; `handleMemberAdded` adds members to an existing room). + + **Subtle:** if the remote site's `inbox-worker.handleMemberAdded` arrives **before** `handleRoomCreated` for a fresh room (out-of-order delivery), it will try to `BulkCreateSubscriptions` and either (a) the unique index `(roomId, u.account)` rejects the duplicate later when `handleRoomCreated` runs, or (b) the first delivery succeeds and the second is a no-op. Either way the end state is correct because both events carry the same `Accounts` list for the locally-resolved subset. JetStream publish order from `OUTBOX_{origin}` is preserved, so out-of-order delivery is unlikely in practice. + +- `search-sync-worker`, `message-worker`, `broadcast-worker`, `history-service` — untouched. + +### Diff size estimate + +- `room-worker/handler.go`: +~50 lines (two publish blocks). +- `pkg/natsutil/request_id.go`: +~13 lines (new `OutboxDedupID` helper, called from 9 existing sites in `room-worker`). +- `room-worker/handler.go` callers: 9 lines updated to use `natsutil.OutboxDedupID` instead of the private helper; private helper deleted. +- Tests: 3 new unit tests (2 for origin-local INBOX publish: DM + channel; 1 for cross-site OUTBOX member_added). + +## Testing + +Unit tests only. Handler tests inject `publish` as a field already; tests capture publishes and assert on the entries. + +### Unit tests — `room-worker/handler_test.go` + +- `TestProcessCreateRoom_DM_PublishesLocalInbox`: DM across sites; assert single publish to `subject.InboxMemberAdded(room.SiteID)` with both creator+recipient accounts, `RoomName` empty, `HistorySharedSince` nil, expected `Nats-Msg-Id`. 
+ +- `TestProcessCreateRoom_Channel_PublishesLocalInbox`: channel mixed-site; assert single publish to `subject.InboxMemberAdded(room.SiteID)` with creator + every initial member (same-site + cross-site), expected `Nats-Msg-Id`. + +- `TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded`: channel with at least one cross-site member; assert single publish to `subject.Outbox(origin, remote, model.OutboxMemberAdded)` carrying only the remote-site accounts, with `DestSiteID == remote`, `RoomName` set, `HistorySharedSince` nil. Confirms the existing `room_created` outbox is still emitted on the same loop. + +### Out of scope for new tests + +- Integration tests against real NATS / Mongo — not in scope (would double diff size for marginal coverage gain). +- `search-sync-worker`'s ES write path — already covered by `search-sync-worker/inbox_integration_test.go` against the aggregate lane. + +### Coverage target + +Combined unit coverage for `finishCreateRoom` stays above the 80% project minimum. + +## Rollout + +Both changes are backward-compatible: + +- The origin-local INBOX publish is additive on the local site. +- The cross-site OUTBOX `member_added` publish is additive on the federation lane. PR #145 already established this path for add-members; remote sites' `search-sync-worker` consumers already filter for `aggregate.member_added`. + +No coordinated multi-site rollout needed. Deploy `room-worker` and the rest of the stack normally. + +### Per-site verification after deploy + +1. Create a federated room (channel or DM) with members on at least one remote site. +2. Within seconds, query each site's `user-room-{siteID}` ES index and confirm: + - The creator's doc on the origin site contains the new room ID. + - Every channel member's / DM recipient's doc on their respective home site contains the new room ID. +3. Confirm spotlight typeahead returns the new room for the creator on the origin site. 
+ +## Observability + +- **Logs:** new publishes use `slog.Error` log-and-continue (origin local) or return-error (cross-site OUTBOX, matching surrounding `room_created` publish). Failure message: `"local inbox member_added publish failed"` (origin) or `"publish member_added outbox to %s: %w"` (cross-site). +- **Metrics:** none added. Existing JetStream stream-level metrics on `INBOX_{siteID}` and `OUTBOX_{siteID}` will show throughput on the `member_added` subject rise from "PR #145's add/remove rate" to "that plus the create rate". +- **Traces:** the new publishes inherit the request context, so OTel trace IDs propagate end-to-end (room-worker → INBOX → search-sync-worker → ES bulk write all under one trace). + +## Risks + +- **Stale spec drift if the create path grows new sub sources.** If a future change adds members to a room outside the `subs []*model.Subscription` slice passed to `finishCreateRoom`, the new INBOX publish would miss them. Mitigation: keep `subs` as the single source of truth for "who got auto-enrolled at create time". +- **Cross-site `member_added` for a brand-new room arriving before `room_created`.** Both events flow through OUTBOX in publish order, so JetStream preserves order on the federated stream — out-of-order delivery on the receiver side is theoretically possible only if `inbox-worker` parallelizes consumers across event types. Today it doesn't. If it ever does, the unique index on `(roomId, u.account)` makes the race idempotent. 
diff --git a/inbox-worker/consumer_config_test.go b/inbox-worker/consumer_config_test.go index 633d476a7..b756ca369 100644 --- a/inbox-worker/consumer_config_test.go +++ b/inbox-worker/consumer_config_test.go @@ -29,7 +29,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/message-gatekeeper/consumer_config_test.go b/message-gatekeeper/consumer_config_test.go index 5c5d4f8ad..a9335f6dd 100644 --- a/message-gatekeeper/consumer_config_test.go +++ b/message-gatekeeper/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/message-worker/consumer_config_test.go b/message-worker/consumer_config_test.go index b78260b79..a36ee9645 100644 --- a/message-worker/consumer_config_test.go +++ b/message-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/notification-worker/consumer_config_test.go b/notification-worker/consumer_config_test.go index f62860097..2da32ed9f 100644 --- a/notification-worker/consumer_config_test.go +++ b/notification-worker/consumer_config_test.go @@ -25,7 +25,7 @@ 
func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/pkg/natsutil/request_id.go b/pkg/natsutil/request_id.go index e0bc702f0..e01a12b39 100644 --- a/pkg/natsutil/request_id.go +++ b/pkg/natsutil/request_id.go @@ -3,6 +3,7 @@ package natsutil import ( "context" + "log/slog" "github.com/nats-io/nats.go" ) @@ -57,3 +58,15 @@ func NewMsg(ctx context.Context, subj string, data []byte) *nats.Msg { Header: HeaderForContext(ctx), } } + +// OutboxDedupID composes a JetStream Nats-Msg-Id as base+":"+destSiteID. base +// is the X-Request-ID from ctx; falls back to payloadSeed when ctx carries no +// request ID, with a warn log so partial-deployment cases are observable. +func OutboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string { + base := RequestIDFromContext(ctx) + if base == "" { + slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID) + base = payloadSeed + } + return base + ":" + destSiteID +} diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 66996e9d4..a0dfca149 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -21,18 +21,24 @@ type ConsumerSettings struct { // DurableConsumerDefaults returns a ConsumerConfig populated from the // supplied ConsumerSettings plus the project-wide architectural -// invariants (AckPolicy=Explicit, DeliverPolicy=New). +// invariants (AckPolicy=Explicit, DeliverPolicy=All). // // Callers MUST set Durable. Callers MAY set FilterSubjects to scope the // consumer to a subset of the stream's subjects. 
// +// DeliverPolicy=All so a freshly-created durable (new deploy, new site, +// or a deleted-and-recreated durable) replays the stream from the start. +// search-sync-worker's MV rebuild and inbox-worker's federated catch-up +// both depend on this; for streams with no historical data (steady-state +// new sites) All and New are equivalent. +// // DeliverPolicy is honored only at consumer creation. Updating an // existing durable via js.CreateOrUpdateConsumer does not reset its // cursor position. func DurableConsumerDefaults(s ConsumerSettings) jetstream.ConsumerConfig { return jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, - DeliverPolicy: jetstream.DeliverNewPolicy, + DeliverPolicy: jetstream.DeliverAllPolicy, AckWait: s.AckWait, MaxDeliver: s.MaxDeliver, MaxWaiting: s.MaxWaiting, diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 737e5db1f..7ba8a103e 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -23,7 +23,7 @@ func TestDurableConsumerDefaults(t *testing.T) { cc := stream.DurableConsumerDefaults(s) assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy, "AckPolicy invariant") - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy, "DeliverPolicy invariant") + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy, "DeliverPolicy invariant") assert.Equal(t, 45*time.Second, cc.AckWait) assert.Equal(t, 3, cc.MaxDeliver) assert.Equal(t, 256, cc.MaxWaiting) @@ -37,7 +37,7 @@ func TestDurableConsumerDefaults(t *testing.T) { cc := stream.DurableConsumerDefaults(stream.ConsumerSettings{}) assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) assert.Zero(t, cc.AckWait) assert.Zero(t, cc.MaxDeliver) assert.Zero(t, cc.MaxWaiting) diff --git a/room-worker/consumer_config_test.go b/room-worker/consumer_config_test.go index 755f7d0f7..682db5a24 100644 
--- a/room-worker/consumer_config_test.go +++ b/room-worker/consumer_config_test.go @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) t.Run("overrides flow through", func(t *testing.T) { diff --git a/room-worker/handler.go b/room-worker/handler.go index c17efda4a..81f8ca4b8 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -36,16 +36,6 @@ func NewHandler(store SubscriptionStore, siteID string, publish PublishFunc) *Ha return &Handler{store: store, siteID: siteID, publish: publish} } -// outboxDedupID composes Nats-Msg-Id as base+":"+destSiteID; base is X-Request-ID from ctx, falling back to payloadSeed when absent (partial-deployment safety). -func outboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string { - base := natsutil.RequestIDFromContext(ctx) - if base == "" { - slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID) - base = payloadSeed - } - return base + ":" + destSiteID -} - // messageDedupSeed returns the X-Request-ID from ctx, or payloadSeed when absent (partial-deployment safety, with a warn log). 
func messageDedupSeed(ctx context.Context, handler, roomID, payloadSeed string) string { if seed := natsutil.RequestIDFromContext(ctx); seed != "" { @@ -236,7 +226,7 @@ func (h *Handler) processRoleUpdate(ctx context.Context, data []byte) error { } outboxSubj := subject.Outbox(h.siteID, user.SiteID, "role_updated") payloadSeed := fmt.Sprintf("%s:%s:%s:%d", req.RoomID, req.Account, req.NewRole, req.Timestamp) - dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed) if err := h.publish(ctx, outboxSubj, outboxData, dedupID); err != nil { return fmt.Errorf("publish outbox: %w", err) } @@ -342,7 +332,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp) - if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) } @@ -388,7 +378,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp) - dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(h.siteID, user.SiteID, "member_removed"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s: %w", user.SiteID, err) } @@ -484,7 +474,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp) - if err 
:= h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) } } @@ -539,7 +529,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp) - dedupID := outboxDedupID(ctx, destSiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(h.siteID, destSiteID, "member_removed"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s: %w", destSiteID, err) } @@ -769,7 +759,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error } inboxData, _ := json.Marshal(inboxOutbox) inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) - if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, outboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil { + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, natsutil.OutboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil { slog.Error("local inbox member_added publish failed", "error", err, "roomID", req.RoomID) } } @@ -830,7 +820,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error } outboxData, _ := json.Marshal(outbox) payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) - dedupID := outboxDedupID(ctx, destSiteID, payloadSeed) + dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed) if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), outboxData, dedupID); err != nil { return fmt.Errorf("outbox publish to %s 
failed: %w", destSiteID, err) } @@ -1121,6 +1111,34 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq } } + accounts := make([]string, 0, len(subs)) + for _, sub := range subs { + accounts = append(accounts, sub.User.Account) + } + inner := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + HistorySharedSince: nil, + Timestamp: now.UnixMilli(), + } + innerData, _ := json.Marshal(inner) + outbox := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: innerData, + Timestamp: now.UnixMilli(), + } + outboxData, _ := json.Marshal(outbox) + payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, natsutil.OutboxDedupID(ctx, room.SiteID, payloadSeed)); err != nil { + slog.Error("local inbox member_added publish failed", "error", err, "roomID", room.ID, "requestID", requestID) + } + // Task 37: outbox per remote site remoteSiteAccounts := map[string][]string{} for _, u := range allUsers { @@ -1157,6 +1175,35 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxTypeRoomCreated), eData, requestID+":"+destSiteID); err != nil { return fmt.Errorf("publish room_created outbox to %s: %w", destSiteID, err) } + + // Cross-site member_added so the remote site's search-sync-worker + // updates its user-room/spotlight MV — mirrors processAddMembers' + // federation. inbox-worker still consumes the room_created above to + // build correctly-typed Subscription rows; this event only feeds the + // search index. 
+ memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, + RoomID: room.ID, + RoomName: room.Name, + Accounts: accounts, + SiteID: room.SiteID, + JoinedAt: req.Timestamp, + HistorySharedSince: nil, + Timestamp: now.UnixMilli(), + } + memberData, _ := json.Marshal(memberEvt) + memberEnvelope := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: destSiteID, + Payload: memberData, + Timestamp: now.UnixMilli(), + } + memberOutboxData, _ := json.Marshal(memberEnvelope) + memberSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp) + if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), memberOutboxData, natsutil.OutboxDedupID(ctx, destSiteID, memberSeed)); err != nil { + return fmt.Errorf("publish member_added outbox to %s: %w", destSiteID, err) + } } return nil @@ -1439,7 +1486,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req return h.publish(ctx, subject.Outbox(room.SiteID, other.SiteID, model.OutboxTypeRoomCreated), eData, - outboxDedupID(ctx, other.SiteID, payloadSeed), + natsutil.OutboxDedupID(ctx, other.SiteID, payloadSeed), ) } diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index a689fd82f..de8f43c0e 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "slices" + "strconv" "strings" "sync" "testing" @@ -2896,3 +2897,180 @@ func TestHandleSyncCreateDM_IdempotentRecreate_UsesExistingCreatedAt(t *testing. 
"sub.JoinedAt must reflect existing.CreatedAt on idempotent re-delivery, not retry wall-clock") } } + +type inboxCapturedPublish struct { + subj string + data []byte + msgID string +} + +func captureInboxPublishes() (PublishFunc, func() []inboxCapturedPublish) { + var captured []inboxCapturedPublish + fn := PublishFunc(func(_ context.Context, subj string, data []byte, msgID string) error { + captured = append(captured, inboxCapturedPublish{subj: subj, data: append([]byte(nil), data...), msgID: msgID}) + return nil + }) + return fn, func() []inboxCapturedPublish { return captured } +} + +func findInboxMemberAdded(t *testing.T, captured []inboxCapturedPublish, siteID string) inboxCapturedPublish { + t.Helper() + want := subject.InboxMemberAdded(siteID) + var matches []inboxCapturedPublish + for _, p := range captured { + if p.subj == want { + matches = append(matches, p) + } + } + require.Lenf(t, matches, 1, "expected exactly 1 publish to %s, got %d", want, len(matches)) + return matches[0] +} + +func TestProcessCreateRoom_DM_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := NewMockSubscriptionStore(ctrl) + publish, getCaptured := captureInboxPublishes() + h := &Handler{store: mockStore, publish: publish, siteID: "site-A"} + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + // bob lives on site-B → cross-site DM + other := &model.User{ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-B"} + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().GetUser(gomock.Any(), "bob").Return(other, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-inbox").Return(nil) + + 
ts := time.Now().UnixMilli() + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-dm-inbox", + RequesterAccount: "alice", + Users: []string{"bob"}, + Timestamp: ts, + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got := findInboxMemberAdded(t, getCaptured(), "site-A") + + var outbox model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outbox)) + assert.Equal(t, "member_added", outbox.Type) + assert.Equal(t, "site-A", outbox.SiteID) + assert.Equal(t, "site-A", outbox.DestSiteID, "self-loop publish: dest must equal origin") + assert.Greater(t, outbox.Timestamp, int64(0)) + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outbox.Payload, &inner)) + assert.Equal(t, "member_added", inner.Type) + assert.Equal(t, "room-dm-inbox", inner.RoomID) + assert.Empty(t, inner.RoomName, "DM rooms have no name") + assert.ElementsMatch(t, []string{"alice", "bob"}, inner.Accounts, + "DM INBOX publish must carry both creator and recipient") + assert.Equal(t, "site-A", inner.SiteID) + assert.Nil(t, inner.HistorySharedSince, "HistorySharedSince must be nil at create-time") + + wantMsgID := natsutil.OutboxDedupID(ctx, "site-A", "room-dm-inbox:alice:"+strconv.FormatInt(ts, 10)) + assert.Equal(t, wantMsgID, got.msgID, "Nats-Msg-Id must be natsutil.OutboxDedupID(ctx, originSite, payloadSeed)") +} + +func TestProcessCreateRoom_Channel_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := NewMockSubscriptionStore(ctrl) + publish, getCaptured := captureInboxPublishes() + h := &Handler{store: mockStore, publish: publish, siteID: "site-A"} + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + invited := []model.User{ + {ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-A"}, + {ID: "u_dave", Account: "dave", EngName: "Dave", ChineseName: "戴", 
SiteID: "site-B"}, + } + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ListNewMembersForNewRoom(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return([]string{"bob", "dave"}, nil) + mockStore.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return(invited, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().BulkCreateRoomMembers(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-ch-inbox").Return(nil) + + ts := time.Now().UnixMilli() + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-ch-inbox", Name: "Mixed", RequesterAccount: "alice", + Users: []string{"bob", "dave"}, Orgs: []string{"org1"}, + ResolvedUsers: []string{"bob", "dave"}, ResolvedOrgs: []string{"org1"}, + Timestamp: ts, + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got := findInboxMemberAdded(t, getCaptured(), "site-A") + + var outbox model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outbox)) + assert.Equal(t, "member_added", outbox.Type) + assert.Equal(t, "site-A", outbox.SiteID) + assert.Equal(t, "site-A", outbox.DestSiteID) + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outbox.Payload, &inner)) + assert.Equal(t, "room-ch-inbox", inner.RoomID) + assert.Equal(t, "Mixed", inner.RoomName) + assert.ElementsMatch(t, []string{"alice", "bob", "dave"}, inner.Accounts, + "channel INBOX publish must carry creator + every auto-enrolled member (same-site + cross-site)") + assert.Equal(t, "site-A", inner.SiteID) + assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted regardless of req.History") + + wantMsgID := natsutil.OutboxDedupID(ctx, "site-A", "room-ch-inbox:alice:"+strconv.FormatInt(ts, 10)) + assert.Equal(t, wantMsgID, got.msgID) +} + +func 
TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded(t *testing.T) { + h, mockStore, getPublished := newCreateRoomTestHandler(t) + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "艾", SiteID: "site-A"} + invited := []model.User{ + {ID: "u_bob", Account: "bob", EngName: "Bob", ChineseName: "鮑", SiteID: "site-B"}, + } + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ListNewMembersForNewRoom(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return([]string{"bob"}, nil) + mockStore.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return(invited, nil) + mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().BulkCreateRoomMembers(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-ch-xsite").Return(nil) + + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-ch-xsite", Name: "Cross", RequesterAccount: "alice", + Users: []string{"bob"}, Orgs: []string{"org1"}, + ResolvedUsers: []string{"bob"}, ResolvedOrgs: []string{"org1"}, + Timestamp: time.Now().UnixMilli(), + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + memberAddedOutbox := outboxFor(getPublished(), "site-B", model.OutboxMemberAdded) + require.Len(t, memberAddedOutbox, 1, + "finishCreateRoom must emit outbox.{origin}.to.{remote}.member_added alongside room_created so the remote site's search-sync-worker updates its MV") + + var envelope model.OutboxEvent + require.NoError(t, json.Unmarshal(memberAddedOutbox[0].data, &envelope)) + assert.Equal(t, model.OutboxMemberAdded, envelope.Type) + assert.Equal(t, "site-A", envelope.SiteID) + assert.Equal(t, "site-B", envelope.DestSiteID) + + var inner model.MemberAddEvent + require.NoError(t, 
json.Unmarshal(envelope.Payload, &inner)) + assert.Equal(t, "room-ch-xsite", inner.RoomID) + assert.Equal(t, "Cross", inner.RoomName) + assert.Equal(t, []string{"bob"}, inner.Accounts, "carries only the remote-site accounts, mirroring processAddMembers") + assert.Equal(t, "site-A", inner.SiteID, "inner SiteID is the origin (room's home)") + assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted") + + // Sanity: the existing room_created outbox is still emitted on the same loop. + roomCreatedOutbox := outboxFor(getPublished(), "site-B", model.OutboxTypeRoomCreated) + require.Len(t, roomCreatedOutbox, 1, "room_created outbox path unchanged") +} diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index 6a95becab..746e0b9d6 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -642,17 +642,17 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Equal(t, "site-A", s.SiteID, "sub %s siteID", s.User.Account) } - // Filter outbox publishes per destination site. - pubsB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", "")) - pubsC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", "")) - pubsA := cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")) - require.Len(t, pubsB, 1, "exactly one outbox to site-B") - require.Len(t, pubsC, 1, "exactly one outbox to site-C") - assert.Empty(t, pubsA, "no outbox to home site-A") - - // Site-B payload assertions. + assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")), + "no outbox to home site-A") + + // room_created outboxes — one per remote site. 
+ createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) + createdC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxTypeRoomCreated)) + require.Len(t, createdB, 1, "exactly one room_created outbox to site-B") + require.Len(t, createdC, 1, "exactly one room_created outbox to site-C") + var envB model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsB[0].data, &envB)) + require.NoError(t, json.Unmarshal(createdB[0].data, &envB)) var payloadB model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(envB.Payload, &payloadB)) assert.ElementsMatch(t, []string{"bob", "carol"}, payloadB.Accounts) @@ -660,19 +660,37 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Equal(t, "deal team", payloadB.RoomName) assert.Equal(t, "site-A", payloadB.HomeSiteID) assert.Equal(t, "alice", payloadB.RequesterAccount) - assert.Equal(t, reqID+":site-B", pubsB[0].msgID) + assert.Equal(t, reqID+":site-B", createdB[0].msgID) - // Site-C payload assertions. var envC model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsC[0].data, &envC)) + require.NoError(t, json.Unmarshal(createdC[0].data, &envC)) var payloadC model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(envC.Payload, &payloadC)) assert.ElementsMatch(t, []string{"ian"}, payloadC.Accounts) - assert.Equal(t, model.RoomTypeChannel, payloadC.RoomType) - assert.Equal(t, "deal team", payloadC.RoomName) - assert.Equal(t, "site-A", payloadC.HomeSiteID) - assert.Equal(t, "alice", payloadC.RequesterAccount) - assert.Equal(t, reqID+":site-C", pubsC[0].msgID) + assert.Equal(t, reqID+":site-C", createdC[0].msgID) + + // member_added outboxes — one per remote site (search-sync-worker federation). 
+ memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) + memberC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxMemberAdded)) + require.Len(t, memberB, 1, "exactly one member_added outbox to site-B") + require.Len(t, memberC, 1, "exactly one member_added outbox to site-C") + + var memberEnvB model.OutboxEvent + require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnvB)) + var memberPayloadB model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnvB.Payload, &memberPayloadB)) + assert.ElementsMatch(t, []string{"bob", "carol"}, memberPayloadB.Accounts) + assert.Equal(t, "deal team", memberPayloadB.RoomName) + assert.Equal(t, "site-A", memberPayloadB.SiteID) + assert.Nil(t, memberPayloadB.HistorySharedSince) + assert.Equal(t, reqID+":site-B", memberB[0].msgID) + + var memberEnvC model.OutboxEvent + require.NoError(t, json.Unmarshal(memberC[0].data, &memberEnvC)) + var memberPayloadC model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnvC.Payload, &memberPayloadC)) + assert.ElementsMatch(t, []string{"ian"}, memberPayloadC.Accounts) + assert.Equal(t, reqID+":site-C", memberC[0].msgID) } func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { @@ -712,14 +730,14 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.Equal(t, "site-A", s.SiteID, "sub %s siteID", s.User.Account) } - // Only one outbox publish, to site-B. - pubsB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", "")) - require.Len(t, pubsB, 1) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", ""))) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", ""))) + // room_created outbox to the recipient's site. 
+ createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) + require.Len(t, createdB, 1) var env model.OutboxEvent - require.NoError(t, json.Unmarshal(pubsB[0].data, &env)) + require.NoError(t, json.Unmarshal(createdB[0].data, &env)) var payload model.RoomCreatedOutbox require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, model.RoomTypeDM, payload.RoomType) @@ -727,7 +745,18 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.ElementsMatch(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, "site-A", payload.HomeSiteID) - assert.Equal(t, reqID+":site-B", pubsB[0].msgID) + assert.Equal(t, reqID+":site-B", createdB[0].msgID) + + // member_added outbox (search-sync-worker federation). + memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) + require.Len(t, memberB, 1) + var memberEnv model.OutboxEvent + require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnv)) + var memberPayload model.MemberAddEvent + require.NoError(t, json.Unmarshal(memberEnv.Payload, &memberPayload)) + assert.ElementsMatch(t, []string{"bob"}, memberPayload.Accounts) + assert.Equal(t, "site-A", memberPayload.SiteID) + assert.Equal(t, reqID+":site-B", memberB[0].msgID) } func TestProcessAddMembers_OutboxPerRemoteSite(t *testing.T) { @@ -913,7 +942,7 @@ func TestProcessAddMembers_PublishesLocalInbox_Integration(t *testing.T) { assert.ElementsMatch(t, []string{"charlie", "bob"}, inner.Accounts, "local INBOX must carry full add set — same-site (charlie) + remote (bob)") assert.Equal(t, reqID+":site-A", pubs[0].msgID, - "Nats-Msg-Id must be outboxDedupID(ctx, originSite, payloadSeed) so JetStream dedups self-loop replays") + "Nats-Msg-Id must be natsutil.OutboxDedupID(ctx, originSite, payloadSeed) so JetStream dedups self-loop replays") } func TestProcessRemoveIndividual_PublishesLocalInbox_Integration(t *testing.T) { diff 
--git a/search-sync-worker/consumer_config_test.go b/search-sync-worker/consumer_config_test.go index 877ea7402..3349b118d 100644 --- a/search-sync-worker/consumer_config_test.go +++ b/search-sync-worker/consumer_config_test.go @@ -59,7 +59,7 @@ func TestBuildConsumerConfig(t *testing.T) { assert.Equal(t, 30*time.Second, cc.AckWait) assert.Equal(t, 5, cc.MaxDeliver) assert.Equal(t, 512, cc.MaxWaiting) - assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy) }) } })