diff --git a/docs/superpowers/plans/2026-05-19-botdm-resubscribe-upsert.md b/docs/superpowers/plans/2026-05-19-botdm-resubscribe-upsert.md new file mode 100644 index 000000000..e8fdb4c20 --- /dev/null +++ b/docs/superpowers/plans/2026-05-19-botdm-resubscribe-upsert.md @@ -0,0 +1,954 @@ +# BotDM Re-subscribe Upsert + `DisableNotification` Field — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Re-creating a botDM for a user who already has a subscription must refresh `DisableNotification → false`, `IsSubscribed → true`, and `JoinedAt → new acceptedAt` while preserving runtime state (`LastSeenAt`, `HasMention`, `ThreadUnread`, `Alert`) and identity (`_id`, `u`). + +**Architecture:** Add `DisableNotification bool` to `model.Subscription`. Add a new `BulkUpsertSubscriptions` store method that issues a `BulkWrite` of `UpdateOneModel` upserts keyed on `(roomId, u.account)`, with `$set` carrying the three re-activation fields and `$setOnInsert` carrying identity + zero-value runtime defaults. Wire the two botDM call sites in `room-worker` (async `processCreateRoom` botDM branch and sync `processSyncCreateDM` botDM branch) to the new method. Mirror the sync path's post-write re-read into the async path so the in-memory subs handed to `finishCreateRoom` carry persisted `_id`/`JoinedAt`. Channel-room, regular-DM, and add-member paths are untouched. + +**Tech Stack:** Go 1.25, MongoDB (`go.mongodb.org/mongo-driver/v2`), `go.uber.org/mock` (mockgen), `stretchr/testify`, `testcontainers-go` for integration tests. All commands wrapped via root `Makefile`. + +**Reference spec:** `docs/superpowers/specs/2026-05-19-botdm-resubscribe-upsert-design.md` + +--- + +## File Structure + +**Modify:** +- `pkg/model/subscription.go` — add `DisableNotification` field +- `pkg/model/model_test.go` — extend Subscription round-trip case +- `room-worker/store.go` — declare `BulkUpsertSubscriptions` on interface +- `room-worker/store_mongo.go` — implement `BulkUpsertSubscriptions` +- `room-worker/handler.go` — switch botDM branches to upsert + add async re-read +- `room-worker/handler_test.go` — update botDM unit tests for new expectations +- `room-worker/integration_test.go` — add re-join refresh + regression tests +- `room-worker/mock_store_test.go` — regenerated via `make generate` (do not hand-edit) + +**No new files.** + +--- + +## Task 1: Add `DisableNotification` field to `model.Subscription` + +**Files:** +- Modify: `pkg/model/subscription.go` (around line 41) +- Modify: `pkg/model/model_test.go` (the `TestSubscriptionJSON` "with optional fields set" case, around line 453) + +- [ ] **Step 1: Write the failing test** + +Edit `pkg/model/model_test.go`. In the `TestSubscriptionJSON` "with optional fields set" subtest, add `DisableNotification: true,` immediately after the existing `Alert: true,` line, so the round-trip covers the new field: + +```go +s := model.Subscription{ + ID: "s1", + User: model.SubscriptionUser{ID: "u1", Account: "alice"}, + RoomID: "r1", + RoomType: model.RoomTypeChannel, + SiteID: "site-a", + Roles: []model.Role{model.RoleOwner}, + HistorySharedSince: &hss, + JoinedAt: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), + LastSeenAt: &lsa, + HasMention: true, + ThreadUnread: []string{"parent-1", "parent-2"}, + Alert: true, + DisableNotification: true, +} +``` + +Also extend `TestSubscriptionJSON_ThreadUnreadOmittedAlertAlwaysPresent` to assert `disableNotification` is always present in JSON even when false: + +```go +disableVal, hasDisable := raw["disableNotification"] +assert.True(t, hasDisable, "disableNotification must be present in JSON even when false") +assert.Equal(t, false, disableVal) +``` + +(Add these two lines next to the existing `alertVal, hasAlert := raw["alert"]` block in the same test.) + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=pkg/model` +Expected: FAIL — `DisableNotification` is undefined on `model.Subscription`, and the round-trip would fail anyway because the field doesn't exist. + +- [ ] **Step 3: Add the field** + +Edit `pkg/model/subscription.go`. In the `Subscription` struct (around line 27), insert the field immediately after `Alert`: + +```go +Alert bool `json:"alert" bson:"alert"` +DisableNotification bool `json:"disableNotification" bson:"disableNotification"` +``` + +No `omitempty` — matches the convention of `Alert`/`HasMention`. + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=pkg/model` +Expected: PASS — both Subscription round-trip and the "alert/disableNotification always present" assertion succeed. + +- [ ] **Step 5: Commit** + +```bash +git add pkg/model/subscription.go pkg/model/model_test.go +git commit -m "feat(model): add DisableNotification field to Subscription" +``` + +--- + +## Task 2: Declare `BulkUpsertSubscriptions` on the store interface + regenerate mocks + +**Files:** +- Modify: `room-worker/store.go` (around line 38, next to `BulkCreateSubscriptions`) +- Regenerated: `room-worker/mock_store_test.go` + +- [ ] **Step 1: Add the interface method** + +Edit `room-worker/store.go`. Immediately after the existing `BulkCreateSubscriptions` line (line 38) inside the `SubscriptionStore` interface, add: + +```go +// BulkUpsertSubscriptions inserts each sub and, on a (roomId, u.account) +// collision with an existing document, refreshes the re-activation fields +// (DisableNotification → false, IsSubscribed, JoinedAt) while preserving +// the existing document's runtime state (LastSeenAt, HasMention, +// ThreadUnread, Alert) and identity (_id, u). Intended for botDM +// re-creation paths only; channel/DM/add-member paths must continue to +// use BulkCreateSubscriptions for safe redelivery idempotency. +BulkUpsertSubscriptions(ctx context.Context, subs []*model.Subscription) error +``` + +- [ ] **Step 2: Regenerate mocks** + +Run: `make generate SERVICE=room-worker` +Expected: `room-worker/mock_store_test.go` updated to include `BulkUpsertSubscriptions` and matching `EXPECT()` helper. Do not hand-edit the file. + +- [ ] **Step 3: Confirm Red state, do not commit** + +Run: `make build SERVICE=room-worker` +Expected: FAIL with `*MongoStore does not implement SubscriptionStore (missing method BulkUpsertSubscriptions)`. That's the desired Red — Task 3 adds the implementation. Interface + mock + impl are one logical change and ship in Task 3's commit. Do not commit anything from this task in isolation. + +--- + +## Task 3: Implement `BulkUpsertSubscriptions` on `MongoStore` + +**Files:** +- Modify: `room-worker/store_mongo.go` (immediately after `BulkCreateSubscriptions`, around line 335) +- Modify: `room-worker/integration_test.go` (append a new test) + +This task is integration-test-driven. The unit-test surface for `MongoStore` would be a Mongo mock and add no real signal; the real correctness gates are the Mongo upsert semantics, which only an integration test against a real container can verify. + +- [ ] **Step 1: Write the failing integration test** + +Append to `room-worker/integration_test.go`: + +```go +func TestMongoStore_BulkUpsertSubscriptions_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + oldJoinedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + newJoinedAt := time.Date(2026, 5, 19, 0, 0, 0, 0, time.UTC) + lastSeen := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC) + + t.Run("update branch refreshes re-activation fields and preserves runtime state", func(t *testing.T) { + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-id-1", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: "room-update-1", + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "helper.bot", + IsSubscribed: false, + DisableNotification: true, + JoinedAt: oldJoinedAt, + LastSeenAt: &lastSeen, + HasMention: true, + Alert: true, + ThreadUnread: []string{"parent-1"}, + }) + + // Caller-supplied sub uses a different _id; upsert MUST keep the existing _id. + newSub := &model.Subscription{ + ID: "caller-supplied-id-DIFFERENT", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: "room-update-1", + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "helper.bot", + IsSubscribed: true, + JoinedAt: newJoinedAt, + } + require.NoError(t, store.BulkUpsertSubscriptions(ctx, []*model.Subscription{newSub})) + + got, err := store.GetSubscription(ctx, "alice", "room-update-1") + require.NoError(t, err) + + // Identity preserved. + assert.Equal(t, "existing-id-1", got.ID, "_id must be preserved on update") + + // Re-activation fields refreshed. + assert.False(t, got.DisableNotification, "DisableNotification must be cleared") + assert.True(t, got.IsSubscribed, "IsSubscribed must be refreshed") + assert.True(t, got.JoinedAt.Equal(newJoinedAt), "JoinedAt must be updated; got %v want %v", got.JoinedAt, newJoinedAt) + + // Runtime state preserved. + assert.True(t, got.HasMention, "HasMention must be preserved") + assert.True(t, got.Alert, "Alert must be preserved") + require.NotNil(t, got.LastSeenAt) + assert.True(t, got.LastSeenAt.Equal(lastSeen), "LastSeenAt must be preserved") + assert.Equal(t, []string{"parent-1"}, got.ThreadUnread, "ThreadUnread must be preserved") + }) + + t.Run("insert branch initialises identity and zero-value runtime defaults", func(t *testing.T) { + newSub := &model.Subscription{ + ID: "fresh-insert-id", + User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}, + RoomID: "room-insert-1", + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "helper.bot", + IsSubscribed: true, + JoinedAt: newJoinedAt, + } + require.NoError(t, store.BulkUpsertSubscriptions(ctx, []*model.Subscription{newSub})) + + got, err := store.GetSubscription(ctx, "bob", "room-insert-1") + require.NoError(t, err) + + assert.Equal(t, "fresh-insert-id", got.ID) + assert.Equal(t, "u_bob", got.User.ID) + assert.Equal(t, "site-A", got.SiteID) + assert.Equal(t, model.RoomTypeBotDM, got.RoomType) + assert.Equal(t, "helper.bot", got.Name) + assert.True(t, got.IsSubscribed) + assert.False(t, got.DisableNotification) + assert.True(t, got.JoinedAt.Equal(newJoinedAt)) + assert.False(t, got.HasMention) + assert.False(t, got.Alert) + assert.Nil(t, got.LastSeenAt) + assert.Empty(t, got.ThreadUnread) + }) + + t.Run("empty slice is a no-op", func(t *testing.T) { + require.NoError(t, store.BulkUpsertSubscriptions(ctx, nil)) + require.NoError(t, store.BulkUpsertSubscriptions(ctx, []*model.Subscription{})) + }) +} +``` + +- [ ] **Step 2: Run the integration test to verify it fails** + +Run: `make test-integration SERVICE=room-worker` +Expected: COMPILE FAIL — `BulkUpsertSubscriptions` is not defined on `*MongoStore`. + +- [ ] **Step 3: Implement `BulkUpsertSubscriptions`** + +Edit `room-worker/store_mongo.go`. Immediately after `BulkCreateSubscriptions` (after line 335), add: + +```go +// BulkUpsertSubscriptions upserts each sub keyed on (roomId, u.account). +// On collision with an existing document, $set refreshes the three +// re-activation fields (disableNotification → false, isSubscribed, +// joinedAt) and leaves runtime fields (lastSeenAt, hasMention, +// threadUnread, alert) untouched. On insert, $setOnInsert initialises +// identity (_id, u, roomId, siteId, roomType, name, roles) plus +// hasMention/alert zero values. Used exclusively by botDM creation +// paths — see store.go for the interface comment. +func (s *MongoStore) BulkUpsertSubscriptions(ctx context.Context, subs []*model.Subscription) error { + if len(subs) == 0 { + return nil + } + models := make([]mongo.WriteModel, 0, len(subs)) + for _, sub := range subs { + filter := bson.M{"roomId": sub.RoomID, "u.account": sub.User.Account} + update := bson.M{ + "$set": bson.M{ + "disableNotification": false, + "isSubscribed": sub.IsSubscribed, + "joinedAt": sub.JoinedAt, + }, + "$setOnInsert": bson.M{ + "_id": sub.ID, + "u": sub.User, + "roomId": sub.RoomID, + "siteId": sub.SiteID, + "roomType": sub.RoomType, + "name": sub.Name, + "roles": sub.Roles, + "hasMention": false, + "alert": false, + }, + } + models = append(models, mongo.NewUpdateOneModel(). + SetFilter(filter). + SetUpdate(update). + SetUpsert(true)) + } + opts := options.BulkWrite().SetOrdered(false) + if _, err := s.subscriptions.BulkWrite(ctx, models, opts); err != nil { + return fmt.Errorf("bulk upsert %d subscriptions: %w", len(subs), err) + } + return nil +} +``` + +- [ ] **Step 4: Run the integration test to verify it passes** + +Run: `make test-integration SERVICE=room-worker` +Expected: PASS for `TestMongoStore_BulkUpsertSubscriptions_Integration` (all three subtests). + +- [ ] **Step 5: Run lint** + +Run: `make lint` +Expected: PASS — no formatting or vet errors. + +- [ ] **Step 6: Commit** + +```bash +git add room-worker/store.go room-worker/store_mongo.go room-worker/mock_store_test.go room-worker/integration_test.go +git commit -m "feat(room-worker): add BulkUpsertSubscriptions store method + +Adds a botDM-only upsert path: on (roomId, u.account) collision, refreshes +DisableNotification → false, IsSubscribed, and JoinedAt while preserving +the existing document's _id and runtime state (LastSeenAt, HasMention, +ThreadUnread, Alert). Existing BulkCreateSubscriptions is unchanged — +channel/DM/add-member paths keep their safe insert-only contract." +``` + +--- + +## Task 4: Wire `processCreateRoom` botDM branch to upsert + re-read canonical subs + +**Files:** +- Modify: `room-worker/handler.go` (the botDM branch in `processCreateRoom`, around line 1158-1166) +- Modify: `room-worker/handler_test.go` (`TestProcessCreateRoom_BotDM_HasIsSubscribed`, around line 2073) + +- [ ] **Step 1: Update the unit test to assert the new call shape (Red)** + +Edit `room-worker/handler_test.go`. Replace the body of `TestProcessCreateRoom_BotDM_HasIsSubscribed` (lines 2073-2114) with: + +```go +func TestProcessCreateRoom_BotDM_HasIsSubscribed(t *testing.T) { + h, mockStore, getPublished := newCreateRoomTestHandler(t) + ctx := natsutil.WithRequestID(context.Background(), testRequestID) + + requester := &model.User{ID: "u_alice", Account: "alice", EngName: "Alice A", ChineseName: "艾麗斯", SiteID: "site-A"} + bot := &model.User{ID: "u_bot", Account: "helper.bot", SiteID: "site-A"} + + mockStore.EXPECT().GetUser(gomock.Any(), "alice").Return(requester, nil) + mockStore.EXPECT().GetUser(gomock.Any(), "helper.bot").Return(bot, nil) + mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) + + var capturedSubs []*model.Subscription + mockStore.EXPECT().BulkUpsertSubscriptions(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, subs []*model.Subscription) error { + capturedSubs = subs + return nil + }) + + // After upsert, handler re-reads canonical sub pair via FindDMSubscription. + // Return the same in-memory subs (no dup-key collision in this happy path). + mockStore.EXPECT().FindDMSubscription(gomock.Any(), "alice", "helper.bot"). + DoAndReturn(func(_ context.Context, _, _ string) (*model.Subscription, error) { + return capturedSubs[0], nil + }) + mockStore.EXPECT().FindDMSubscription(gomock.Any(), "helper.bot", "alice"). + DoAndReturn(func(_ context.Context, _, _ string) (*model.Subscription, error) { + return capturedSubs[1], nil + }) + + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-bot-1").Return(nil) + + body := makeCreateRoomBody(t, &model.CreateRoomRequest{ + RoomID: "room-bot-1", RequesterAccount: "alice", + Users: []string{"helper.bot"}, + Timestamp: time.Now().UnixMilli(), + }) + require.NoError(t, h.processCreateRoom(ctx, body)) + + require.Len(t, capturedSubs, 2) + + // human side (alice): Name = bot's account, IsSubscribed = true + humanSub := capturedSubs[0] + assert.Equal(t, "u_alice", humanSub.User.ID) + assert.Equal(t, bot.Account, humanSub.Name) + assert.True(t, humanSub.IsSubscribed) + + // bot side: Name = requester's account, IsSubscribed = false + botSub := capturedSubs[1] + assert.Equal(t, "u_bot", botSub.User.ID) + assert.Equal(t, requester.Account, botSub.Name) + assert.False(t, botSub.IsSubscribed) + + assert.Empty(t, messagesCanonical(getPublished(), "site-A"), "botDM must emit no sys-messages") +} +``` + +- [ ] **Step 2: Run the test to verify it fails** + +Run: `make test SERVICE=room-worker` +Expected: FAIL — handler still calls `BulkCreateSubscriptions`, but the mock now expects `BulkUpsertSubscriptions` + two `FindDMSubscription` calls. gomock reports unexpected `BulkCreateSubscriptions` call. + +- [ ] **Step 3: Wire the handler** + +Edit `room-worker/handler.go`. Replace the botDM/DM dispatch block (lines 1155-1171, the `switch roomType` case `model.RoomTypeDM, model.RoomTypeBotDM:`) with: + +```go +switch roomType { +case model.RoomTypeDM, model.RoomTypeBotDM: + var subs []*model.Subscription + if roomType == model.RoomTypeBotDM { + subs = buildBotDMSubs(requester, counterpart, room, acceptedAt) + if err := h.store.BulkUpsertSubscriptions(ctx, subs); err != nil { + return fmt.Errorf("bulk upsert subs: %w", err) + } + // Upsert may have hit an existing row whose _id/JoinedAt differ + // from the in-memory pair. Re-read so finishCreateRoom's + // subscription.update / MemberAddEvent fan-out carries persisted + // values. Mirrors the sync DM path. + requesterSub, err := h.store.FindDMSubscription(ctx, requester.Account, counterpart.Account) + if err != nil { + return fmt.Errorf("find requester sub after upsert: %w", err) + } + counterpartSub, err := h.store.FindDMSubscription(ctx, counterpart.Account, requester.Account) + if err != nil { + return fmt.Errorf("find counterpart sub after upsert: %w", err) + } + subs = []*model.Subscription{requesterSub, counterpartSub} + } else { + subs = buildDMSubs(requester, counterpart, room, acceptedAt) + if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + return fmt.Errorf("bulk create subs: %w", err) + } + } + return h.finishCreateRoom(ctx, &req, room, requester, []model.User{*requester, *counterpart}, subs, requestID, now) +case model.RoomTypeChannel: + return h.processCreateRoomChannel(ctx, &req, room, requester, requestID, acceptedAt, now) +default: + return newPermanent("unknown room type %q", roomType) +} +``` + +- [ ] **Step 4: Run the test to verify it passes** + +Run: `make test SERVICE=room-worker` +Expected: PASS — `TestProcessCreateRoom_BotDM_HasIsSubscribed` and all sibling tests pass. In particular `TestProcessCreateRoom_DM_BuildsTwoSubs` MUST still pass (regression guard that regular DMs stay on `BulkCreateSubscriptions`). + +- [ ] **Step 5: Commit** + +```bash +git add room-worker/handler.go room-worker/handler_test.go +git commit -m "feat(room-worker): use BulkUpsertSubscriptions on async botDM path + +processCreateRoom's botDM branch now upserts subscriptions so muted/ +inactive botDM rooms are reactivated when the user re-opens them, and +re-reads the canonical sub pair via FindDMSubscription so downstream +events carry persisted _id/JoinedAt. Regular-DM branch is unchanged." +``` + +--- + +## Task 5: Wire `processSyncCreateDM` botDM branch to upsert + +**Files:** +- Modify: `room-worker/handler.go` (the sync DM bulk-create at lines 1561-1570) +- Modify: `room-worker/handler_test.go` — `TestHandleSyncCreateDM_BotDM_RequesterSubIsSubscribedTrue` at line 2771 (botDM branch — change expectation). Do NOT touch `TestHandleSyncCreateDM_DM_PersistsSubsAndReturnsRequester` (line 2705), `TestHandleSyncCreateDM_ReturnsCanonicalPersistedSub` (line 2807), or `TestHandleSyncCreateDM_BulkCreateSubsTransientError` (line 2985) — all three exercise the regular-DM branch (`RoomType: model.RoomTypeDM`) and serve as the regression guard that the DM branch still uses `BulkCreateSubscriptions`. + +- [ ] **Step 1: Update the botDM unit test to expect `BulkUpsertSubscriptions` (Red)** + +Edit `room-worker/handler_test.go`. In `TestHandleSyncCreateDM_BotDM_RequesterSubIsSubscribedTrue`, change line 2783 from: + +```go +store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) +``` + +to: + +```go +store.EXPECT().BulkUpsertSubscriptions(gomock.Any(), gomock.Any()).Return(nil) +``` + +Leave the rest of the test (FindDMSubscription x2, CreateRoom, FindUsersByAccounts, assertions) unchanged. + +- [ ] **Step 2: Run the test to verify it fails** + +Run: `make test SERVICE=room-worker` +Expected: FAIL — handler still calls `BulkCreateSubscriptions` in the sync botDM branch; mock expects `BulkUpsertSubscriptions`. + +- [ ] **Step 3: Wire the handler** + +Edit `room-worker/handler.go`. Replace the sync-DM bulk-create dispatch (lines 1561-1570) with: + +```go +// validateSyncCreateDMShape already gated this to {dm, botDM}. +var subs []*model.Subscription +if req.RoomType == model.RoomTypeBotDM { + subs = buildBotDMSubs(requester, other, room, acceptedAt) + if err := h.store.BulkUpsertSubscriptions(ctx, subs); err != nil { + return nil, fmt.Errorf("bulk upsert subs: %w", err) + } +} else { + subs = buildDMSubs(requester, other, room, acceptedAt) + if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { + return nil, fmt.Errorf("bulk create subs: %w", err) + } +} +``` + +The existing `FindDMSubscription` re-read at lines 1574-1582 stays — it now serves both branches correctly. The `publishSubscriptionUpdates` and outbox calls below it are unchanged. + +- [ ] **Step 4: Run the test to verify it passes** + +Run: `make test SERVICE=room-worker` +Expected: PASS — sync botDM test, sync regular-DM tests, and all other handler tests pass. + +- [ ] **Step 5: Commit** + +```bash +git add room-worker/handler.go room-worker/handler_test.go +git commit -m "feat(room-worker): use BulkUpsertSubscriptions on sync botDM path + +processSyncCreateDM's botDM branch now upserts subscriptions so muted/ +inactive cross-site botDM rooms are reactivated on re-create. The +existing post-write FindDMSubscription re-read handles the canonical +sub fetch unchanged. Regular-DM branch is unchanged." +``` + +--- + +## Task 6: End-to-end integration test — botDM re-join refresh via `processCreateRoom` + +**Files:** +- Modify: `room-worker/integration_test.go` + +- [ ] **Step 1: Write the failing integration test** + +Append to `room-worker/integration_test.go`: + +```go +// TestProcessCreateRoom_BotDM_ReSubscribe_Integration verifies the +// end-to-end re-join refresh: pre-seed a muted/inactive botDM +// subscription, then run processCreateRoom for the same (room, user) +// pair and assert the canonical row's mute/active state was refreshed +// and runtime fields were preserved. +func TestProcessCreateRoom_BotDM_ReSubscribe_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ + ID: "u_alice", Account: "alice", SiteID: "site-A", + EngName: "Alice", ChineseName: "爱丽丝", + }) + mustInsertUser(t, db, &model.User{ + ID: "u_helper_bot", Account: "helper.bot", SiteID: "site-A", + }) + + roomID := idgen.BuildDMRoomID("u_alice", "u_helper_bot") + oldJoinedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + lastSeen := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC) + + // Pre-seed: muted, inactive, with unread mention + runtime state. + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Type: model.RoomTypeBotDM, SiteID: "site-A", + CreatedBy: "u_alice", CreatedAt: oldJoinedAt, UpdatedAt: oldJoinedAt, + UIDs: []string{"u_alice", "u_helper_bot"}, + Accounts: []string{"alice", "helper.bot"}, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-human-sub", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "helper.bot", + IsSubscribed: false, + DisableNotification: true, + JoinedAt: oldJoinedAt, + LastSeenAt: &lastSeen, + HasMention: true, + Alert: true, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-bot-sub", + User: model.SubscriptionUser{ID: "u_helper_bot", Account: "helper.bot"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "alice", + IsSubscribed: false, + JoinedAt: oldJoinedAt, + }) + + h := newIntegrationHandler(t, store, "site-A") + const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" + ctx = natsutil.WithRequestID(ctx, reqID) + + body, err := json.Marshal(model.CreateRoomRequest{ + RoomID: roomID, + Users: []string{"helper.bot"}, + RequesterID: "u_alice", + RequesterAccount: "alice", + Timestamp: time.Now().UTC().UnixMilli(), + }) + require.NoError(t, err) + require.NoError(t, h.processCreateRoom(ctx, body)) + + // Human sub: identity preserved, re-activation refreshed, runtime preserved. + human, err := store.GetSubscription(ctx, "alice", roomID) + require.NoError(t, err) + assert.Equal(t, "existing-human-sub", human.ID, "_id preserved") + assert.False(t, human.DisableNotification, "DisableNotification cleared") + assert.True(t, human.IsSubscribed, "IsSubscribed refreshed") + assert.False(t, human.JoinedAt.Equal(oldJoinedAt), "JoinedAt updated to acceptedAt") + assert.True(t, human.HasMention, "HasMention preserved") + assert.True(t, human.Alert, "Alert preserved") + require.NotNil(t, human.LastSeenAt) + assert.True(t, human.LastSeenAt.Equal(lastSeen), "LastSeenAt preserved") + + // Bot sub: identity preserved, IsSubscribed stays false (bot-side semantics). + botSub, err := store.GetSubscription(ctx, "helper.bot", roomID) + require.NoError(t, err) + assert.Equal(t, "existing-bot-sub", botSub.ID, "_id preserved") + assert.False(t, botSub.IsSubscribed, "bot side stays IsSubscribed=false") + assert.False(t, botSub.DisableNotification, "bot side DisableNotification cleared (idempotent no-op)") + + // Only 2 subs total — no duplicates created. + subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": roomID}) + require.NoError(t, err) + assert.Equal(t, int64(2), subCount, "no duplicate subs after re-create") +} +``` + +- [ ] **Step 2: Run the integration test to verify it passes** + +Run: `make test-integration SERVICE=room-worker` +Expected: PASS for `TestProcessCreateRoom_BotDM_ReSubscribe_Integration`. + +(This task does not have a separate Red step because the handler change in Task 4 already exists. The test exists to verify end-to-end correctness against real Mongo and lock in the behavior as a regression guard. If it fails, the bug is in Task 4's wiring — return to that task.) + +- [ ] **Step 3: Commit** + +```bash +git add room-worker/integration_test.go +git commit -m "test(room-worker): integration test for botDM re-subscribe refresh + +End-to-end test that processCreateRoom on a muted/inactive botDM +refreshes DisableNotification/IsSubscribed/JoinedAt while preserving +the existing _id and runtime fields (HasMention, Alert, LastSeenAt)." +``` + +--- + +## Task 7: Regression integration test — regular DM does NOT upsert + +**Files:** +- Modify: `room-worker/integration_test.go` + +- [ ] **Step 1: Write the regression test** + +Append to `room-worker/integration_test.go`: + +```go +// TestProcessCreateRoom_DM_DoesNotUpsert_Integration locks in that +// processCreateRoom's regular-DM branch keeps its insert-only contract: +// a pre-existing regular-DM subscription's state (specifically +// DisableNotification = true and an old JoinedAt) must NOT be refreshed +// when processCreateRoom is replayed for the same (room, user) pair. +// This regression guard prevents accidental upsert wiring on the DM +// branch in future edits. +func TestProcessCreateRoom_DM_DoesNotUpsert_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ + ID: "u_alice", Account: "alice", SiteID: "site-A", + EngName: "Alice", ChineseName: "爱丽丝", + }) + mustInsertUser(t, db, &model.User{ + ID: "u_bob", Account: "bob", SiteID: "site-A", + EngName: "Bob", ChineseName: "鲍勃", + }) + + roomID := idgen.BuildDMRoomID("u_alice", "u_bob") + oldJoinedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Type: model.RoomTypeDM, SiteID: "site-A", + CreatedBy: "u_alice", CreatedAt: oldJoinedAt, UpdatedAt: oldJoinedAt, + UIDs: []string{"u_alice", "u_bob"}, + Accounts: []string{"alice", "bob"}, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-alice-sub", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeDM, + Name: "bob", + DisableNotification: true, + JoinedAt: oldJoinedAt, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-bob-sub", + User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeDM, + Name: "alice", + JoinedAt: oldJoinedAt, + }) + + h := newIntegrationHandler(t, store, "site-A") + const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" + ctx = natsutil.WithRequestID(ctx, reqID) + + body, err := json.Marshal(model.CreateRoomRequest{ + RoomID: roomID, + Users: []string{"bob"}, + RequesterID: "u_alice", + RequesterAccount: "alice", + Timestamp: time.Now().UTC().UnixMilli(), + }) + require.NoError(t, err) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got, err := store.GetSubscription(ctx, "alice", roomID) + require.NoError(t, err) + assert.True(t, got.DisableNotification, + "regular-DM path must NOT clear DisableNotification on re-create (insert-only contract)") + assert.True(t, got.JoinedAt.Equal(oldJoinedAt), + "regular-DM path must NOT refresh JoinedAt on re-create (insert-only contract)") +} +``` + +- [ ] **Step 2: Run the integration test to verify it passes** + +Run: `make test-integration SERVICE=room-worker` +Expected: PASS — confirms the regular-DM branch keeps the pre-existing sub untouched, including the muted state and old `JoinedAt`. + +- [ ] **Step 3: Commit** + +```bash +git add room-worker/integration_test.go +git commit -m "test(room-worker): regression test for regular-DM insert-only contract + +Locks in that processCreateRoom's regular-DM branch must NOT refresh a +pre-existing subscription's DisableNotification or JoinedAt — only the +botDM branch upserts. Guards against accidental upsert wiring spreading +to the regular-DM path in future edits." +``` + +--- + +## Task 8: Verification gate — full test + lint + SAST + +**Files:** None. + +- [ ] **Step 1: Regenerate all mocks (defensive)** + +Run: `make generate` +Expected: no diff if Task 2 was committed correctly. + +- [ ] **Step 2: Run full unit-test suite with race detector** + +Run: `make test` +Expected: PASS across all services. + +- [ ] **Step 3: Run integration tests for the touched service** + +Run: `make test-integration SERVICE=room-worker` +Expected: PASS for all room-worker integration tests, including the two new ones. + +- [ ] **Step 4: Run lint** + +Run: `make lint` +Expected: PASS. + +- [ ] **Step 5: Run SAST** + +Run: `make sast` +Expected: PASS (no medium+ findings introduced by this change). + +- [ ] **Step 6: Verify branch coverage on touched files** + +Run: +```bash +go test -tags=integration -race -coverprofile=/tmp/cov.out ./room-worker/... \ + && go tool cover -func=/tmp/cov.out | grep -E "(BulkUpsertSubscriptions|processCreateRoom|processSyncCreateDM)" +``` +Expected: coverage on `BulkUpsertSubscriptions` ≥90%; both modified handler branches exercised. If coverage on the new method is below 90%, add a missing test rather than padding numbers. + +- [ ] **Step 7: Push** + +```bash +git push -u origin claude/fix-room-notification-settings-RqyO1 +``` + +--- + +## Spec Coverage Map + +| Spec section | Implementing task(s) | +|---|---| +| §3.1 `DisableNotification` field on `model.Subscription` | Task 1 | +| §3.2 `newSub` signature unchanged (zero-value default) | Task 4 (implicit — no change made to `newSub`) | +| §3.3 `BulkUpsertSubscriptions` interface + mock | Task 2 | +| §3.3 `BulkUpsertSubscriptions` Mongo impl (`$set` / `$setOnInsert`) | Task 3 | +| §3.4 Async `processCreateRoom` botDM branch switch + re-read | Task 4 | +| §3.4 Sync `processSyncCreateDM` botDM branch switch | Task 5 | +| §3.5 Sync re-read (already present) — async re-read mirrors it | Task 4 | +| Testing §1 Model round-trip | Task 1 | +| Testing §2 Handler unit tests for botDM branches | Tasks 4, 5 | +| Testing §3 Integration test — re-join refresh | Task 6 | +| Testing §3 Integration test — fresh insert | Task 3 (subtest within store integration test) | +| Testing §3 Integration test — regular-DM regression | Task 7 | +| Testing §4 Coverage gate ≥90% | Task 8 | +| Out-of-scope: notification-worker filter, mute toggle endpoint, frontend UI | (deferred — explicitly out of scope per spec) | +| §3a `BulkCreateSubscriptions` → `$setOnInsert` upsert | Addendum Task A1 | +| §3b `FindDMSubscriptionPair` single-query re-read | Addendum Task A2 | + +--- + +## Addendum — Post-Review Changes (commit `d4bb50c`) + +Three review comments from `mliu33` on PR #202 triggered follow-up changes +after the original plan landed. Spec sections §3a and §3b cover the design; +the tasks below cover the implementation. + +### Addendum Task A1: Convert `BulkCreateSubscriptions` to `$setOnInsert` upsert + +**Files:** +- Modify: `room-worker/store_mongo.go` — `BulkCreateSubscriptions` impl +- Modify: `room-worker/handler_test.go` — drop the "non-dup-key" wording in + the `BulkCreateSubsTransientError` comment; the test itself is unchanged + (a transient error still surfaces as "internal error") + +- [x] Replace the `InsertMany + SetOrdered(false) + IsDuplicateKeyError` + swallow with a `BulkWrite` of `mongoutil.UpsertModel(filter, $setOnInsert)` + models, one per sub, filter keyed on `(roomId, u.account)`. Use the same + `options.BulkWrite().SetOrdered(false)` so partial collisions don't halt + the batch. +- [x] No interface change — same signature, same semantics observed by + callers. `BulkUpsertSubscriptions` (botDM refresh) stays untouched. +- [x] Confirm `TestProcessCreateRoom_DM_DoesNotUpsert_Integration` still + passes — `$setOnInsert` is a no-op on collision, so the pre-seeded + `DisableNotification = true` and old `JoinedAt` remain. + +### Addendum Task A2: `FindDMSubscriptionPair` single-query re-read + +**Files:** +- Modify: `room-worker/store.go` — declare on `SubscriptionStore` interface +- Modify: `room-worker/store_mongo.go` — Mongo impl +- Modify: `room-worker/handler.go` — `findDMSubscriptionPair` helper now + wraps the new store method; both call sites pass `room.ID` + + `requester.Account` +- Modify: `room-worker/handler_test.go` — every test that previously mocked + two `FindDMSubscription` calls now mocks one `FindDMSubscriptionPair` call + (returning both subs) +- Regenerate: `room-worker/mock_store_test.go` via `make generate` + +- [x] Add interface method: + ```go + FindDMSubscriptionPair(ctx context.Context, roomID, requesterAccount string) (*model.Subscription, *model.Subscription, error) + ``` +- [x] Mongo impl: one `Find` on + `{"roomId": roomID, "roomType": {"$in": [dm, botDM]}}`, decode into + `[]model.Subscription`, partition by `u.account`. Return + `model.ErrSubscriptionNotFound` if fewer than two results or if + `requesterAccount` isn't among them. +- [x] Update handler helper signature from + `findDMSubscriptionPair(ctx, requester, counterpart *model.User)` to + `findDMSubscriptionPair(ctx, roomID, requesterAccount string)`. +- [x] Update both call sites (async `processCreateRoom` botDM branch, sync + `handleSyncCreateDM` post-bulk re-read). +- [x] `make generate` → `make test SERVICE=room-worker` → `make lint`. + +### Addendum verification + +- [x] `make lint` — 0 issues +- [x] `make test SERVICE=room-worker` — pass +- [x] Commit + push: `d4bb50c refactor(room-worker): single-query DM sub + pair + idempotent-upsert BulkCreate` + +--- + +## Addendum 2 — Post-Review (user-service Owns Re-subscribe) + +A second review round pivoted the design: **user-service** will own the +re-subscribe semantic via `SetUserAppSubscribe` (clears +`DisableNotification`, sets `IsSubscribed = true`) and only calls +`createRoomSync` when no subscription exists. That collapses room-worker's +re-subscribe code path: both DM and botDM share the same idempotent +insert, and the `BulkUpsertSubscriptions` refresh path is no longer +needed. + +### Addendum Task B1: Remove `BulkUpsertSubscriptions` + +**Files:** +- Modify: `room-worker/store.go` — delete interface method + doc +- Modify: `room-worker/store_mongo.go` — delete Mongo impl +- Regenerate: `room-worker/mock_store_test.go` +- Modify: `room-worker/handler.go` — both call sites switch to + `BulkCreateSubscriptions` +- Modify: `room-worker/handler_test.go` — all + `EXPECT().BulkUpsertSubscriptions` lines become `BulkCreateSubscriptions` +- Modify: `room-worker/integration_test.go` — delete + `TestMongoStore_BulkUpsertSubscriptions_Integration`; invert + `TestProcessCreateRoom_BotDM_ReSubscribe_Integration` into + `TestProcessCreateRoom_BotDM_DoesNotUpsert_Integration` + +- [x] Drop the if/else dispatch in `processCreateRoom` DM branch — both + room types call `BulkCreateSubscriptions` followed by the canonical + re-read via `FindDMSubscriptionPair`. +- [x] Same simplification in `handleSyncCreateDM`. +- [x] Remove the botDM-specific `joinedAt` carve-out on the dup-room path: + `joinedAt = existing.CreatedAt` now applies to both DM and botDM (the + previous `if req.RoomType == model.RoomTypeDM` gate is gone). +- [x] Invert the botDM integration test — it now asserts that a + pre-seeded muted/inactive sub is **preserved** on redelivery (mirrors + the existing DM regression guard). + +### Addendum Task B2: Remove `FindDMSubscription` + +**Files:** +- Modify: `room-worker/store.go` — delete interface method + doc +- Modify: `room-worker/store_mongo.go` — delete Mongo impl +- Regenerate: `room-worker/mock_store_test.go` + +- [x] No call sites remain in `room-worker` after Addendum Task A2 + (`FindDMSubscriptionPair` covers the only consumer). Delete the + interface method and the Mongo impl. + +### Addendum Task B3: Tighten `FindDMSubscriptionPair` + +**Files:** +- Modify: `room-worker/store_mongo.go` +- Modify: `room-worker/handler.go` — inline the helper (pure pass-through + after we drop the redundant wrap; callers add their own context) + +- [x] Count check: `len(subs) < 2` → `len(subs) != 2` — fail loudly if + Mongo ever returns more than two subs for a DM/botDM room. +- [x] Stop double-wrapping the driver error. The store returns the raw + error from `Find` / `cursor.All`; callers (handler) already add + operational context (`"re-read DM subs after write: %w"`). Single + wrap, not double. +- [x] Inline `findDMSubscriptionPair` helper — it was a pure pass-through + after the wrap was removed. + +### Addendum 2 verification + +- [x] `make generate SERVICE=room-worker` +- [x] `make lint` — 0 issues +- [x] `make test SERVICE=room-worker` — pass + +### Spec coverage update + +| Spec section | Implementing task(s) | +|---|---| +| Post-Review Revision: `BulkUpsertSubscriptions` removed | Addendum Task B1 | +| Post-Review Revision: `FindDMSubscription` removed | Addendum Task B2 | +| Post-Review Revision: shared `joinedAt = existing.CreatedAt` for DM + botDM | Addendum Task B1 | +| Post-Review Revision: `FindDMSubscriptionPair` != 2 check + single error wrap | Addendum Task B3 | diff --git a/docs/superpowers/specs/2026-05-19-botdm-resubscribe-upsert-design.md b/docs/superpowers/specs/2026-05-19-botdm-resubscribe-upsert-design.md new file mode 100644 index 000000000..83bf1b270 --- /dev/null +++ b/docs/superpowers/specs/2026-05-19-botdm-resubscribe-upsert-design.md @@ -0,0 +1,326 @@ +# BotDM Re-subscribe Upsert + `DisableNotification` Field + +**Date:** 2026-05-19 +**Branch:** `claude/fix-room-notification-settings-RqyO1` +**Scope:** `pkg/model`, `room-worker` + +## Problem + +A user can mute notifications on a botDM room (the user-facing toggle for that +field will land in a follow-up PR). When the same user later tries to "open the +app" again — which on the backend re-issues `chat.user..request.room.create` +or `chat.user..request.room.syncCreateDM` for the bot — the +subscription document already exists, so the current `BulkCreateSubscriptions` +swallows the duplicate-key error and returns silently. The pre-existing sub +keeps its old state, including `DisableNotification = true`, so the user +remains muted and the re-activation is a no-op. + +The desired behaviour is: re-creating a botDM for a user who already has a +subscription **refreshes the mute/active state** on that subscription — +`DisableNotification` is cleared, `IsSubscribed` is set to `true`, and +`JoinedAt` is updated to the new acceptance timestamp. Runtime state +(`LastSeenAt`, `HasMention`, `ThreadUnread`, `Alert`) is preserved. + +`IsSubscribed = true` only appears on the human side of a botDM today +(`inbox-worker/handler.go:241-246` hard-codes `false` everywhere else), so the +"re-join" semantics described above are conceptually botDM-only. All other +membership write paths (channel rooms, regular DMs, add-member) keep their +current safe insert-only behaviour, which is the right semantics for JetStream +redelivery idempotency in those flows. + +## Scope + +In scope: + +- Add `DisableNotification bool` to `model.Subscription`. +- Add a new store method `BulkUpsertSubscriptions` that implements the re-join + refresh semantics described above. +- Convert the existing `BulkCreateSubscriptions` Mongo impl to a + `$setOnInsert`-only upsert (insert-only contract preserved, dup-key error + path eliminated). See §3a. +- Add a new store method `FindDMSubscriptionPair` that returns both subs of a + DM/botDM room in a single Mongo `Find`, replacing the two sequential + `FindDMSubscription` calls previously used to re-read canonical state. See + §3b. +- Wire the two botDM call sites in `room-worker` to use the new method. +- Re-read canonical subs after the async-path upsert so the + `subscription.update` / `MemberAddEvent` fan-out carries persisted state. + +Explicitly out of scope (deferred to follow-up PRs): + +- A user-facing toggle endpoint to set `DisableNotification`. +- `notification-worker` filtering: this PR does **not** change + `notification-worker`. It will still fan notifications to every sub + regardless of `DisableNotification`. End-to-end mute behaviour is wired up + separately. +- Any change to channel-room, regular-DM, or add-member persistence semantics. + +## Design + +### 1. Model — `pkg/model/subscription.go` + +Add a single field next to `Alert`: + +```go +Alert bool `json:"alert" bson:"alert"` +DisableNotification bool `json:"disableNotification" bson:"disableNotification"` +``` + +No `omitempty`. Matches the convention of `Alert` and `HasMention` and ensures +the bson document always carries an explicit value. Existing Mongo records +without the field decode to Go zero value (`false`) — the desired default +("notifications enabled") — so no migration is required. + +Extend the `Subscription` round-trip case in `pkg/model/model_test.go` to set +`DisableNotification: true` so JSON and BSON marshalling are covered. + +### 2. Subscription builder — `room-worker/handler.go` + +`newSub` (`handler.go:1040`) constructs every subscription used by every +membership write path. The zero value of `DisableNotification` already gives +the desired default (`false`) on fresh inserts, so the `newSub` signature does +not change. Adding a parameter that is always `false` would be noise. + +### 3. Store — `room-worker/store.go` + `room-worker/store_mongo.go` + +Add a new interface method alongside `BulkCreateSubscriptions`: + +```go +// BulkUpsertSubscriptions inserts the given subscriptions and, on a +// (roomId, u.account) collision with an existing document, refreshes the +// re-activation fields (DisableNotification → false, IsSubscribed, +// JoinedAt) while preserving the existing document's runtime state +// (LastSeenAt, HasMention, ThreadUnread, Alert) and identity (_id, u). +// Intended for botDM re-creation paths only; channel/DM/add-member paths +// must continue to use BulkCreateSubscriptions for safe redelivery +// idempotency. +BulkUpsertSubscriptions(ctx context.Context, subs []*model.Subscription) error +``` + +Implementation in `store_mongo.go` uses `BulkWrite` with one +`UpdateOneModel` per sub: + +- **Filter:** `{"roomId": sub.RoomID, "u.account": sub.User.Account}`. +- **`$set`:** `disableNotification: false`, `isSubscribed: sub.IsSubscribed`, + `joinedAt: sub.JoinedAt`. These are the re-activation fields. +- **`$setOnInsert`:** `_id, u, roomId, siteId, roomType, name, roles, + hasMention: false, alert: false`. These are immutable identity fields plus + zero-value initialisers for the runtime fields, applied only on first + insert. +- **Upsert:** `true`. Unordered so partial collisions don't halt the batch. + +On the update branch, runtime fields (`LastSeenAt`, `HasMention`, +`ThreadUnread`, `Alert`) are not in `$set`, so Mongo preserves whatever it +currently holds — an unread mention that arrived between mute and +re-subscribe is not erased. On the insert branch, `$setOnInsert` +initialises `hasMention` and `alert` to `false`; `lastSeenAt` and +`threadUnread` remain unset (their bson tags are `omitempty`). + +Both `$set` and `$setOnInsert` payloads are constructed via the shared +`mongoutil.UpsertModel(filter, update)` helper, which bakes in +`SetUpsert(true)` on the `UpdateOneModel` so call sites can't forget it. + +`mock_store_test.go` is regenerated via `make generate`. + +### 3a. Convert `BulkCreateSubscriptions` to `$setOnInsert` upsert + +`BulkCreateSubscriptions` previously used `InsertMany` with +`SetOrdered(false)` and silently swallowed `mongo.IsDuplicateKeyError`. That +worked, but the dup-key error path is implicit and a reviewer flagged it as +brittle when reasoning about JetStream redelivery. + +Switch the Mongo implementation to `BulkWrite` with one `UpdateOneModel` per +sub, using `$setOnInsert: sub` and `SetUpsert(true)` (via +`mongoutil.UpsertModel`). On a `(roomId, u.account)` collision, `$setOnInsert` +is a no-op so the existing document is preserved entirely — the insert-only +contract for channel/DM/add-member paths is unchanged. The benefit is that +the dup-key error path is eliminated; redelivery idempotency is expressed +explicitly in the idiom rather than via error swallowing. + +This matches the pattern already used by `inbox-worker.BulkCreateSubscriptions` +(`inbox-worker/main.go:96-116`). + +The `TestProcessCreateRoom_DM_DoesNotUpsert_Integration` regression test still +locks in that the regular-DM path does not refresh fields on re-create. + +### 3b. `FindDMSubscriptionPair` — single-query re-read + +`FindDMSubscription(ctx, account, targetName)` returns one sub by +`{u.account, name}`. The botDM and DM re-read path needs both subs of a room, +which previously required two sequential `FindDMSubscription` calls. Add: + +```go +// FindDMSubscriptionPair returns both subs of a DM/botDM room in a single +// query. The first return value is the sub owned by requesterAccount, the +// second is the counterpart's. Returns ErrSubscriptionNotFound if the +// room has fewer than two matching subs or if requesterAccount is not +// among them. +FindDMSubscriptionPair(ctx context.Context, roomID, requesterAccount string) (*model.Subscription, *model.Subscription, error) +``` + +Mongo impl: a single `Find` on +`{"roomId": roomID, "roomType": {"$in": [dm, botDM]}}`, then partition the +returned slice by `u.account` (== requesterAccount → requester sub; else → +counterpart sub). DM/botDM rooms always have exactly two subs, so no +projection or limit is needed. + +The handler's `findDMSubscriptionPair` helper now wraps the new store +method (single round-trip) instead of issuing two `FindDMSubscription` +calls. + +### 4. Wire into botDM paths — `room-worker/handler.go` + +Two call sites switch from `BulkCreateSubscriptions` to +`BulkUpsertSubscriptions`: + +1. `processCreateRoom` botDM branch — `handler.go:1163`. The regular-DM + branch immediately above (`buildDMSubs` at `handler.go:1161`) stays on + `BulkCreateSubscriptions`. +2. `processSyncCreateDM` botDM branch — `handler.go:1568`. The regular-DM + path in the same function stays on `BulkCreateSubscriptions`. + +Concretely, both sites become a conditional dispatch: + +```go +if roomType == model.RoomTypeBotDM { + err = h.store.BulkUpsertSubscriptions(ctx, subs) +} else { + err = h.store.BulkCreateSubscriptions(ctx, subs) +} +``` + +All other call sites (`processCreateRoomChannel` at `handler.go:1229`, +add-member flow) are unchanged. + +### 5. Re-read after upsert (async path) + +Both paths re-read the canonical sub pair via the new +`FindDMSubscriptionPair(ctx, room.ID, requester.Account)` (one Mongo +round-trip). + +The sync path (`processSyncCreateDM`) already re-reads after the bulk call +(`handler.go:1610`); just swap the two `FindDMSubscription` calls for the +single `FindDMSubscriptionPair`. + +The async `processCreateRoom` botDM branch previously handed the in-memory +`subs` directly to `finishCreateRoom`. On an upsert that hit an existing row, +the in-memory `sub.ID` and `sub.JoinedAt` may not match the persisted +document. Mirror the sync path: after a successful `BulkUpsertSubscriptions` +in the async botDM branch, call `FindDMSubscriptionPair`, replace the +in-memory subs with the canonical pair, and pass that into +`finishCreateRoom`. This keeps `subscription.update` events, +`MemberAddEvent`s, and reconcile counts operating on persisted state. + +## Testing + +Per CLAUDE.md, all new code goes through the Red-Green-Refactor TDD cycle. + +### Unit tests — `room-worker/handler_test.go` + +- `processCreateRoom` botDM happy path: expect `BulkUpsertSubscriptions` + (not `BulkCreateSubscriptions`) and the two new `FindDMSubscription` + re-reads. Verify the events published downstream carry the re-read sub + values (different `_id` than the in-memory pre-upsert sub). +- `processCreateRoom` regular-DM, `processCreateRoomChannel`, and + add-member paths: regression-guard that they still expect + `BulkCreateSubscriptions` and **not** `BulkUpsertSubscriptions`. +- `processSyncCreateDM` botDM happy path: expect + `BulkUpsertSubscriptions`. Regular-DM branch still expects + `BulkCreateSubscriptions`. + +### Integration tests — `room-worker/integration_test.go` + +- **Re-join refresh:** Pre-seed a botDM subscription with + `DisableNotification = true`, `IsSubscribed = false`, and an old + `JoinedAt`. Also set `HasMention = true`, `Alert = true`, and a + `LastSeenAt`. Run `processCreateRoom` for the same (roomID, account) + pair with a new `acceptedAt`. Assert: + - `_id` unchanged from the pre-seeded row. + - `DisableNotification == false`, `IsSubscribed == true`, + `JoinedAt == new acceptedAt`. + - `HasMention == true`, `Alert == true`, `LastSeenAt` unchanged + (runtime state preserved). +- **Fresh insert:** No pre-seeded row. Run `processCreateRoom` botDM. + Assert sub is created with `DisableNotification == false`, + `IsSubscribed == true` (human side) / `false` (bot side), + `HasMention == false`, `Alert == false`. +- **Regular-DM regression:** Pre-seed a regular-DM sub with + `DisableNotification = true`. Run `processCreateRoom` for the same + pair (regular DM, not botDM). Assert `DisableNotification` is still + `true` — regular-DM path does **not** upsert, so the pre-existing + state is preserved (current behaviour, regression guard). + +### Model tests + +`pkg/model/model_test.go` Subscription round-trip case extended to set +`DisableNotification = true`, verifying both JSON and BSON tag correctness. + +### Coverage + +Target ≥90% on the touched files (`room-worker/handler.go`, +`room-worker/store_mongo.go`) per the project's coverage rules. Cover both +the upsert-as-insert and upsert-as-update branches, the re-read failure +path, and the regular-DM no-upsert regression. + +## Risks & Open Questions + +- **Concurrent mute + re-subscribe race.** If the user mutes + (`DisableNotification = true`) at the same instant they re-open the app + (triggering this upsert), the upsert can overwrite the just-muted value. + Mitigation: the user-facing mute endpoint (out of scope here) lands later + and will be the dominant source of mutes; an accidental clear from a + racing re-creation is recoverable by re-muting. Calling this out so the + follow-up PR's design can address it if needed. + +- **`JoinedAt` semantics drift.** Refreshing `JoinedAt` on every botDM + re-creation means it is now "last re-join time", not "first join time". + No current consumer treats `JoinedAt` as "first join" for botDMs, but + worth flagging in case downstream analytics rely on it. + +## Out of Scope (Follow-Up PRs) + +- User-facing endpoint to toggle `DisableNotification` (probably an HTTP + PATCH on the subscription, or a NATS RPC; routed through `room-worker` + or `auth-service`). +- `notification-worker` filter: skip subs where + `DisableNotification == true` before fan-out. +- Frontend mute toggle UI. +- `docs/client-api.md` update — only required once the client-facing + toggle endpoint lands (this PR exposes no new request/response schema). + +--- + +## Post-Review Revision — Re-subscribe Owned by user-service + +After review (PR #202), the design pivoted: **`user-service` owns the +re-subscribe semantic** (it will clear `DisableNotification` and set +`IsSubscribed = true` via `SetUserAppSubscribe`, and only calls +`createRoomSync` when no subscription exists for the pair). That means +`room-worker`'s job for both DM and botDM is now strictly JetStream +redelivery idempotency, not field refresh. + +Concrete changes vs. §3.3 / §3.4 above: + +- **`BulkUpsertSubscriptions` is removed entirely** (interface, Mongo impl, + mocks). The "re-activation refresh" branch documented in §3.3 no longer + exists in `room-worker`. +- **`FindDMSubscription` is also removed** — it was only used by the + removed re-read paths, and `FindDMSubscriptionPair` (§3b) covers the + remaining cases. +- **Both DM and botDM use `BulkCreateSubscriptions`** (the + `$setOnInsert`-only upsert from §3a). On a JetStream redelivery the + persisted sub is preserved unchanged for both room types. +- **No `joinedAt` carve-out for botDM.** The sync `handleSyncCreateDM` + duplicate-room branch now reuses `existing.CreatedAt` as `joinedAt` for + **both** DM and botDM (drops the `if req.RoomType == model.RoomTypeDM` + gate). +- **`FindDMSubscriptionPair` robustness:** count check tightened from + `< 2` to `!= 2`, and the Mongo impl no longer wraps the driver error + (the handler already adds operational context — single wrap, not + double). + +Risks section update: the "concurrent mute + re-subscribe race" risk is +no longer applicable to `room-worker` — it belongs to whichever flow +calls `SetUserAppSubscribe` and is the responsibility of `user-service`. +The "`JoinedAt` semantics drift" risk also disappears: room-worker no +longer refreshes `JoinedAt` for botDM. diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index 7bbb307ec..acbf340ee 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -455,18 +455,19 @@ func TestSubscriptionJSON(t *testing.T) { hss := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) lsa := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC) s := model.Subscription{ - ID: "s1", - User: model.SubscriptionUser{ID: "u1", Account: "alice"}, - RoomID: "r1", - RoomType: model.RoomTypeChannel, - SiteID: "site-a", - Roles: []model.Role{model.RoleOwner}, - HistorySharedSince: &hss, - JoinedAt: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), - LastSeenAt: &lsa, - HasMention: true, - ThreadUnread: []string{"parent-1", "parent-2"}, - Alert: true, + ID: "s1", + User: model.SubscriptionUser{ID: "u1", Account: "alice"}, + RoomID: "r1", + RoomType: model.RoomTypeChannel, + SiteID: "site-a", + Roles: []model.Role{model.RoleOwner}, + HistorySharedSince: &hss, + JoinedAt: time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC), + LastSeenAt: &lsa, + HasMention: true, + ThreadUnread: []string{"parent-1", "parent-2"}, + Alert: true, + DisableNotification: true, } roundTrip(t, &s, &model.Subscription{}) }) @@ -518,6 +519,10 @@ func TestSubscriptionJSON_ThreadUnreadOmittedAlertAlwaysPresent(t *testing.T) { assert.True(t, hasAlert, "alert must be present in JSON even when false") assert.Equal(t, false, alertVal) + disableVal, hasDisable := raw["disableNotification"] + assert.True(t, hasDisable, "disableNotification must be present in JSON even when false") + assert.Equal(t, false, disableVal) + var dst model.Subscription require.NoError(t, json.Unmarshal(data, &dst)) assert.Nil(t, dst.ThreadUnread, "absent threadUnread must unmarshal to nil") diff --git a/pkg/model/subscription.go b/pkg/model/subscription.go index 4b3ed1c75..787b15277 100644 --- a/pkg/model/subscription.go +++ b/pkg/model/subscription.go @@ -25,20 +25,21 @@ type SubscriptionUser struct { } type Subscription struct { - ID string `json:"id" bson:"_id"` - User SubscriptionUser `json:"u" bson:"u"` - RoomID string `json:"roomId" bson:"roomId"` - SiteID string `json:"siteId" bson:"siteId"` - Roles []Role `json:"roles" bson:"roles"` - Name string `json:"name" bson:"name"` - RoomType RoomType `json:"roomType" bson:"roomType"` - IsSubscribed bool `json:"isSubscribed,omitempty" bson:"isSubscribed,omitempty"` - HistorySharedSince *time.Time `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"` - JoinedAt time.Time `json:"joinedAt" bson:"joinedAt"` - LastSeenAt *time.Time `json:"lastSeenAt,omitempty" bson:"lastSeenAt,omitempty"` - HasMention bool `json:"hasMention" bson:"hasMention"` - ThreadUnread []string `json:"threadUnread,omitempty" bson:"threadUnread,omitempty"` - Alert bool `json:"alert" bson:"alert"` + ID string `json:"id" bson:"_id"` + User SubscriptionUser `json:"u" bson:"u"` + RoomID string `json:"roomId" bson:"roomId"` + SiteID string `json:"siteId" bson:"siteId"` + Roles []Role `json:"roles" bson:"roles"` + Name string `json:"name" bson:"name"` + RoomType RoomType `json:"roomType" bson:"roomType"` + IsSubscribed bool `json:"isSubscribed,omitempty" bson:"isSubscribed,omitempty"` + HistorySharedSince *time.Time `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"` + JoinedAt time.Time `json:"joinedAt" bson:"joinedAt"` + LastSeenAt *time.Time `json:"lastSeenAt,omitempty" bson:"lastSeenAt,omitempty"` + HasMention bool `json:"hasMention" bson:"hasMention"` + ThreadUnread []string `json:"threadUnread,omitempty" bson:"threadUnread,omitempty"` + Alert bool `json:"alert" bson:"alert"` + DisableNotification bool `json:"disableNotification" bson:"disableNotification"` } // SubscriptionHRInfo carries the counterpart's HR-directory record on a diff --git a/room-worker/handler.go b/room-worker/handler.go index 3eea7f97a..7b5d19e21 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -1021,6 +1021,7 @@ func resolveRoomName(req *model.CreateRoomRequest, roomType model.RoomType) stri } // buildDMSubs returns the two DM subs (each names the counterpart, IsSubscribed=false). + func buildDMSubs(requester, other *model.User, room *model.Room, acceptedAt time.Time) []*model.Subscription { return []*model.Subscription{ newSub(idgen.GenerateUUIDv7(), requester, room, nil, other.Account, false, acceptedAt), @@ -1163,6 +1164,14 @@ func (h *Handler) processCreateRoom(ctx context.Context, data []byte) (err error if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { return fmt.Errorf("bulk create subs: %w", err) } + // Re-read canonical subs: BulkCreate is a $setOnInsert upsert, so on a + // JetStream redelivery the in-memory _id/JoinedAt may not match the + // persisted document. Hand the canonical pair to finishCreateRoom. + requesterSub, counterpartSub, err := h.store.FindDMSubscriptionPair(ctx, room.ID, requester.Account) + if err != nil { + return fmt.Errorf("re-read DM subs after write: %w", err) + } + subs = []*model.Subscription{requesterSub, counterpartSub} return h.finishCreateRoom(ctx, &req, room, requester, []model.User{*requester, *counterpart}, subs, requestID, now) case model.RoomTypeChannel: return h.processCreateRoomChannel(ctx, &req, room, requester, requestID, acceptedAt, now) @@ -1508,7 +1517,13 @@ func (h *Handler) handleSyncCreateDM(ctx context.Context, data []byte) (*model.S return nil, errUserLookupFailed } - acceptedAt := time.Now().UTC() + // roomCreatedAt drives the room doc and the outbox dedup seed (must stay + // stable across NATS retries). joinedAt drives the subscription's JoinedAt + // field — on a dup-key retry it tracks the room's original creation time + // so JetStream redelivery is idempotent (user-service guards against + // genuine re-subscribe so we never need to refresh JoinedAt here). + roomCreatedAt := time.Now().UTC() + joinedAt := roomCreatedAt roomID := idgen.BuildDMRoomID(requester.ID, other.ID) uids, accounts := model.BuildDMParticipants(requester, other) @@ -1529,8 +1544,8 @@ func (h *Handler) handleSyncCreateDM(ctx context.Context, data []byte) (*model.S AppCount: appCount, UIDs: uids, Accounts: accounts, - CreatedAt: acceptedAt, - UpdatedAt: acceptedAt, + CreatedAt: roomCreatedAt, + UpdatedAt: roomCreatedAt, } if err := h.store.CreateRoom(ctx, room); err != nil { if !mongo.IsDuplicateKeyError(err) { @@ -1554,36 +1569,31 @@ func (h *Handler) handleSyncCreateDM(ctx context.Context, data []byte) (*model.S } // Sync-path duplicate-key: forward-only — no UIDs/Accounts backfill on the existing room. room = existing - acceptedAt = existing.CreatedAt + joinedAt = existing.CreatedAt } - // validateSyncCreateDMShape already gated this to {dm, botDM}. + // validateSyncCreateDMShape already gated this to {dm, botDM}. Both share + // the same idempotent-insert path: BulkCreateSubscriptions does + // $setOnInsert so a JetStream redelivery is a Mongo no-op, and the + // subsequent FindDMSubscriptionPair returns the canonical persisted pair. var subs []*model.Subscription if req.RoomType == model.RoomTypeBotDM { - subs = buildBotDMSubs(requester, other, room, acceptedAt) + subs = buildBotDMSubs(requester, other, room, joinedAt) } else { - subs = buildDMSubs(requester, other, room, acceptedAt) + subs = buildDMSubs(requester, other, room, joinedAt) } - if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil { return nil, fmt.Errorf("bulk create subs: %w", err) } - // Re-read canonical subs: BulkCreateSubscriptions swallows dup-key races, so the - // in-memory subs may carry IDs/JoinedAt that never made it to Mongo. Publish from - // the persisted pair instead. - requesterSub, err := h.store.FindDMSubscription(ctx, requester.Account, other.Account) - if err != nil { - return nil, fmt.Errorf("find requester sub after insert: %w", err) - } - otherSub, err := h.store.FindDMSubscription(ctx, other.Account, requester.Account) + requesterSub, otherSub, err := h.store.FindDMSubscriptionPair(ctx, room.ID, requester.Account) if err != nil { - return nil, fmt.Errorf("find counterpart sub after insert: %w", err) + return nil, fmt.Errorf("re-read DM subs after write: %w", err) } h.publishSubscriptionUpdates(ctx, []*model.Subscription{requesterSub, otherSub}, requestID) // Outbox failure means the remote site won't learn about the room; fail the request. - if err := h.publishSyncDMOutbox(ctx, room, requester, other, acceptedAt); err != nil { + if err := h.publishSyncDMOutbox(ctx, room, requester, other, requesterSub.JoinedAt); err != nil { return nil, fmt.Errorf("publish room_created outbox: %w", err) } @@ -1626,7 +1636,7 @@ func (h *Handler) publishSubscriptionUpdates(ctx context.Context, subs []*model. } } -func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, requester, other *model.User, acceptedAt time.Time) error { +func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, requester, other *model.User, joinedAt time.Time) error { if other.SiteID == "" || other.SiteID == h.siteID { return nil } @@ -1640,7 +1650,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req Accounts: []string{other.Account}, SiteID: room.SiteID, RequesterAccount: requester.Account, - JoinedAt: acceptedAt.UnixMilli(), + JoinedAt: joinedAt.UnixMilli(), Timestamp: now, } pData, err := json.Marshal(memberEvt) @@ -1658,7 +1668,10 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req if err != nil { return fmt.Errorf("marshal outbox envelope: %w", err) } - payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, acceptedAt.UnixMilli()) + // Dedup seed keys on room.CreatedAt (stable across retries and re-subscribes) + // rather than joinedAt — botDM re-subscribes carry a fresh joinedAt that + // would otherwise defeat JetStream dedup on a NATS request retry. + payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, room.CreatedAt.UnixMilli()) return h.publish(ctx, subject.Outbox(room.SiteID, other.SiteID, model.OutboxMemberAdded), eData, diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index 097434d16..3dd9d67d0 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -2015,6 +2015,10 @@ func TestProcessCreateRoom_DM_BuildsTwoSubs(t *testing.T) { capturedSubs = subs return nil }) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), "room-dm-1", "alice"). + DoAndReturn(func(_ context.Context, _, _ string) (*model.Subscription, *model.Subscription, error) { + return capturedSubs[0], capturedSubs[1], nil + }) mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-1").Return(nil) body := makeCreateRoomBody(t, &model.CreateRoomRequest{ @@ -2057,6 +2061,10 @@ func TestProcessCreateRoom_DM_EmitsNoSysMessages(t *testing.T) { mockStore.EXPECT().GetUser(gomock.Any(), "bob").Return(other, nil) mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), "room-dm-1", "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}}, + nil) mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-1").Return(nil) body := makeCreateRoomBody(t, &model.CreateRoomRequest{ @@ -2087,6 +2095,14 @@ func TestProcessCreateRoom_BotDM_HasIsSubscribed(t *testing.T) { capturedSubs = subs return nil }) + + // After bulk-create, handler re-reads canonical sub pair via FindDMSubscriptionPair. + // Return the same in-memory subs (no collision in this happy path). + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice"). + DoAndReturn(func(_ context.Context, _, _ string) (*model.Subscription, *model.Subscription, error) { + return capturedSubs[0], capturedSubs[1], nil + }) + mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-bot-1").Return(nil) body := makeCreateRoomBody(t, &model.CreateRoomRequest{ @@ -2721,20 +2737,21 @@ func TestHandleSyncCreateDM_DM_PersistsSubsAndReturnsRequester(t *testing.T) { captured = subs return nil }) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - ID: "canonical-alice-sub", - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, - RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), - Name: "bob", - RoomType: model.RoomTypeDM, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - ID: "canonical-bob-sub", - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), - Name: "alice", - RoomType: model.RoomTypeDM, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{ + ID: "canonical-alice-sub", + User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, + RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), + Name: "bob", + RoomType: model.RoomTypeDM, + }, + &model.Subscription{ + ID: "canonical-bob-sub", + User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, + RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), + Name: "alice", + RoomType: model.RoomTypeDM, + }, nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2781,12 +2798,13 @@ func TestHandleSyncCreateDM_BotDM_RequesterSubIsSubscribedTrue(t *testing.T) { return nil }) store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "helper.bot").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, IsSubscribed: true, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "helper.bot", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bot", Account: "helper.bot"}, IsSubscribed: false, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{ + User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, IsSubscribed: true, + }, + &model.Subscription{ + User: model.SubscriptionUser{ID: "u-bot", Account: "helper.bot"}, IsSubscribed: false, + }, nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeBotDM, RequesterAccount: "alice", OtherAccount: "helper.bot"} data := marshalReq(t, req) @@ -2802,8 +2820,9 @@ func TestHandleSyncCreateDM_BotDM_RequesterSubIsSubscribedTrue(t *testing.T) { assert.Equal(t, 1, insertedRoom.AppCount) } -// On dup-key race, BulkCreateSubscriptions swallows the error and the in-memory subs -// carry stale state; the handler must return the canonical persisted sub via FindDMSubscription. +// On a (roomId, u.account) collision, BulkCreateSubscriptions upserts with +// $setOnInsert (no field refresh) so the in-memory subs carry stale state; +// the handler must return the canonical persisted sub via FindDMSubscriptionPair. func TestHandleSyncCreateDM_ReturnsCanonicalPersistedSub(t *testing.T) { h, store, _ := newSyncDMTestHandler(t) @@ -2819,11 +2838,12 @@ func TestHandleSyncCreateDM_ReturnsCanonicalPersistedSub(t *testing.T) { Name: "bob", RoomType: model.RoomTypeDM, } - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(existingSub, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - ID: "canonical-bob-sub", User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), Name: "alice", RoomType: model.RoomTypeDM, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + existingSub, + &model.Subscription{ + ID: "canonical-bob-sub", User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, + RoomID: idgen.BuildDMRoomID("u-alice", "u-bob"), Name: "alice", RoomType: model.RoomTypeDM, + }, nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2858,12 +2878,10 @@ func TestHandleSyncCreateDM_PublishesSubscriptionUpdateForBothUsers(t *testing.T store.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return([]model.User{*requester, *other}, nil) store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}}, + nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2886,12 +2904,10 @@ func TestHandleSyncCreateDM_CrossSite_EmitsOutbox(t *testing.T) { store.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return([]model.User{*requester, *other}, nil) store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}}, + nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2931,12 +2947,10 @@ func TestHandleSyncCreateDM_SameSite_NoOutbox(t *testing.T) { store.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return([]model.User{*requester, *other}, nil) store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}}, + nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2967,12 +2981,10 @@ func TestHandleSyncCreateDM_OutboxPublishFails_FailsRequest(t *testing.T) { }, nil) store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}}, + nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -2981,7 +2993,7 @@ func TestHandleSyncCreateDM_OutboxPublishFails_FailsRequest(t *testing.T) { assert.Equal(t, "internal error", sanitizeSyncDMError(err)) } -// BulkCreateSubscriptions returning a non-dup-key error must surface as "internal error". +// BulkCreateSubscriptions returning a transient error must surface as "internal error". func TestHandleSyncCreateDM_BulkCreateSubsTransientError(t *testing.T) { h, store, _ := newSyncDMTestHandler(t) @@ -3026,12 +3038,10 @@ func TestHandleSyncCreateDM_IdempotentRecreate_UsesExistingCreatedAt(t *testing. captured = subs return nil }) - store.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, JoinedAt: originalCreatedAt, - }, nil) - store.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice").Return(&model.Subscription{ - User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, JoinedAt: originalCreatedAt, - }, nil) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, JoinedAt: originalCreatedAt}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bob", Account: "bob"}, JoinedAt: originalCreatedAt}, + nil) req := model.SyncCreateDMRequest{RoomType: model.RoomTypeDM, RequesterAccount: "alice", OtherAccount: "bob"} data := marshalReq(t, req) @@ -3045,6 +3055,51 @@ func TestHandleSyncCreateDM_IdempotentRecreate_UsesExistingCreatedAt(t *testing. } } +// On a CreateRoom dup-key for a botDM, the in-memory sub.JoinedAt must +// track existing.CreatedAt — same idempotency rule as regular DM. The +// "re-subscribe refresh" semantic is owned by user-service, not by +// room-worker; here the upsert is purely for JetStream redelivery +// idempotency. +func TestHandleSyncCreateDM_BotDM_Recreate_PreservesExistingCreatedAt(t *testing.T) { + h, store, _ := newSyncDMTestHandler(t) + + requester := &model.User{ID: "u-alice", Account: "alice", SiteID: "site-a"} + bot := &model.User{ID: "u-bot", Account: "helper.bot", SiteID: "site-a"} + store.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return([]model.User{*requester, *bot}, nil) + + originalCreatedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + roomID := idgen.BuildDMRoomID("u-alice", "u-bot") + dupErr := mongo.WriteException{WriteErrors: []mongo.WriteError{{Code: 11000}}} + store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(dupErr) + store.EXPECT().GetRoom(gomock.Any(), gomock.Any()).Return(&model.Room{ + ID: roomID, Type: model.RoomTypeBotDM, SiteID: "site-a", + Name: "", CreatedBy: "u-alice", + CreatedAt: originalCreatedAt, UpdatedAt: originalCreatedAt, + }, nil) + + var captured []*model.Subscription + store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, subs []*model.Subscription) error { + captured = subs + return nil + }) + store.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u-alice", Account: "alice"}, JoinedAt: originalCreatedAt}, + &model.Subscription{User: model.SubscriptionUser{ID: "u-bot", Account: "helper.bot"}, JoinedAt: originalCreatedAt}, + nil) + + req := model.SyncCreateDMRequest{RoomType: model.RoomTypeBotDM, RequesterAccount: "alice", OtherAccount: "helper.bot"} + data := marshalReq(t, req) + _, err := h.handleSyncCreateDM(newRequestCtx(), data) + require.NoError(t, err) + + require.Len(t, captured, 2) + for _, s := range captured { + assert.True(t, s.JoinedAt.Equal(originalCreatedAt), + "botDM sub.JoinedAt must reuse existing.CreatedAt on dup-key (JetStream redelivery idempotency)") + } +} + type inboxCapturedPublish struct { subj string data []byte @@ -3088,6 +3143,10 @@ func TestProcessCreateRoom_DM_PublishesLocalInbox(t *testing.T) { mockStore.EXPECT().GetUser(gomock.Any(), "bob").Return(other, nil) mockStore.EXPECT().CreateRoom(gomock.Any(), gomock.Any()).Return(nil) mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), "room-dm-inbox", "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}}, + nil) mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-inbox").Return(nil) ts := time.Now().UnixMilli() @@ -4009,6 +4068,10 @@ func TestProcessCreateRoom_DM_SetsParticipantFields(t *testing.T) { return nil }) mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), "room-dm-fields", "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u_zzz", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u_aaa", Account: "bob"}}, + nil) mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-dm-fields").Return(nil) body := makeCreateRoomBody(t, &model.CreateRoomRequest{ @@ -4044,6 +4107,10 @@ func TestProcessCreateRoom_BotDM_SetsParticipantFields(t *testing.T) { return nil }) mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: "u_zzz", Account: "alice"}}, + &model.Subscription{User: model.SubscriptionUser{ID: "u_aaa", Account: "supportbot.bot"}}, + nil) mockStore.EXPECT().ReconcileMemberCounts(gomock.Any(), "room-botdm-fields").Return(nil) body := makeCreateRoomBody(t, &model.CreateRoomRequest{ @@ -4079,10 +4146,10 @@ func TestHandleSyncCreateDM_SetsParticipantFieldsOnInitialCreate(t *testing.T) { return nil }) mockStore.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) - mockStore.EXPECT().FindDMSubscription(gomock.Any(), "alice", "bob"). - Return(&model.Subscription{User: model.SubscriptionUser{ID: requester.ID, Account: requester.Account}}, nil) - mockStore.EXPECT().FindDMSubscription(gomock.Any(), "bob", "alice"). - Return(&model.Subscription{User: model.SubscriptionUser{ID: other.ID, Account: other.Account}}, nil) + mockStore.EXPECT().FindDMSubscriptionPair(gomock.Any(), gomock.Any(), "alice").Return( + &model.Subscription{User: model.SubscriptionUser{ID: requester.ID, Account: requester.Account}}, + &model.Subscription{User: model.SubscriptionUser{ID: other.ID, Account: other.Account}}, + nil) reqBody, err := json.Marshal(model.SyncCreateDMRequest{ RequesterAccount: "alice", diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index 18488bc31..87ce1a1bc 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -1310,3 +1310,152 @@ func TestIntegration_CreateRoom_FansOutRoomKeyEvent(t *testing.T) { "key fan-out must reach every local-site member", ) } + +// TestProcessCreateRoom_BotDM_DoesNotUpsert_Integration locks in that +// processCreateRoom's botDM branch keeps its insert-only contract on a +// JetStream redelivery: a pre-existing muted, inactive botDM subscription +// must NOT be refreshed (DisableNotification, IsSubscribed, JoinedAt +// untouched). The re-subscribe refresh semantic is owned by user-service. +func TestProcessCreateRoom_BotDM_DoesNotUpsert_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ + ID: "u_alice", Account: "alice", SiteID: "site-A", + EngName: "Alice", ChineseName: "爱丽丝", + }) + mustInsertUser(t, db, &model.User{ + ID: "u_helper_bot", Account: "helper.bot", SiteID: "site-A", + }) + + roomID := idgen.BuildDMRoomID("u_alice", "u_helper_bot") + oldJoinedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Type: model.RoomTypeBotDM, SiteID: "site-A", + CreatedBy: "u_alice", CreatedAt: oldJoinedAt, UpdatedAt: oldJoinedAt, + UIDs: []string{"u_alice", "u_helper_bot"}, + Accounts: []string{"alice", "helper.bot"}, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-human-sub", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "helper.bot", + IsSubscribed: false, + DisableNotification: true, + JoinedAt: oldJoinedAt, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-bot-sub", + User: model.SubscriptionUser{ID: "u_helper_bot", Account: "helper.bot"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeBotDM, + Name: "alice", + IsSubscribed: false, + JoinedAt: oldJoinedAt, + }) + + h := newIntegrationHandler(t, store, "site-A") + const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" + ctx = natsutil.WithRequestID(ctx, reqID) + + body, err := json.Marshal(model.CreateRoomRequest{ + RoomID: roomID, + Users: []string{"helper.bot"}, + RequesterID: "u_alice", + RequesterAccount: "alice", + Timestamp: time.Now().UTC().UnixMilli(), + }) + require.NoError(t, err) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got, err := store.GetSubscription(ctx, "alice", roomID) + require.NoError(t, err) + assert.True(t, got.DisableNotification, + "botDM path must NOT clear DisableNotification on redelivery (insert-only contract)") + assert.False(t, got.IsSubscribed, + "botDM path must NOT refresh IsSubscribed on redelivery (insert-only contract)") + assert.True(t, got.JoinedAt.Equal(oldJoinedAt), + "botDM path must NOT refresh JoinedAt on redelivery (insert-only contract)") + + subCount, err := db.Collection("subscriptions").CountDocuments(ctx, bson.M{"roomId": roomID}) + require.NoError(t, err) + assert.Equal(t, int64(2), subCount, "no duplicate subs after re-delivery") +} + +// TestProcessCreateRoom_DM_DoesNotUpsert_Integration locks in that +// processCreateRoom's regular-DM branch keeps its insert-only contract: +// a pre-existing regular-DM subscription's state (specifically +// DisableNotification = true and an old JoinedAt) must NOT be refreshed +// when processCreateRoom is replayed for the same (room, user) pair. +// This regression guard prevents accidental upsert wiring on the DM +// branch in future edits. +func TestProcessCreateRoom_DM_DoesNotUpsert_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ + ID: "u_alice", Account: "alice", SiteID: "site-A", + EngName: "Alice", ChineseName: "爱丽丝", + }) + mustInsertUser(t, db, &model.User{ + ID: "u_bob", Account: "bob", SiteID: "site-A", + EngName: "Bob", ChineseName: "鲍勃", + }) + + roomID := idgen.BuildDMRoomID("u_alice", "u_bob") + oldJoinedAt := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Type: model.RoomTypeDM, SiteID: "site-A", + CreatedBy: "u_alice", CreatedAt: oldJoinedAt, UpdatedAt: oldJoinedAt, + UIDs: []string{"u_alice", "u_bob"}, + Accounts: []string{"alice", "bob"}, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-alice-sub", + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeDM, + Name: "bob", + DisableNotification: true, + JoinedAt: oldJoinedAt, + }) + mustInsertSub(t, db, &model.Subscription{ + ID: "existing-bob-sub", + User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}, + RoomID: roomID, + SiteID: "site-A", + RoomType: model.RoomTypeDM, + Name: "alice", + JoinedAt: oldJoinedAt, + }) + + h := newIntegrationHandler(t, store, "site-A") + const reqID = "0193abcd-0193-7abc-89ab-0193abcd0193" + ctx = natsutil.WithRequestID(ctx, reqID) + + body, err := json.Marshal(model.CreateRoomRequest{ + RoomID: roomID, + Users: []string{"bob"}, + RequesterID: "u_alice", + RequesterAccount: "alice", + Timestamp: time.Now().UTC().UnixMilli(), + }) + require.NoError(t, err) + require.NoError(t, h.processCreateRoom(ctx, body)) + + got, err := store.GetSubscription(ctx, "alice", roomID) + require.NoError(t, err) + assert.True(t, got.DisableNotification, + "regular-DM path must NOT clear DisableNotification on re-create (insert-only contract)") + assert.True(t, got.JoinedAt.Equal(oldJoinedAt), + "regular-DM path must NOT refresh JoinedAt on re-create (insert-only contract)") +} diff --git a/room-worker/mock_store_test.go b/room-worker/mock_store_test.go index 63d1a490b..9f0454d3e 100644 --- a/room-worker/mock_store_test.go +++ b/room-worker/mock_store_test.go @@ -170,19 +170,20 @@ func (mr *MockSubscriptionStoreMockRecorder) DeleteSubscriptionsByAccounts(ctx, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubscriptionsByAccounts", reflect.TypeOf((*MockSubscriptionStore)(nil).DeleteSubscriptionsByAccounts), ctx, roomID, accounts) } -// FindDMSubscription mocks base method. -func (m *MockSubscriptionStore) FindDMSubscription(ctx context.Context, account, targetName string) (*model.Subscription, error) { +// FindDMSubscriptionPair mocks base method. +func (m *MockSubscriptionStore) FindDMSubscriptionPair(ctx context.Context, roomID, requesterAccount string) (*model.Subscription, *model.Subscription, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindDMSubscription", ctx, account, targetName) + ret := m.ctrl.Call(m, "FindDMSubscriptionPair", ctx, roomID, requesterAccount) ret0, _ := ret[0].(*model.Subscription) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(*model.Subscription) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } -// FindDMSubscription indicates an expected call of FindDMSubscription. -func (mr *MockSubscriptionStoreMockRecorder) FindDMSubscription(ctx, account, targetName any) *gomock.Call { +// FindDMSubscriptionPair indicates an expected call of FindDMSubscriptionPair. +func (mr *MockSubscriptionStoreMockRecorder) FindDMSubscriptionPair(ctx, roomID, requesterAccount any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindDMSubscription", reflect.TypeOf((*MockSubscriptionStore)(nil).FindDMSubscription), ctx, account, targetName) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindDMSubscriptionPair", reflect.TypeOf((*MockSubscriptionStore)(nil).FindDMSubscriptionPair), ctx, roomID, requesterAccount) } // FindUsersByAccounts mocks base method. diff --git a/room-worker/store.go b/room-worker/store.go index 1ea9c6591..a8733b218 100644 --- a/room-worker/store.go +++ b/room-worker/store.go @@ -35,6 +35,11 @@ type OrgMemberStatus struct { type SubscriptionStore interface { // --- existing methods (invite flow) --- CreateSubscription(ctx context.Context, sub *model.Subscription) error + // BulkCreateSubscriptions upserts each sub keyed on (roomId, u.account) + // via $setOnInsert; collisions (e.g. JetStream redelivery) are a Mongo + // no-op so the persisted sub is preserved unchanged. Used by every + // membership write path (channel, DM, botDM, add-member); the + // re-subscribe semantic for botDM is owned by user-service. BulkCreateSubscriptions(ctx context.Context, subs []*model.Subscription) error // ListByRoom returns all subscriptions for roomID across every site. ListByRoom(ctx context.Context, roomID string) ([]model.Subscription, error) @@ -45,8 +50,12 @@ type SubscriptionStore interface { GetRoom(ctx context.Context, roomID string) (*model.Room, error) GetSubscription(ctx context.Context, account, roomID string) (*model.Subscription, error) GetUser(ctx context.Context, account string) (*model.User, error) - // FindDMSubscription returns the requester's dm/botDM sub by Name; ErrSubscriptionNotFound on miss. - FindDMSubscription(ctx context.Context, account, targetName string) (*model.Subscription, error) + // FindDMSubscriptionPair returns both subs of a DM/botDM room in a + // single query. The first return value is the sub owned by + // requesterAccount, the second is the counterpart's. Returns + // ErrSubscriptionNotFound if the room does not have exactly two + // matching subs or if requesterAccount is not among them. + FindDMSubscriptionPair(ctx context.Context, roomID, requesterAccount string) (*model.Subscription, *model.Subscription, error) AddRole(ctx context.Context, account, roomID string, role model.Role) error RemoveRole(ctx context.Context, account, roomID string, role model.Role) error diff --git a/room-worker/store_mongo.go b/room-worker/store_mongo.go index 228d8df9f..ebc0ccc45 100644 --- a/room-worker/store_mongo.go +++ b/room-worker/store_mongo.go @@ -11,6 +11,7 @@ import ( "go.mongodb.org/mongo-driver/v2/mongo/options" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/mongoutil" "github.com/hmchangw/chat/pkg/pipelines" ) @@ -317,19 +318,24 @@ func (s *MongoStore) DeleteRoomMember(ctx context.Context, roomID string, member return nil } +// BulkCreateSubscriptions upserts each sub idempotently, keyed on +// (roomId, u.account). On collision with an existing document (e.g. a +// JetStream redelivery of the same create/add-member event), $setOnInsert +// is a no-op so the persisted sub is preserved unchanged — preserving the +// insert-only contract for channel/DM/add-member paths while avoiding +// the duplicate-key error path entirely. func (s *MongoStore) BulkCreateSubscriptions(ctx context.Context, subs []*model.Subscription) error { if len(subs) == 0 { return nil } - docs := make([]interface{}, len(subs)) - for i, sub := range subs { - docs[i] = sub + models := make([]mongo.WriteModel, 0, len(subs)) + for _, sub := range subs { + filter := bson.M{"roomId": sub.RoomID, "u.account": sub.User.Account} + models = append(models, mongoutil.UpsertModel(filter, bson.M{"$setOnInsert": sub})) } - opts := options.InsertMany().SetOrdered(false) - if _, err := s.subscriptions.InsertMany(ctx, docs, opts); err != nil { - if !mongo.IsDuplicateKeyError(err) { - return fmt.Errorf("bulk create %d subscriptions: %w", len(subs), err) - } + opts := options.BulkWrite().SetOrdered(false) + if _, err := s.subscriptions.BulkWrite(ctx, models, opts); err != nil { + return fmt.Errorf("bulk create %d subscriptions: %w", len(subs), err) } return nil } @@ -430,19 +436,35 @@ func (s *MongoStore) GetSubscriptionAccounts(ctx context.Context, roomID string) return accounts, nil } -// FindDMSubscription returns the requester's dm/botDM sub by Name; ErrSubscriptionNotFound on miss. -func (s *MongoStore) FindDMSubscription(ctx context.Context, account, targetName string) (*model.Subscription, error) { - var sub model.Subscription - err := s.subscriptions.FindOne(ctx, bson.M{ - "u.account": account, - "name": targetName, - "roomType": bson.M{"$in": []model.RoomType{model.RoomTypeDM, model.RoomTypeBotDM}}, - }).Decode(&sub) - if errors.Is(err, mongo.ErrNoDocuments) { - return nil, model.ErrSubscriptionNotFound - } +// FindDMSubscriptionPair returns both subs of a DM/botDM room in a single +// query. The first return value is the sub owned by requesterAccount, the +// second is the counterpart's. +func (s *MongoStore) FindDMSubscriptionPair(ctx context.Context, roomID, requesterAccount string) (*model.Subscription, *model.Subscription, error) { + cursor, err := s.subscriptions.Find(ctx, bson.M{ + "roomId": roomID, + "roomType": bson.M{"$in": []model.RoomType{model.RoomTypeDM, model.RoomTypeBotDM}}, + }) if err != nil { - return nil, fmt.Errorf("find dm subscription: %w", err) + return nil, nil, err } - return &sub, nil + var subs []model.Subscription + if err := cursor.All(ctx, &subs); err != nil { + return nil, nil, err + } + if len(subs) != 2 { + return nil, nil, model.ErrSubscriptionNotFound + } + var requesterSub, counterpartSub *model.Subscription + for i := range subs { + switch subs[i].User.Account { + case requesterAccount: + requesterSub = &subs[i] + default: + counterpartSub = &subs[i] + } + } + if requesterSub == nil || counterpartSub == nil { + return nil, nil, model.ErrSubscriptionNotFound + } + return requesterSub, counterpartSub, nil }