diff --git a/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md new file mode 100644 index 000000000..c5ae0031e --- /dev/null +++ b/docs/superpowers/specs/2026-05-12-consolidate-room-create-federation-design.md @@ -0,0 +1,247 @@ +# Consolidate Cross-Site Room-Creation Federation Event — Design + +**Date:** 2026-05-12 +**Status:** Draft +**Services:** `room-worker`, `inbox-worker`, `search-sync-worker` (test-only) +**Related specs:** +- `2026-05-01-federated-room-origin-site-mv-fix-design.md` (PR #145 — added cross-site `member_added` for add-members) +- `2026-05-11-create-room-origin-site-mv-fix-design.md` (PR #169 — added cross-site `member_added` for room creation, leaving `room_created` in place) + +## Problem + +Today every room-creation publishes two cross-site OUTBOX events per remote site: + +| Event | Consumer on remote site | Purpose | +|---|---|---| +| `outbox.{origin}.to.{remote}.room_created` | `inbox-worker.handleRoomCreated` | Build correctly-shaped `Subscription` rows (DM/botDM/channel) | +| `outbox.{origin}.to.{remote}.member_added` | `search-sync-worker` (MV update) and `inbox-worker.handleMemberAdded` (dup-key no-op for fresh rooms) | Search index update | + +The redundancy is a layering artifact: PR #142 (Vinayak, `feat(create-room): DM, botDM, and channel creation across sites`) introduced `room_created` as the cross-site federation event before the `member_added` lane existed end-to-end. PR #145 then added the `member_added` cross-site event for the add-members path, and PR #169 extended `member_added` to the create path. At that point `room_created` became redundant for everything except the per-room-type sub-shape logic inside `inbox-worker.handleMemberAdded`, which is hardcoded to `RoomType: channel`. + +### Two concrete consequences + +1. **2× outbound publish per remote site per room creation.** `room-worker.finishCreateRoom` emits both events for every remote member's site. +2. **Latent search bug in `search-sync-worker/spotlight.go`**: it writes `RoomType: string(evt.RoomType)` into the spotlight ES doc, but today's `MemberAddEvent` wire payload doesn't carry `RoomType` — so for every room indexed via `member_added` since PR #145, the spotlight doc's `roomType` field has been the empty string. Room typeahead UI loses type info. This consolidation incidentally heals the bug because room-worker will start populating `RoomType` on every `member_added` publish. + +## Goals + +- One cross-site federation event per room creation. `member_added` does double duty: drives sub creation in `inbox-worker` and MV updates in `search-sync-worker`. +- `inbox-worker.handleMemberAdded` builds the right `Subscription` shape for every room type (channel, DM, botDM), reusing the existing per-type helpers (`subscriptionName`, `rolesForType`, `subscriptionIsSubscribed`) that `handleRoomCreated` uses today. +- Full removal of the `room_created` event type, its model, its handler, and its tests. No shim, no transition flag. +- No subject or stream changes — same wire lanes, fewer wire formats. + +## Non-Goals + +- **Backward compatibility for cross-tree consumers of `RoomCreatedOutbox` / `OutboxTypeRoomCreated`.** Grep confirms no out-of-tree consumer exists today. +- **Bridging in-flight `room_created` events during the deploy window.** Both ends ship in the same release per site; any straggler federated event arriving at a new `inbox-worker` pod hits the existing `default` case in the event-type switch and logs `"unknown event type, skipping"`. The corresponding `member_added` for the same room arrives moments later and does the right thing. +- **Refactoring `MemberRemoveEvent`** the same way. Remove is already type-agnostic in `inbox-worker.handleMemberRemoved`; only `member_added` needs the per-type sub-shape logic. +- **Search-sync-worker code changes.** Zero structural change required. No new tests added; existing spotlight coverage already asserts `roomType` via `baseInboxMemberEvent`. + +## Design + +### Wire format + +Extend `model.MemberAddEvent` with two `omitempty` fields: + +```go +type MemberAddEvent struct { + Type string `json:"type"` + RoomID string `json:"roomId"` + RoomName string `json:"roomName"` + Accounts []string `json:"accounts"` + SiteID string `json:"siteId"` + JoinedAt int64 `json:"joinedAt"` + HistorySharedSince *int64 `json:"historySharedSince,omitempty"` + Timestamp int64 `json:"timestamp"` + + // NEW + RoomType RoomType `json:"roomType,omitempty"` + RequesterAccount string `json:"requesterAccount,omitempty"` +} +``` + +- `RoomType`: drives `inbox-worker.handleMemberAdded`'s sub-shape branch and `search-sync-worker.spotlight.go`'s ES doc population. Zero value (`""`) is treated as `RoomTypeChannel` for backward compatibility with any in-flight publishes during deploy. +- `RequesterAccount`: only meaningful when `RoomType` is `DM` or `BotDM` — used by `subscriptionName` to compute the counterpart account for the recipient sub's `Name`. Empty/unused for channels. + +The two added fields are `omitempty` so wire payloads from older publishers and from add-member operations (which don't need the fields) stay unchanged. + +### Publisher side — `room-worker/handler.go` + +Two publish call sites populate the new fields: + +1. **`finishCreateRoom`** — the per-remote-site `member_added` OUTBOX publish (added in #169) sets `RoomType: room.Type` and `RequesterAccount: requester.Account`. The origin-local INBOX publish does the same for consistency (search-sync-worker on the origin reads it). +2. **`finishCreateRoom`** — delete the per-remote-site `room_created` OUTBOX publish entirely (the block constructing `RoomCreatedOutbox`). +3. **`processAddMembers`** — both publish sites (local INBOX + per-remote-site OUTBOX) get `RoomType: room.Type` and `RequesterAccount: req.RequesterAccount`. Channels only, so `RequesterAccount` won't actually be read on the consumer side, but keeping it consistent across all `member_added` emissions reduces special-case surface in publisher code. + +### Consumer side — `inbox-worker/handler.go` + +`handleMemberAdded` dispatches on `event.RoomType` and uses the existing helpers, refactored to take primitives so they're callable from both old-and-removed `handleRoomCreated` and the new `handleMemberAdded` path: + +**Before** (lives in handler.go around line 227): +```go +func subscriptionName(d *model.RoomCreatedOutbox, u *model.User) string { ... } +func subscriptionIsSubscribed(d *model.RoomCreatedOutbox, u *model.User) bool { ... } +``` + +**After** (primitives, no struct dependency): +```go +func subscriptionName(roomType model.RoomType, roomName, requesterAccount string, u *model.User) string { ... } +func subscriptionIsSubscribed(roomType model.RoomType, u *model.User) bool { ... } +``` + +`rolesForType(t model.RoomType) []Role` already takes a primitive — unchanged. + +`handleMemberAdded` post-refactor: + +```go +func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) error { + var event model.MemberAddEvent + if err := json.Unmarshal(evt.Payload, &event); err != nil { + return fmt.Errorf("unmarshal member_added payload: %w", err) + } + + roomType := event.RoomType + if roomType == "" { + roomType = model.RoomTypeChannel // backward-compat for older publishers + } + + users, err := h.store.FindUsersByAccounts(ctx, event.Accounts) + if err != nil { + return fmt.Errorf("find users by accounts: %w", err) + } + userMap := make(map[string]model.User, len(users)) + for i := range users { + userMap[users[i].Account] = users[i] + } + + joinedAt := time.UnixMilli(event.JoinedAt).UTC() + var historySharedSince *time.Time + if event.HistorySharedSince != nil && *event.HistorySharedSince > 0 { + t := time.UnixMilli(*event.HistorySharedSince).UTC() + historySharedSince = &t + } + + subs := make([]*model.Subscription, 0, len(event.Accounts)) + for _, account := range event.Accounts { + u, ok := userMap[account] + if !ok { + slog.Warn("user not found for account", "account", account) + continue + } + sub := &model.Subscription{ + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: u.ID, Account: u.Account}, + RoomID: event.RoomID, + RoomType: roomType, + SiteID: event.SiteID, + Roles: rolesForType(roomType), + Name: subscriptionName(roomType, event.RoomName, event.RequesterAccount, &u), + IsSubscribed: subscriptionIsSubscribed(roomType, &u), + HistorySharedSince: historySharedSince, + JoinedAt: joinedAt, + } + subs = append(subs, sub) + } + + if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + if !mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("bulk create subscriptions: %w", err) + } + } + return nil +} +``` + +The old `handleRoomCreated` function is **deleted**. The `case model.MessageTypeRoomCreated:` arm in `HandleEvent`'s switch is also deleted. + +### Removal sweep + +- `pkg/model/event.go`: + - Delete `type RoomCreatedOutbox struct`. + - Delete `OutboxTypeRoomCreated` constant (cross-site OUTBOX event type). + - **Do not delete** `MessageTypeRoomCreated` constant — it's a distinct constant for the channel-create system-message type ("alice created the room"), used by `room-worker`'s `publishChannelSysMessages` and unrelated to federation. The two constants happen to share the string value `"room_created"` but live on different code paths. +- `inbox-worker/handler.go`: + - Delete `handleRoomCreated`. + - Delete the `case model.MessageTypeRoomCreated:` arm. + - Refactor `subscriptionName` / `subscriptionIsSubscribed` to take primitives. +- `inbox-worker/handler_test.go`: delete `TestHandleRoomCreated*` cases (5 of them per grep). Replace with new `TestHandleMemberAdded_DM*` / `TestHandleMemberAdded_BotDM*` cases. +- `inbox-worker/integration_test.go`: delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub` (2 cases). Replace with `TestHandleMemberAdded_Channel_PersistsRemoteSubs` and `TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub` going through the public `HandleEvent` entry point. +- `room-worker/handler.go::finishCreateRoom`: delete the per-remote-site `room_created` publish block. Add `RoomType` + `RequesterAccount` to the existing `member_added` publishes (both local-INBOX and cross-site OUTBOX). +- `room-worker/handler.go::processAddMembers`: add `RoomType` + `RequesterAccount` to all three publish sites (local INBOX, cross-site OUTBOX, the older add-members publish if separate). +- `room-worker/handler_test.go`: assert `RoomType` is populated on every `member_added` publish capture; delete any test that asserted on the `room_created` outbox publishes. +- `room-worker/integration_test.go`: `TestProcessCreateRoomChannel_OutboxPerRemoteSite` and `TestProcessCreateRoomDM_OutboxToCounterpartSite` updated to assert only one outbox per dest site (the `member_added`), drop the `room_created` assertions. + +### Search-sync-worker impact + +Zero structural change. `model.InboxMemberEvent` (the struct `search-sync-worker.parseMemberEvent` decodes into) already has `RoomType` — it's the publisher side that didn't populate it. Once `room-worker` starts setting `RoomType` on every `member_added` publish, the existing `string(evt.RoomType)` write at `spotlight.go:120` produces the correct value for the first time. + +No new search-sync-worker tests needed. The existing `TestSpotlightCollection_BuildAction_MemberAdded` already asserts `assert.Equal(t, "channel", doc["roomType"])` via `baseInboxMemberEvent()` (which sets `RoomType: model.RoomTypeChannel`), so the regression guard for the latent spotlight-roomType-empty bug is already in place. + +### Idempotency + +The unique index on `subscriptions.(roomId, u.account)` keeps concurrent and redelivered creates idempotent. `handleMemberAdded` swallows `mongo.IsDuplicateKeyError` (see the bulk-create branch in the code example above) and continues, so JetStream replays of `member_added` after a crashed prior delivery do not nak-loop. Non-duplicate errors propagate so JetStream retries until success or `MaxDeliver` is hit. + +## Testing + +Unit + integration coverage, per spec. + +### `inbox-worker/handler_test.go` + +New tests covering per-room-type sub-shape via `handleMemberAdded`: + +- `TestHandleMemberAdded_Channel_BuildsChannelSub` (regression — covers the previous behavior, with `RoomType` field set explicitly on the event). +- `TestHandleMemberAdded_Channel_DefaultsWhenRoomTypeEmpty` — backward-compat: event with empty `RoomType` is treated as channel (older publishers). +- `TestHandleMemberAdded_DM_BuildsRecipientSubWithCounterpartName` — asserts `Name = RequesterAccount`, `Roles = nil`, `IsSubscribed = false`. +- `TestHandleMemberAdded_BotDM_BuildsBotSub` — asserts `IsSubscribed` depends on `isBot(u.Account)`. +- `TestHandleMemberAdded_DuplicateKey_IsIdempotent` — bulk-create returns dup-key; handler returns nil. + +Delete `TestHandleRoomCreated*` cases (5 total per current `grep`). + +### `inbox-worker/integration_test.go` + +Delete `TestHandleRoomCreatedPersistsRemoteSubs` and `TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub`. Replace with `TestHandleMemberAdded_Channel_PersistsRemoteSubs` and `TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub` — both go through the public `HandleEvent` entry point with `member_added` payloads carrying the new `RoomType` + `RequesterAccount` fields, and assert the persisted Subscription rows have the right shape. + +### `room-worker/handler_test.go` + +Update existing PR #169 tests to assert `RoomType` and `RequesterAccount` are populated on the captured publishes. New test: + +- `TestProcessCreateRoom_DM_OutboxCarriesRoomTypeAndRequester` — DM create across sites, captures the cross-site `member_added` publish, asserts inner `RoomType = DM` and `RequesterAccount = alice`. + +### `room-worker/integration_test.go` + +- `TestProcessCreateRoomChannel_OutboxPerRemoteSite` — drop the `room_created` outbox assertions. Assert exactly one outbox per dest site (`member_added`), with `RoomType=channel` on the payload. +- `TestProcessCreateRoomDM_OutboxToCounterpartSite` — same shape. Assert `RoomType=DM`, `RequesterAccount=alice` on the payload. + +### `search-sync-worker/spotlight_test.go` + +No new tests. The existing `TestSpotlightCollection_BuildAction_MemberAdded` already asserts `doc["roomType"] == "channel"` via `baseInboxMemberEvent()`, so the regression guard for the latent spotlight-roomType-empty bug is in place without code change. + +## Rollout + +Single PR, both publisher and consumer changes shipped together. + +**Pre-production context**: cross-site federation is not yet integrated end-to-end in any deployed environment. No live cross-site DM traffic exists today, so the theoretical mixed-version DM-sub-malformation window during a mid-deploy creation has no real-world incidence. This rollout plan is therefore straightforward — when federation does go live in prod, both this PR and its schema extension will already be in place everywhere. + +### Deploy order (defensive, not strictly required) + +For belt-and-suspenders correctness in case federation traffic does materialize during a rolling deploy, prefer deploying `room-worker` ahead of `inbox-worker` per site. This ensures any new `member_added` events on the wire already carry `RoomType` + `RequesterAccount` before the consumer side starts dispatching on those fields. + +In-flight `room_created` events that arrive at the new `inbox-worker` (because they were redelivered from before its deploy) hit the existing `default` case in the event-type switch → `slog.Warn("unknown event type, skipping")` + ack. No nak-loop, no behavior regression. + +### Per-site verification after deploy + +1. Create a federated DM (alice@s1 → bob@s2). Verify on s2 the recipient's `Subscription` doc has `Name = "alice"`, `Roles = nil`, `IsSubscribed = false`. +2. Create a federated channel with one cross-site member. Verify on the remote site the member's `Subscription` doc has `Name = roomName`, `Roles = ["member"]`, `IsSubscribed = false`. +3. Query the spotlight ES index for the new rooms — confirm `roomType` is `"channel"` / `"dm"` / `"botDM"` (not empty). +4. `nats stream info OUTBOX_{site}` should show `member_added` events flowing at room-creation rate; `room_created` subject should report zero new messages. + +## Observability + +- **Logs**: one new `slog.Warn` per straggler `room_created` event during the deploy window — expected, transient. After deploy completes, zero. +- **Metrics**: existing JetStream metrics on `OUTBOX_{site}` will show `member_added` subject throughput rise (it now carries create-time too), `room_created` subject throughput drop to zero. +- **Traces**: unchanged — `member_added` was already on the trace path for create via PR #169. + +## Risks + +- **Mixed-version deploy window** (theoretical, no real-world incidence today). Cross-site federation is not yet integrated end-to-end, so no DM federation traffic exists during a rolling deploy of this PR. If federation traffic did flow during the window, a DM creation initiated by a new `room-worker` pod could be consumed by an old `inbox-worker` pod's channel-hardcoded `handleMemberAdded` and produce a recipient sub with `Name=""` (empty counterpart name). The defect would be bounded to the deploy duration and recoverable via either a manual script that re-derives `Name` from the room's home-site sub, or by churn (any later add-member re-emits with correct schema). Not blocking under current operational reality. +- **Removal of `RoomCreatedOutbox` is a one-way ratchet.** No out-of-tree consumer exists today (`grep` confirms). If we ever want to reintroduce the dedicated event, the cost is just adding the type back; the lane is unchanged. diff --git a/inbox-worker/handler.go b/inbox-worker/handler.go index defa38f1f..3fa5cf91c 100644 --- a/inbox-worker/handler.go +++ b/inbox-worker/handler.go @@ -3,15 +3,15 @@ package main import ( "context" "encoding/json" - "errors" "fmt" "log/slog" "strings" "time" + "go.mongodb.org/mongo-driver/v2/mongo" + "github.com/hmchangw/chat/pkg/idgen" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" ) // InboxStore abstracts the data store operations needed by the inbox worker. @@ -61,8 +61,6 @@ func (h *Handler) HandleEvent(ctx context.Context, data []byte) error { return h.handleSubscriptionRead(ctx, &evt) case "thread_subscription_upserted": return h.handleThreadSubscriptionUpserted(ctx, &evt) - case model.MessageTypeRoomCreated: - return h.handleRoomCreated(ctx, &evt) default: slog.Warn("unknown event type, skipping", "type", evt.Type) return nil @@ -75,6 +73,11 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) return fmt.Errorf("unmarshal member_added payload: %w", err) } + roomType := event.RoomType + if roomType == "" { + roomType = model.RoomTypeChannel + } + users, err := h.store.FindUsersByAccounts(ctx, event.Accounts) if err != nil { return fmt.Errorf("find users by accounts: %w", err) @@ -98,25 +101,28 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) slog.Warn("user not found for account", "account", account) continue } - // RoomType is fixed to channel: cross-site member_added events only - // originate from rooms that support add-member (channel/discussion), - // never from DM/botDM. sub := &model.Subscription{ ID: idgen.GenerateUUIDv7(), User: model.SubscriptionUser{ID: user.ID, Account: user.Account}, RoomID: event.RoomID, - RoomType: model.RoomTypeChannel, + RoomType: roomType, SiteID: event.SiteID, - Roles: []model.Role{model.RoleMember}, - Name: event.RoomName, + Roles: rolesForType(roomType), + Name: subscriptionName(roomType, event.RoomName, event.RequesterAccount), + IsSubscribed: subscriptionIsSubscribed(roomType, &user), HistorySharedSince: historySharedSince, JoinedAt: joinedAt, } subs = append(subs, sub) } + if len(subs) == 0 { + return nil + } if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subscriptions: %w", err) + if !mongo.IsDuplicateKeyError(err) { + return fmt.Errorf("bulk create subscriptions: %w", err) + } } // No SubscriptionUpdateEvent is published here — room-worker already publishes @@ -209,9 +215,6 @@ func (h *Handler) handleThreadSubscriptionUpserted(ctx context.Context, evt *mod return nil } -// errPermanent signals a non-retryable error; callers should Ack and move on. -var errPermanent = errors.New("permanent") - func rolesForType(t model.RoomType) []model.Role { if t == model.RoomTypeChannel { return []model.Role{model.RoleMember} @@ -219,13 +222,12 @@ func rolesForType(t model.RoomType) []model.Role { return nil } -func subscriptionName(d *model.RoomCreatedOutbox, u *model.User) string { - switch d.RoomType { +func subscriptionName(roomType model.RoomType, roomName, requesterAccount string) string { + switch roomType { case model.RoomTypeChannel, model.RoomTypeDiscussion: - return d.RoomName + return roomName case model.RoomTypeDM, model.RoomTypeBotDM: - // On the remote site, the "other party" relative to u is the requester. - return d.RequesterAccount + return requesterAccount } return "" } @@ -236,70 +238,9 @@ func isBot(account string) bool { return strings.HasSuffix(account, ".bot") || strings.HasPrefix(account, "p_") } -func subscriptionIsSubscribed(d *model.RoomCreatedOutbox, u *model.User) bool { - if d.RoomType != model.RoomTypeBotDM { +func subscriptionIsSubscribed(roomType model.RoomType, u *model.User) bool { + if roomType != model.RoomTypeBotDM { return false } return !isBot(u.Account) } - -func (h *Handler) handleRoomCreated(ctx context.Context, evt *model.OutboxEvent) error { - requestID := natsutil.RequestIDFromContext(ctx) - if requestID == "" { - return fmt.Errorf("missing X-Request-ID: %w", errPermanent) - } - - var data model.RoomCreatedOutbox - if err := json.Unmarshal(evt.Payload, &data); err != nil { - return fmt.Errorf("unmarshal room_created payload: %w: %w", err, errPermanent) - } - if len(data.Accounts) == 0 { - slog.Warn("room_created event with empty Accounts list", - "requestId", requestID, "roomId", data.RoomID) - return nil - } - - users, err := h.store.FindUsersByAccounts(ctx, data.Accounts) - if err != nil { - return fmt.Errorf("find users by accounts: %w", err) - } - // FindUsersByAccounts can return a subset; treat any account in - // data.Accounts that didn't come back as a hard failure rather than - // silently materializing partial remote-side state with no retry signal. - userByAccount := make(map[string]model.User, len(users)) - for i := range users { - userByAccount[users[i].Account] = users[i] - } - for _, account := range data.Accounts { - if _, ok := userByAccount[account]; !ok { - return fmt.Errorf("find users by accounts: missing account %q (room %s home %s)", - account, data.RoomID, data.HomeSiteID) - } - } - - acceptedAt := time.UnixMilli(data.Timestamp).UTC() - subs := make([]*model.Subscription, 0, len(data.Accounts)) - for _, account := range data.Accounts { - u := userByAccount[account] - sub := &model.Subscription{ - ID: idgen.GenerateUUIDv7(), - User: model.SubscriptionUser{ID: u.ID, Account: u.Account}, - RoomID: data.RoomID, - SiteID: data.HomeSiteID, - Roles: rolesForType(data.RoomType), - Name: subscriptionName(&data, &u), - RoomType: data.RoomType, - IsSubscribed: subscriptionIsSubscribed(&data, &u), - JoinedAt: acceptedAt, - } - subs = append(subs, sub) - } - - if len(subs) == 0 { - return nil - } - if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { - return fmt.Errorf("bulk create subs: %w", err) - } - return nil -} diff --git a/inbox-worker/handler_test.go b/inbox-worker/handler_test.go index f7c846813..7781b6ddc 100644 --- a/inbox-worker/handler_test.go +++ b/inbox-worker/handler_test.go @@ -10,10 +10,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/v2/mongo" "github.com/hmchangw/chat/pkg/idgen" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" ) // --- In-memory InboxStore stub --- @@ -35,6 +35,7 @@ type stubInboxStore struct { mu sync.Mutex subscriptions []model.Subscription bulkSubscriptions []*model.Subscription + bulkCreateErr error rooms []model.Room roleUpdates []roleUpdate users []model.User @@ -132,6 +133,9 @@ func (s *stubInboxStore) FindUsersByAccounts(_ context.Context, accounts []strin func (s *stubInboxStore) BulkCreateSubscriptions(_ context.Context, subs []*model.Subscription) error { s.mu.Lock() defer s.mu.Unlock() + if s.bulkCreateErr != nil { + return s.bulkCreateErr + } s.bulkSubscriptions = append(s.bulkSubscriptions, subs...) for _, sub := range subs { s.subscriptions = append(s.subscriptions, *sub) @@ -997,91 +1001,93 @@ func TestRolesForType(t *testing.T) { } func TestSubscriptionName(t *testing.T) { - d := model.RoomCreatedOutbox{ - RoomType: model.RoomTypeChannel, - RoomName: "deal team", - RequesterAccount: "alice", - } - assert.Equal(t, "deal team", subscriptionName(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeDM - assert.Equal(t, "alice", subscriptionName(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeBotDM - assert.Equal(t, "alice", subscriptionName(&d, &model.User{Account: "weather.bot"})) + assert.Equal(t, "deal team", subscriptionName(model.RoomTypeChannel, "deal team", "alice")) + assert.Equal(t, "alice", subscriptionName(model.RoomTypeDM, "", "alice")) + assert.Equal(t, "alice", subscriptionName(model.RoomTypeBotDM, "", "alice")) + assert.Equal(t, "", subscriptionName(model.RoomType(""), "ignored", "alice")) } func TestSubscriptionIsSubscribed(t *testing.T) { - d := model.RoomCreatedOutbox{RoomType: model.RoomTypeChannel} - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeDM - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "bob"})) - - d.RoomType = model.RoomTypeBotDM - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "weather.bot"})) - assert.True(t, subscriptionIsSubscribed(&d, &model.User{Account: "alice"})) - // p_ webhook bots: same as .bot — bot side gets IsSubscribed=false. - assert.False(t, subscriptionIsSubscribed(&d, &model.User{Account: "p_webhook"})) -} - -func TestHandleRoomCreatedRequiresRequestID(t *testing.T) { - store := &stubInboxStore{} - h := NewHandler(store) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r1", RoomType: model.RoomTypeChannel, - Accounts: []string{"bob"}, - }) - err := h.handleRoomCreated(context.Background(), &model.OutboxEvent{Payload: payload}) - require.Error(t, err) - assert.Contains(t, err.Error(), "missing X-Request-ID") + assert.False(t, subscriptionIsSubscribed(model.RoomTypeChannel, &model.User{Account: "bob"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeDM, &model.User{Account: "bob"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "weather.bot"})) + assert.True(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "alice"})) + assert.False(t, subscriptionIsSubscribed(model.RoomTypeBotDM, &model.User{Account: "p_webhook"})) } -func TestHandleRoomCreatedEmptyAccountsAcksWithWarn(t *testing.T) { - store := &stubInboxStore{} - h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r1", RoomType: model.RoomTypeChannel, Accounts: []string{}, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) -} - -func TestHandleRoomCreatedDMBuildsRemoteSub(t *testing.T) { +func TestHandleMemberAdded_DM_BuildsRecipientSubWithCounterpartName(t *testing.T) { store := &stubInboxStore{ users: []model.User{ {ID: "u_bob", Account: "bob", SiteID: "site-B"}, }, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "u_aliceu_bob", - RoomType: model.RoomTypeDM, RoomName: "", - HomeSiteID: "site-A", + RoomType: model.RoomTypeDM, Accounts: []string{"bob"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: 1740000000000, Timestamp: 1740000000000, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) subs := store.bulkSubscriptions require.Len(t, subs, 1) assert.True(t, idgen.IsValidUUIDv7(subs[0].ID)) assert.Equal(t, "u_aliceu_bob", subs[0].RoomID) assert.Equal(t, "site-A", subs[0].SiteID) - assert.Equal(t, "alice", subs[0].Name) - assert.Nil(t, subs[0].Roles) + assert.Equal(t, "alice", subs[0].Name, "DM recipient sub.Name is the requester (counterpart) account") + assert.Nil(t, subs[0].Roles, "DM has no roles") assert.False(t, subs[0].IsSubscribed) assert.Equal(t, model.RoomTypeDM, subs[0].RoomType) } -func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { +func TestHandleMemberAdded_BotDM_BuildsBotSub(t *testing.T) { + // Cross-site botDM: human (alice) is the requester on site-A; bot + // (weather.bot) lives on site-B. Bot's sub on site-B should have + // Name = human's account, IsSubscribed = false (bot side). + store := &stubInboxStore{ + users: []model.User{ + {ID: "u_weather", Account: "weather.bot", SiteID: "site-B"}, + }, + } + h := NewHandler(store) + + change := model.MemberAddEvent{ + Type: "member_added", + RoomID: "u_aliceu_weather", + RoomName: "", + RoomType: model.RoomTypeBotDM, + Accounts: []string{"weather.bot"}, + SiteID: "site-A", + RequesterAccount: "alice", + JoinedAt: 1740000000000, + Timestamp: 1740000000000, + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) + + subs := store.bulkSubscriptions + require.Len(t, subs, 1) + assert.Equal(t, "alice", subs[0].Name) + assert.Nil(t, subs[0].Roles) + assert.False(t, subs[0].IsSubscribed, "bot account on the bot side never has IsSubscribed=true") + assert.Equal(t, model.RoomTypeBotDM, subs[0].RoomType) +} + +func TestHandleMemberAdded_Channel_BuildsChannelSub(t *testing.T) { store := &stubInboxStore{ users: []model.User{ {ID: "u_bob", Account: "bob", SiteID: "site-B"}, @@ -1089,19 +1095,23 @@ func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { }, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", - RoomType: model.RoomTypeChannel, RoomName: "deal team", - HomeSiteID: "site-A", + RoomType: model.RoomTypeChannel, Accounts: []string{"bob", "ian"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: 1, Timestamp: 1, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", SiteID: "site-A", DestSiteID: "site-B", Payload: changeData} + evtData, _ := json.Marshal(evt) + + require.NoError(t, h.HandleEvent(context.Background(), evtData)) subs := store.bulkSubscriptions require.Len(t, subs, 2) @@ -1113,11 +1123,11 @@ func TestHandleRoomCreatedChannelBulkInsert(t *testing.T) { } } -func TestHandleMemberAddedSetsNameAndRoomType(t *testing.T) { +func TestHandleMemberAdded_EmptyRoomType_DefaultsToChannel(t *testing.T) { + // Backward-compat: events from older publishers don't carry RoomType. + // handleMemberAdded must treat empty as channel. store := &stubInboxStore{ - users: []model.User{ - {ID: "u_bob", Account: "bob", SiteID: "site-B"}, - }, + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, } h := NewHandler(store) @@ -1127,66 +1137,56 @@ func TestHandleMemberAddedSetsNameAndRoomType(t *testing.T) { RoomName: "deal team", Accounts: []string{"bob"}, SiteID: "site-A", - JoinedAt: 1740000000000, - Timestamp: 1740000000000, + JoinedAt: 1, + Timestamp: 1, + // RoomType intentionally left empty } - changeData, err := json.Marshal(change) - require.NoError(t, err) - - evt := model.OutboxEvent{ - Type: "member_added", - SiteID: "site-A", - DestSiteID: "site-B", - Payload: changeData, - } - evtData, err := json.Marshal(evt) - require.NoError(t, err) + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", Payload: changeData} + evtData, _ := json.Marshal(evt) require.NoError(t, h.HandleEvent(context.Background(), evtData)) - subs := store.getSubscriptions() + subs := store.bulkSubscriptions require.Len(t, subs, 1) - assert.Equal(t, "deal team", subs[0].Name) - assert.Equal(t, model.RoomTypeChannel, subs[0].RoomType) + assert.Equal(t, model.RoomTypeChannel, subs[0].RoomType, "empty RoomType must default to channel") + assert.Equal(t, []model.Role{model.RoleMember}, subs[0].Roles) } -func TestHandleRoomCreatedBotDMBuildsRemoteBotSub(t *testing.T) { - // Cross-site botDM: human (alice) is the requester on site-A; bot - // (weather.bot) lives on site-B. The outbox event lands at site-B's - // inbox-worker, which must materialize the bot's sub with: - // Name = human's account ("alice") - // IsSubscribed = false - // Roles = nil (no member role for botDM) - // SiteID = home site (site-A) +func TestHandleMemberAdded_DuplicateKey_IsIdempotent(t *testing.T) { store := &stubInboxStore{ - users: []model.User{ - {ID: "u_weather", Account: "weather.bot", SiteID: "site-B"}, - }, + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, + bulkCreateErr: mongo.WriteException{WriteErrors: []mongo.WriteError{{Code: 11000, Message: "duplicate key"}}}, } h := NewHandler(store) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx := natsutil.WithRequestID(context.Background(), reqID) - payload, _ := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "u_aliceu_weather", - RoomType: model.RoomTypeBotDM, - RoomName: "", - HomeSiteID: "site-A", - Accounts: []string{"weather.bot"}, - RequesterAccount: "alice", - Timestamp: 1740000000000, - }) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", RoomType: model.RoomTypeChannel, + Accounts: []string{"bob"}, SiteID: "site-A", JoinedAt: 1, Timestamp: 1, + } + changeData, _ := json.Marshal(change) + evt := model.OutboxEvent{Type: "member_added", Payload: changeData} + evtData, _ := json.Marshal(evt) - subs := store.bulkSubscriptions - require.Len(t, subs, 1, "exactly one remote sub for the bot") - assert.True(t, idgen.IsValidUUIDv7(subs[0].ID)) - assert.Equal(t, "u_aliceu_weather", subs[0].RoomID) - assert.Equal(t, "site-A", subs[0].SiteID, "bot's sub.siteID is the room's home site") - assert.Equal(t, "alice", subs[0].Name, "bot's sub.Name is the human account") - assert.Nil(t, subs[0].Roles) - assert.False(t, subs[0].IsSubscribed) - assert.Equal(t, model.RoomTypeBotDM, subs[0].RoomType) - assert.Equal(t, "u_weather", subs[0].User.ID) - assert.Equal(t, "weather.bot", subs[0].User.Account) + require.NoError(t, h.HandleEvent(context.Background(), evtData), + "duplicate-key on bulk-create must be swallowed (replay after prior delivery is idempotent)") +} + +func TestHandleMemberAdded_BulkCreate_NonDuplicateError_ReturnsError(t *testing.T) { + store := &stubInboxStore{ + users: []model.User{{ID: "u_bob", Account: "bob", SiteID: "site-B"}}, + bulkCreateErr: fmt.Errorf("connection refused"), + } + h := NewHandler(store) + + change := model.MemberAddEvent{ + Type: "member_added", RoomID: "r1", RoomType: model.RoomTypeChannel, + Accounts: []string{"bob"}, SiteID: "site-A", JoinedAt: 1, Timestamp: 1, + } + changeData, _ := json.Marshal(change) + evtData, _ := json.Marshal(model.OutboxEvent{Type: "member_added", Payload: changeData}) + + err := h.HandleEvent(context.Background(), evtData) + require.Error(t, err) + assert.Contains(t, err.Error(), "bulk create subscriptions") } diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index dd35fe22d..8eb8599e6 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -18,7 +18,6 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo" "github.com/hmchangw/chat/pkg/model" - "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/stream" "github.com/hmchangw/chat/pkg/subject" "github.com/hmchangw/chat/pkg/testutil" @@ -490,7 +489,7 @@ func newIntegrationHandler(t *testing.T, db *mongo.Database) *Handler { return NewHandler(store) } -func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { +func TestHandleMemberAdded_Channel_PersistsRemoteSubs(t *testing.T) { ctx := context.Background() db := setupMongo(t) mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", @@ -499,27 +498,32 @@ func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { SiteID: "site-B", EngName: "Ian", ChineseName: "伊恩"}) h := newIntegrationHandler(t, db) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx = natsutil.WithRequestID(ctx, reqID) - payload, err := json.Marshal(model.RoomCreatedOutbox{ - RoomID: "r_xyz", RoomType: model.RoomTypeChannel, - RoomName: "deal team", HomeSiteID: "site-A", + payload, err := json.Marshal(model.MemberAddEvent{ + Type: "member_added", + RoomID: "r_xyz", + RoomName: "deal team", + RoomType: model.RoomTypeChannel, Accounts: []string{"bob", "ian"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: time.Now().UTC().UnixMilli(), Timestamp: time.Now().UTC().UnixMilli(), }) require.NoError(t, err) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + evt, err := json.Marshal(model.OutboxEvent{ + Type: "member_added", + SiteID: "site-A", + DestSiteID: "site-B", + Payload: payload, + }) + require.NoError(t, err) + require.NoError(t, h.HandleEvent(ctx, evt)) subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": "r_xyz"}) require.NoError(t, err) assert.Equal(t, int64(2), subCount) - roomCount, err := db.Collection("rooms").CountDocuments(ctx, bson.M{"_id": "r_xyz"}) - require.NoError(t, err) - assert.Equal(t, int64(0), roomCount, "inbox-worker must not create room mirror") - var bobSub model.Subscription require.NoError(t, db.Collection("subscriptions").FindOne(ctx, bson.M{"roomId": "r_xyz", "u.account": "bob"}).Decode(&bobSub)) @@ -528,42 +532,45 @@ func TestHandleRoomCreatedPersistsRemoteSubs(t *testing.T) { assert.Equal(t, model.RoomTypeChannel, bobSub.RoomType) } -func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) { +func TestHandleMemberAdded_DM_PersistsRemoteCounterpartSub(t *testing.T) { ctx := context.Background() db := setupMongo(t) mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", SiteID: "site-B", EngName: "Bob", ChineseName: "鲍勃"}) h := newIntegrationHandler(t, db) - const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" - ctx = natsutil.WithRequestID(ctx, reqID) const roomID = "u_aliceu_bob" - payload, err := json.Marshal(model.RoomCreatedOutbox{ + payload, err := json.Marshal(model.MemberAddEvent{ + Type: "member_added", RoomID: roomID, - RoomType: model.RoomTypeDM, RoomName: "", - HomeSiteID: "site-A", + RoomType: model.RoomTypeDM, Accounts: []string{"bob"}, + SiteID: "site-A", RequesterAccount: "alice", + JoinedAt: time.Now().UTC().UnixMilli(), Timestamp: time.Now().UTC().UnixMilli(), }) require.NoError(t, err) - require.NoError(t, h.handleRoomCreated(ctx, &model.OutboxEvent{Payload: payload})) + evt, err := json.Marshal(model.OutboxEvent{ + Type: "member_added", + SiteID: "site-A", + DestSiteID: "site-B", + Payload: payload, + }) + require.NoError(t, err) + require.NoError(t, h.HandleEvent(ctx, evt)) subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": roomID}) require.NoError(t, err) assert.Equal(t, int64(1), subCount) - roomCount, err := db.Collection("rooms").CountDocuments(ctx, bson.M{"_id": roomID}) - require.NoError(t, err) - assert.Equal(t, int64(0), roomCount, "inbox-worker must not create room mirror") - var bobSub model.Subscription require.NoError(t, db.Collection("subscriptions").FindOne(ctx, bson.M{"roomId": roomID, "u.account": "bob"}).Decode(&bobSub)) assert.Equal(t, "bob", bobSub.User.Account) - assert.Equal(t, "alice", bobSub.Name, "DM Subscription.Name = counterpart account") + assert.Equal(t, "alice", bobSub.Name, "DM Subscription.Name = counterpart account (the requester)") assert.Equal(t, "site-A", bobSub.SiteID, "sub SiteID is room's home, not this site") assert.Equal(t, model.RoomTypeDM, bobSub.RoomType) assert.Nil(t, bobSub.Roles, "DMs have no roles") diff --git a/pkg/model/event.go b/pkg/model/event.go index 0e48f924c..afea3ae8d 100644 --- a/pkg/model/event.go +++ b/pkg/model/event.go @@ -110,8 +110,10 @@ type MemberAddEvent struct { Type string `json:"type" bson:"type"` RoomID string `json:"roomId" bson:"roomId"` RoomName string `json:"roomName" bson:"roomName"` + RoomType RoomType `json:"roomType,omitempty" bson:"roomType,omitempty"` Accounts []string `json:"accounts" bson:"accounts"` SiteID string `json:"siteId" bson:"siteId"` + RequesterAccount string `json:"requesterAccount,omitempty" bson:"requesterAccount,omitempty"` JoinedAt int64 `json:"joinedAt" bson:"joinedAt"` HistorySharedSince *int64 `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"` Timestamp int64 `json:"timestamp" bson:"timestamp"` @@ -202,13 +204,6 @@ const ( MessageTypeMembersAdded = "members_added" ) -const ( - // OutboxTypeRoomCreated is the cross-site outbox event type emitted when a room is created. - // Distinct from MessageTypeRoomCreated (system-message type) so destination sites can - // route on event semantics without collision. - OutboxTypeRoomCreated = "room_created" -) - const ( // AsyncJobStatusOK indicates a successful async job result. AsyncJobStatusOK = "ok" @@ -225,14 +220,3 @@ type CreateRoomReply struct { // CreateRoomReplyAccepted means validated + queued; persistence happens later in room-worker. const CreateRoomReplyAccepted = "accepted" - -// RoomCreatedOutbox is the cross-site payload (wrapped in OutboxEvent) when a remote member exists. -type RoomCreatedOutbox struct { - RoomID string `json:"roomId"` - RoomType RoomType `json:"roomType"` - RoomName string `json:"roomName"` - HomeSiteID string `json:"homeSiteId"` - Accounts []string `json:"accounts"` - RequesterAccount string `json:"requesterAccount"` - Timestamp int64 `json:"timestamp"` -} diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index 550a5bf99..0afdd78c6 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -1910,25 +1910,6 @@ func TestCreateRoomRequestRoundtrip(t *testing.T) { assert.Equal(t, int64(1740000000000), dst.Timestamp) } -func TestRoomCreatedOutboxRoundtrip(t *testing.T) { - out := model.RoomCreatedOutbox{ - RoomID: "r1", - RoomType: model.RoomTypeChannel, - RoomName: "deal team", - HomeSiteID: "site-A", - Accounts: []string{"bob", "ian"}, - RequesterAccount: "alice", - Timestamp: 1740000000000, - } - data, err := json.Marshal(&out) - require.NoError(t, err) - var dst model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(data, &dst)) - assert.Equal(t, model.RoomTypeChannel, dst.RoomType) - assert.Equal(t, []string{"bob", "ian"}, dst.Accounts) - assert.NotContains(t, string(data), "appName") -} - func TestErrorResponseRoomIDOmitempty(t *testing.T) { er := model.ErrorResponse{Error: "internal"} body, err := json.Marshal(er) diff --git a/room-worker/handler.go b/room-worker/handler.go index f956d9add..e0fa8040e 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -842,8 +842,11 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error memberAddEvt := model.MemberAddEvent{ Type: "member_added", RoomID: req.RoomID, + RoomName: room.Name, + RoomType: room.Type, Accounts: actualAccounts, SiteID: room.SiteID, + RequesterAccount: req.RequesterAccount, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), @@ -911,8 +914,10 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error Type: "member_added", RoomID: req.RoomID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: req.RequesterAccount, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), @@ -1233,8 +1238,10 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq Type: model.OutboxMemberAdded, RoomID: room.ID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: requester.Account, JoinedAt: req.Timestamp, HistorySharedSince: nil, Timestamp: now.UnixMilli(), @@ -1263,45 +1270,14 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq remoteSiteAccounts[u.SiteID] = append(remoteSiteAccounts[u.SiteID], u.Account) } for destSiteID, accounts := range remoteSiteAccounts { - payload := model.RoomCreatedOutbox{ - RoomID: room.ID, - RoomType: room.Type, - RoomName: room.Name, - HomeSiteID: room.SiteID, - Accounts: accounts, - RequesterAccount: requester.Account, - Timestamp: req.Timestamp, - } - pData, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("marshal room_created outbox payload: %w", err) - } - envelope := model.OutboxEvent{ - Type: model.OutboxTypeRoomCreated, - SiteID: room.SiteID, - DestSiteID: destSiteID, - Payload: pData, - Timestamp: now.UnixMilli(), - } - eData, err := json.Marshal(envelope) - if err != nil { - return fmt.Errorf("marshal outbox envelope: %w", err) - } - if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxTypeRoomCreated), eData, requestID+":"+destSiteID); err != nil { - return fmt.Errorf("publish room_created outbox to %s: %w", destSiteID, err) - } - - // Cross-site member_added so the remote site's search-sync-worker - // updates its user-room/spotlight MV — mirrors processAddMembers' - // federation. inbox-worker still consumes the room_created above to - // build correctly-typed Subscription rows; this event only feeds the - // search index. memberEvt := model.MemberAddEvent{ Type: model.OutboxMemberAdded, RoomID: room.ID, RoomName: room.Name, + RoomType: room.Type, Accounts: accounts, SiteID: room.SiteID, + RequesterAccount: requester.Account, JoinedAt: req.Timestamp, HistorySharedSince: nil, Timestamp: now.UnixMilli(), @@ -1581,25 +1557,28 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req return nil } - payload := model.RoomCreatedOutbox{ + now := time.Now().UTC().UnixMilli() + memberEvt := model.MemberAddEvent{ + Type: model.OutboxMemberAdded, RoomID: room.ID, - RoomType: room.Type, RoomName: "", - HomeSiteID: room.SiteID, + RoomType: room.Type, Accounts: []string{other.Account}, + SiteID: room.SiteID, RequesterAccount: requester.Account, - Timestamp: acceptedAt.UnixMilli(), + JoinedAt: acceptedAt.UnixMilli(), + Timestamp: now, } - pData, err := json.Marshal(payload) + pData, err := json.Marshal(memberEvt) if err != nil { - return fmt.Errorf("marshal room_created outbox payload: %w", err) + return fmt.Errorf("marshal member_added outbox payload: %w", err) } envelope := model.OutboxEvent{ - Type: model.OutboxTypeRoomCreated, + Type: model.OutboxMemberAdded, SiteID: room.SiteID, DestSiteID: other.SiteID, Payload: pData, - Timestamp: time.Now().UTC().UnixMilli(), + Timestamp: now, } eData, err := json.Marshal(envelope) if err != nil { @@ -1607,7 +1586,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req } payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, acceptedAt.UnixMilli()) return h.publish(ctx, - subject.Outbox(room.SiteID, other.SiteID, model.OutboxTypeRoomCreated), + subject.Outbox(room.SiteID, other.SiteID, model.OutboxMemberAdded), eData, natsutil.OutboxDedupID(ctx, other.SiteID, payloadSeed), ) diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index 8d8a53c02..8cf62ffee 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -2322,18 +2322,19 @@ func TestProcessCreateRoom_Channel_OutboxPerRemoteSite(t *testing.T) { }) require.NoError(t, h.processCreateRoom(ctx, body)) - outboxMsgs := outboxFor(getPublished(), "site-B", model.MessageTypeRoomCreated) + outboxMsgs := outboxFor(getPublished(), "site-B", model.OutboxMemberAdded) require.Len(t, outboxMsgs, 1) var envelope model.OutboxEvent require.NoError(t, json.Unmarshal(outboxMsgs[0].data, &envelope)) - assert.Equal(t, model.MessageTypeRoomCreated, envelope.Type) + assert.Equal(t, model.OutboxMemberAdded, envelope.Type) assert.Equal(t, "site-A", envelope.SiteID) assert.Equal(t, "site-B", envelope.DestSiteID) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent require.NoError(t, json.Unmarshal(envelope.Payload, &payload)) assert.Equal(t, "room-ch-5", payload.RoomID) + assert.Equal(t, model.RoomTypeChannel, payload.RoomType) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) } @@ -2826,24 +2827,24 @@ func TestHandleSyncCreateDM_CrossSite_EmitsOutbox(t *testing.T) { var outbox *dmCapturedPublish for i := range capture.captured { - if capture.captured[i].subject == subject.Outbox("site-a", "site-b", model.OutboxTypeRoomCreated) { + if capture.captured[i].subject == subject.Outbox("site-a", "site-b", model.OutboxMemberAdded) { outbox = &capture.captured[i] break } } - require.NotNil(t, outbox, "expected an outbox publish to site-b") + require.NotNil(t, outbox, "expected a member_added outbox publish to site-b") var env model.OutboxEvent require.NoError(t, json.Unmarshal(outbox.data, &env)) - assert.Equal(t, model.OutboxEventType(model.OutboxTypeRoomCreated), env.Type) + assert.Equal(t, model.OutboxMemberAdded, env.Type) assert.Equal(t, "site-a", env.SiteID) assert.Equal(t, "site-b", env.DestSiteID) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, model.RoomTypeDM, payload.RoomType) assert.Equal(t, "", payload.RoomName) - assert.Equal(t, "site-a", payload.HomeSiteID) + assert.Equal(t, "site-a", payload.SiteID) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, "01970a4f-8c2d-7c9a-abcd-e0123456789f:site-b", outbox.msgID) @@ -3139,13 +3140,11 @@ func TestProcessCreateRoom_Channel_PublishesCrossSiteMemberAdded(t *testing.T) { require.NoError(t, json.Unmarshal(envelope.Payload, &inner)) assert.Equal(t, "room-ch-xsite", inner.RoomID) assert.Equal(t, "Cross", inner.RoomName) + assert.Equal(t, model.RoomTypeChannel, inner.RoomType, "create-time member_added carries RoomType for inbox-worker dispatch") assert.Equal(t, []string{"bob"}, inner.Accounts, "carries only the remote-site accounts, mirroring processAddMembers") assert.Equal(t, "site-A", inner.SiteID, "inner SiteID is the origin (room's home)") + assert.Equal(t, "alice", inner.RequesterAccount, "create-time member_added carries RequesterAccount for DM/botDM counterpart resolution") assert.Nil(t, inner.HistorySharedSince, "create-time event must be unrestricted") - - // Sanity: the existing room_created outbox is still emitted on the same loop. - roomCreatedOutbox := outboxFor(getPublished(), "site-B", model.OutboxTypeRoomCreated) - require.Len(t, roomCreatedOutbox, 1, "room_created outbox path unchanged") } // ---- Task 10: key-gate and fan-out tests ---- diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index f734fba40..0680888ba 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -653,31 +653,8 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", "")), "no outbox to home site-A") - // room_created outboxes — one per remote site. - createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) - createdC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxTypeRoomCreated)) - require.Len(t, createdB, 1, "exactly one room_created outbox to site-B") - require.Len(t, createdC, 1, "exactly one room_created outbox to site-C") - - var envB model.OutboxEvent - require.NoError(t, json.Unmarshal(createdB[0].data, &envB)) - var payloadB model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(envB.Payload, &payloadB)) - assert.ElementsMatch(t, []string{"bob", "carol"}, payloadB.Accounts) - assert.Equal(t, model.RoomTypeChannel, payloadB.RoomType) - assert.Equal(t, "deal team", payloadB.RoomName) - assert.Equal(t, "site-A", payloadB.HomeSiteID) - assert.Equal(t, "alice", payloadB.RequesterAccount) - assert.Equal(t, reqID+":site-B", createdB[0].msgID) - - var envC model.OutboxEvent - require.NoError(t, json.Unmarshal(createdC[0].data, &envC)) - var payloadC model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(envC.Payload, &payloadC)) - assert.ElementsMatch(t, []string{"ian"}, payloadC.Accounts) - assert.Equal(t, reqID+":site-C", createdC[0].msgID) - - // member_added outboxes — one per remote site (search-sync-worker federation). + // One member_added outbox per remote site — carries all the info + // inbox-worker needs for sub creation AND search-sync-worker for MV update. memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) memberC := cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", model.OutboxMemberAdded)) require.Len(t, memberB, 1, "exactly one member_added outbox to site-B") @@ -688,8 +665,10 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { var memberPayloadB model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnvB.Payload, &memberPayloadB)) assert.ElementsMatch(t, []string{"bob", "carol"}, memberPayloadB.Accounts) + assert.Equal(t, model.RoomTypeChannel, memberPayloadB.RoomType) assert.Equal(t, "deal team", memberPayloadB.RoomName) assert.Equal(t, "site-A", memberPayloadB.SiteID) + assert.Equal(t, "alice", memberPayloadB.RequesterAccount) assert.Nil(t, memberPayloadB.HistorySharedSince) assert.Equal(t, reqID+":site-B", memberB[0].msgID) @@ -698,6 +677,11 @@ func TestProcessCreateRoomChannel_OutboxPerRemoteSite(t *testing.T) { var memberPayloadC model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnvC.Payload, &memberPayloadC)) assert.ElementsMatch(t, []string{"ian"}, memberPayloadC.Accounts) + assert.Equal(t, model.RoomTypeChannel, memberPayloadC.RoomType) + assert.Equal(t, "deal team", memberPayloadC.RoomName) + assert.Equal(t, "site-A", memberPayloadC.SiteID) + assert.Equal(t, "alice", memberPayloadC.RequesterAccount) + assert.Nil(t, memberPayloadC.HistorySharedSince) assert.Equal(t, reqID+":site-C", memberC[0].msgID) } @@ -741,28 +725,18 @@ func TestProcessCreateRoomDM_OutboxToCounterpartSite(t *testing.T) { assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-A", ""))) assert.Empty(t, cap.outboxOnPrefix(subject.Outbox("site-A", "site-C", ""))) - // room_created outbox to the recipient's site. - createdB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxTypeRoomCreated)) - require.Len(t, createdB, 1) - var env model.OutboxEvent - require.NoError(t, json.Unmarshal(createdB[0].data, &env)) - var payload model.RoomCreatedOutbox - require.NoError(t, json.Unmarshal(env.Payload, &payload)) - assert.Equal(t, model.RoomTypeDM, payload.RoomType) - assert.Equal(t, "", payload.RoomName, "DM RoomName empty per v2 cleanup") - assert.ElementsMatch(t, []string{"bob"}, payload.Accounts) - assert.Equal(t, "alice", payload.RequesterAccount) - assert.Equal(t, "site-A", payload.HomeSiteID) - assert.Equal(t, reqID+":site-B", createdB[0].msgID) - - // member_added outbox (search-sync-worker federation). + // One member_added outbox to the recipient's site — carries everything + // inbox-worker needs to build the DM recipient's sub with the right shape. memberB := cap.outboxOnPrefix(subject.Outbox("site-A", "site-B", model.OutboxMemberAdded)) require.Len(t, memberB, 1) var memberEnv model.OutboxEvent require.NoError(t, json.Unmarshal(memberB[0].data, &memberEnv)) var memberPayload model.MemberAddEvent require.NoError(t, json.Unmarshal(memberEnv.Payload, &memberPayload)) + assert.Equal(t, model.RoomTypeDM, memberPayload.RoomType) + assert.Equal(t, "", memberPayload.RoomName, "DM RoomName empty per v2 cleanup") assert.ElementsMatch(t, []string{"bob"}, memberPayload.Accounts) + assert.Equal(t, "alice", memberPayload.RequesterAccount, "DM counterpart resolution depends on this") assert.Equal(t, "site-A", memberPayload.SiteID) assert.Equal(t, reqID+":site-B", memberB[0].msgID) } @@ -1076,7 +1050,7 @@ func TestSyncCreateDM_BotDM_CrossSiteOutbox(t *testing.T) { _, err := handler.handleSyncCreateDM(newIntegSyncDMCtx(), data) require.NoError(t, err) - pubs := cap.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs := cap.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) assert.Len(t, pubs, 1, "exactly one outbox to site-B") } @@ -1142,14 +1116,15 @@ func TestSyncCreateDM_CrossSite_OutboxPayloadConverges(t *testing.T) { assert.Equal(t, wantRoomID, persisted.ID) // 2. OUTBOX payload carries the same RoomID + the dedup key includes destSiteID. - pubs := cap1.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs := cap1.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) require.Len(t, pubs, 1) var env model.OutboxEvent require.NoError(t, json.Unmarshal(pubs[0].data, &env)) - var payload model.RoomCreatedOutbox + var payload model.MemberAddEvent require.NoError(t, json.Unmarshal(env.Payload, &payload)) assert.Equal(t, wantRoomID, payload.RoomID, "outbox RoomID must match local room.ID so remote site converges") + assert.Equal(t, model.RoomTypeDM, payload.RoomType) assert.Equal(t, "alice", payload.RequesterAccount) assert.Equal(t, []string{"bob"}, payload.Accounts) assert.Contains(t, pubs[0].msgID, "site-B", @@ -1161,7 +1136,7 @@ func TestSyncCreateDM_CrossSite_OutboxPayloadConverges(t *testing.T) { handler2 := NewHandler(store, siteID, cap2.fn(), testKeyStore, testKeySender) _, err = handler2.handleSyncCreateDM(ctx, data) require.NoError(t, err) - pubs2 := cap2.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxTypeRoomCreated)) + pubs2 := cap2.outboxOnPrefix(subject.Outbox(siteID, "site-B", model.OutboxMemberAdded)) require.Len(t, pubs2, 1) assert.Equal(t, pubs[0].msgID, pubs2[0].msgID, "replay must produce identical Nats-Msg-Id so broker dedup blocks duplicate cross-site events")