From dc43174284ea93f25da738e4e207394c944679b8 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 7 May 2026 02:09:26 +0000 Subject: [PATCH 1/4] feat(room-worker,inbox-worker): origin-site MV fix per PR #145 spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements docs/superpowers/specs/2026-05-01-federated-room-origin-site-mv-fix-design.md in full. Federated rooms whose owning site adds or removes members now update that site's user-room and spotlight indexes for every affected member (same-site + cross-site), so CCS terms-lookup queries from any site resolve against the origin-site MV. room-worker (Changes 1-3): three additive log-and-continue publishes to chat.inbox.{originSiteID}.member_{added,removed}, wrapping the existing MemberAddEvent / MemberRemoveEvent in OutboxEvent so the wire format matches the federated `aggregate.>` lane that search-sync-worker's parseMemberEvent already decodes. Self-leave collapses to OutboxEvent.Type=member_removed at the wrapper while preserving the inner MemberRemoveEvent.Type=member_left, matching the cross-site OUTBOX convention. add-members skips the publish when actualAccounts is empty; remove-org reuses the existing len(accounts)>0 gate. inbox-worker (Change 4): scope the durable consumer's FilterSubjects to chat.inbox.{siteID}.aggregate.> so the new local-lane events reach search-sync-worker only. Without this, inbox-worker would re-process every cross-site member_added and emit duplicate-key churn against subscriptions room-worker already wrote locally. Tests: - room-worker/handler_test.go: six new unit tests covering the happy path for each handler method, the empty-accounts no-publish guards, and the self-leave wrapper-vs-inner type collapse. Existing publishedMsg gains an msgID field so dedup-ID assertions can compare against outboxDedupID directly. Existing length expectations bumped by one publish where applicable. - room-worker/integration_test.go: two real-Mongo tests assert the local-INBOX OutboxEvent + inner-payload structure end-to-end through processAddMembers and processRemoveMember. - inbox-worker/integration_test.go: NATS-via-testcontainers test publishes one local-lane and one aggregate-lane event, then asserts the consumer's NumPending = 1 — locking in the FilterSubjects scoping so a future regression that drops it surfaces immediately. Forward-only rollout per the spec; no backfill for pre-fix federated rooms. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- inbox-worker/integration_test.go | 59 ++++++ inbox-worker/main.go | 7 +- pkg/subject/subject.go | 10 + room-worker/handler.go | 43 +++++ room-worker/handler_test.go | 311 ++++++++++++++++++++++++++++++- room-worker/integration_test.go | 120 ++++++++++++ 6 files changed, 539 insertions(+), 11 deletions(-) diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index cc9429f12..5decfc4cb 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -9,14 +9,20 @@ import ( "testing" "time" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + natsmod "github.com/testcontainers/testcontainers-go/modules/nats" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "github.com/hmchangw/chat/pkg/model" "github.com/hmchangw/chat/pkg/natsutil" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" "github.com/hmchangw/chat/pkg/testutil" + "github.com/hmchangw/chat/pkg/testutil/testimages" ) func setupMongo(t *testing.T) *mongo.Database { @@ -408,3 +414,56 @@ func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) { assert.Nil(t, bobSub.Roles, "DMs have no roles") assert.False(t, bobSub.IsSubscribed, "DM does not set IsSubscribed=true") } + +// TestInboxWorker_FilterScoping_Integration verifies the consumer filters +// out the local lane: a local-lane publish stays unreachable to inbox-worker. +func TestInboxWorker_FilterScoping_Integration(t *testing.T) { + const siteID = "site-filter" + + ctx := context.Background() + natsContainer, err := natsmod.Run(ctx, testimages.NATS) + require.NoError(t, err) + t.Cleanup(func() { _ = natsContainer.Terminate(ctx) }) + + natsURL, err := natsContainer.ConnectionString(ctx) + require.NoError(t, err) + + nc, err := nats.Connect(natsURL) + require.NoError(t, err) + t.Cleanup(func() { nc.Close() }) + + js, err := jetstream.New(nc) + require.NoError(t, err) + + inboxCfg := stream.Inbox(siteID) + _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: inboxCfg.Name, + Subjects: inboxCfg.Subjects, + }) + require.NoError(t, err) + + cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{ + Durable: "inbox-worker", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubjects: []string{subject.InboxAggregateAll(siteID)}, + }) + require.NoError(t, err) + + _, err = js.Publish(ctx, subject.InboxMemberAdded(siteID), []byte(`{"type":"member_added"}`)) + require.NoError(t, err) + _, err = js.Publish(ctx, subject.InboxMemberAddedAggregate(siteID), []byte(`{"type":"member_added"}`)) + require.NoError(t, err) + + require.Eventually(t, func() bool { + info, err := js.Stream(ctx, inboxCfg.Name) + if err != nil { + return false + } + return info.CachedInfo().State.Msgs >= 2 + }, 2*time.Second, 50*time.Millisecond, "stream must accept both publishes") + + info, err := cons.Info(ctx) + require.NoError(t, err) + assert.EqualValues(t, 1, info.NumPending, + "FilterSubjects must scope inbox-worker to the aggregate.> lane only") +} diff --git a/inbox-worker/main.go b/inbox-worker/main.go index 295437bed..696e1c714 100644 --- a/inbox-worker/main.go +++ b/inbox-worker/main.go @@ -21,6 +21,7 @@ import ( "github.com/hmchangw/chat/pkg/otelutil" "github.com/hmchangw/chat/pkg/shutdown" "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" ) type config struct { @@ -210,9 +211,11 @@ func main() { inboxCfg := stream.Inbox(cfg.SiteID) + // Local lane is reserved for search-sync-worker; scope to aggregate.> only. cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{ - Durable: "inbox-worker", - AckPolicy: jetstream.AckExplicitPolicy, + Durable: "inbox-worker", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubjects: []string{subject.InboxAggregateAll(cfg.SiteID)}, }) if err != nil { slog.Error("create consumer failed", "error", err) diff --git a/pkg/subject/subject.go b/pkg/subject/subject.go index 5646f363d..b095871a1 100644 --- a/pkg/subject/subject.go +++ b/pkg/subject/subject.go @@ -127,6 +127,16 @@ func InboxMemberRemovedAggregate(siteID string) string { return fmt.Sprintf("chat.inbox.%s.aggregate.member_removed", siteID) } +// InboxAggregateAll returns the wildcard pattern matching every federated +// (aggregate-lane) event on a site's INBOX stream: +// `chat.inbox.{siteID}.aggregate.>`. Use with +// jetstream.ConsumerConfig.FilterSubjects to scope a consumer to the +// federated lane only — excluding local-lane publishes that are reserved +// for search-sync-worker. +func InboxAggregateAll(siteID string) string { + return fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID) +} + // InboxMemberEventSubjects returns the subject filters a consumer should use // to receive both local and federated member_added/member_removed events for // the given site. Use with jetstream.ConsumerConfig.FilterSubjects (NATS 2.10+). diff --git a/room-worker/handler.go b/room-worker/handler.go index cc8bcba4f..294eb2714 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -341,6 +341,21 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove slog.Error("member event publish failed", "error", err, "roomID", req.RoomID) } + // Wrapper Type collapses to member_removed even for self-leave so + // search-sync-worker dispatches on one MV op; inner Type is preserved. + inboxOutbox := model.OutboxEvent{ + Type: "member_removed", + SiteID: h.siteID, + DestSiteID: h.siteID, + Payload: memberEvtData, + Timestamp: now.UnixMilli(), + } + inboxData, _ := json.Marshal(inboxOutbox) + inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp) + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) + } + // System message sysMsgUser := model.SysMsgUser{ Account: user.Account, @@ -469,6 +484,19 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR if err := h.publish(ctx, subject.MemberEvent(req.RoomID), memberEvtData, ""); err != nil { slog.Error("member event publish failed", "error", err, "roomID", req.RoomID) } + + inboxOutbox := model.OutboxEvent{ + Type: "member_removed", + SiteID: h.siteID, + DestSiteID: h.siteID, + Payload: memberEvtData, + Timestamp: now.UnixMilli(), + } + inboxData, _ := json.Marshal(inboxOutbox) + inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp) + if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil { + slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID) + } } // System message @@ -741,6 +769,21 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error slog.Error("member add event publish failed", "error", err, "roomID", req.RoomID) } + if len(actualAccounts) > 0 { + inboxOutbox := model.OutboxEvent{ + Type: "member_added", + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: memberAddData, + Timestamp: now.UnixMilli(), + } + inboxData, _ := json.Marshal(inboxOutbox) + inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, outboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil { + slog.Error("local inbox member_added publish failed", "error", err, "roomID", req.RoomID) + } + } + membersAdded := model.MembersAdded{ Individuals: actualAccounts, Orgs: req.Orgs, diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index b83bd5b23..97f71e299 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -22,8 +22,9 @@ import ( ) type publishedMsg struct { - subj string - data []byte + subj string + data []byte + msgID string } func TestHandler_ProcessRoleUpdate_Promote(t *testing.T) { @@ -402,8 +403,8 @@ func TestHandler_ProcessRemoveMember_SelfLeave_IndividualOnly(t *testing.T) { err := h.processRemoveMember(context.Background(), data) require.NoError(t, err) - // Expect: subscription update + member change event + system message = 3 publishes - assert.Len(t, published, 3, "expected 3 publishes: sub update, member event, sys msg") + // Expect: subscription update + member change event + local INBOX + system message = 4 publishes + assert.Len(t, published, 4, "expected 4 publishes: sub update, member event, local INBOX, sys msg") subjSet := make(map[string]bool) for _, p := range published { @@ -412,6 +413,7 @@ func TestHandler_ProcessRemoveMember_SelfLeave_IndividualOnly(t *testing.T) { assert.True(t, subjSet[subject.SubscriptionUpdate(account)], "expected subscription update published") assert.True(t, subjSet[subject.MemberEvent(roomID)], "expected member event published") + assert.True(t, subjSet[subject.InboxMemberRemoved(siteID)], "expected local INBOX member_removed published") for _, p := range published { if p.subj != subject.SubscriptionUpdate(account) { @@ -584,7 +586,7 @@ func TestHandler_ProcessRemoveMember_OwnerRemovesIndividual(t *testing.T) { err := h.processRemoveMember(context.Background(), data) require.NoError(t, err) - assert.Len(t, published, 3, "expected 3 publishes: sub update, member event, sys msg") + assert.Len(t, published, 4, "expected 4 publishes: sub update, member event, local INBOX, sys msg") // Verify the sys msg has type "member_removed" for _, p := range published { @@ -1022,8 +1024,8 @@ func TestHandler_ProcessRemoveMember_OwnerRemovesOrg(t *testing.T) { err := h.processRemoveMember(context.Background(), data) require.NoError(t, err) - // Expect: 2 sub updates (carol, dave) + 1 member event + 1 sys msg = 4 publishes - assert.Len(t, published, 4, "expected 4 publishes: 2 sub updates, member event, sys msg") + // Expect: 2 sub updates (carol, dave) + 1 member event + 1 local INBOX + 1 sys msg = 5 publishes + assert.Len(t, published, 5, "expected 5 publishes: 2 sub updates, member event, local INBOX, sys msg") subjSet := make(map[string]bool) for _, p := range published { @@ -1080,8 +1082,8 @@ func TestHandler_ProcessRemoveMember_CrossSiteOutbox(t *testing.T) { err := h.processRemoveMember(context.Background(), data) require.NoError(t, err) - // Expect: sub update + member event + sys msg + outbox = 4 publishes - assert.Len(t, published, 4, "expected 4 publishes including outbox for federated user") + // Expect: sub update + member event + local INBOX + sys msg + outbox = 5 publishes + assert.Len(t, published, 5, "expected 5 publishes including local INBOX and outbox for federated user") outboxSubj := subject.Outbox(localSite, userSite, "member_removed") subjSet := make(map[string]bool) @@ -2446,3 +2448,294 @@ func TestHandler_ProcessAddMembers_HistorySharedSinceWinsOverTimestamp(t *testin assert.Equal(t, sharedSince, *addEvt.HistorySharedSince, "MemberAddEvent.HistorySharedSince must equal req.History.SharedSince, not req.Timestamp") } + +// findInboxPublish returns the message published on the local INBOX subject, +// or fails the test if none / more than one were captured. +func findInboxPublish(t *testing.T, published []publishedMsg, want string) publishedMsg { + t.Helper() + var matches []publishedMsg + for _, p := range published { + if p.subj == want { + matches = append(matches, p) + } + } + require.Lenf(t, matches, 1, "expected exactly 1 publish to %s, got %d (all: %+v)", want, len(matches), published) + return matches[0] +} + +func TestHandler_processAddMembers_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + const ( + roomID = "r1" + siteID = "site-a" + requester = "alice" + ) + + store.EXPECT().GetRoom(gomock.Any(), roomID).Return(&model.Room{ID: roomID, SiteID: siteID}, nil) + store.EXPECT().ListNewMembers(gomock.Any(), nil, []string{"charlie", "bob", "carol"}, roomID). + Return([]string{"charlie", "bob", "carol"}, nil) + store.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"charlie", "bob", "carol"}).Return([]model.User{ + {ID: "u-charlie", Account: "charlie", SiteID: siteID}, + {ID: "u-bob", Account: "bob", SiteID: "site-b"}, + {ID: "u-carol", Account: "carol", SiteID: "site-c"}, + }, nil) + store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + store.EXPECT().ReconcileMemberCounts(gomock.Any(), roomID).Return(nil) + store.EXPECT().HasOrgRoomMembers(gomock.Any(), roomID).Return(false, nil) + + var published []publishedMsg + h := NewHandler(store, siteID, func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.AddMembersRequest{ + RoomID: roomID, + Users: []string{"charlie", "bob", "carol"}, + RequesterAccount: requester, + Timestamp: 12345, + History: model.HistoryConfig{Mode: model.HistoryModeAll}, + } + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-add-local-inbox") + require.NoError(t, h.processAddMembers(ctx, reqData)) + + got := findInboxPublish(t, published, subject.InboxMemberAdded(siteID)) + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outboxEvt)) + assert.Equal(t, "member_added", outboxEvt.Type) + assert.Equal(t, siteID, outboxEvt.SiteID, "origin site must equal h.siteID") + assert.Equal(t, siteID, outboxEvt.DestSiteID, "self-loop publish: dest must equal origin") + assert.Greater(t, outboxEvt.Timestamp, int64(0), "OutboxEvent.Timestamp must be set") + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_added", inner.Type) + assert.Equal(t, roomID, inner.RoomID) + assert.Equal(t, siteID, inner.SiteID) + assert.ElementsMatch(t, []string{"charlie", "bob", "carol"}, inner.Accounts, + "local INBOX must carry full add set — same-site + cross-site") + + wantMsgID := outboxDedupID(ctx, siteID, fmt.Sprintf("%s:%s:%d", roomID, requester, req.Timestamp)) + assert.Equal(t, wantMsgID, got.msgID, "Nats-Msg-Id must follow outboxDedupID(ctx, siteID, payloadSeed)") +} + +func TestHandler_processAddMembers_NoLocalInboxOnEmptyAccounts(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + store.EXPECT().GetRoom(gomock.Any(), "r1").Return(&model.Room{ID: "r1", SiteID: "site-a"}, nil) + store.EXPECT().ListNewMembers(gomock.Any(), nil, []string{"bob"}, "r1").Return(nil, nil) + + var published []publishedMsg + h := NewHandler(store, "site-a", func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.AddMembersRequest{RoomID: "r1", Users: []string{"bob"}, RequesterAccount: "alice", Timestamp: 1000} + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-empty-accounts") + require.NoError(t, h.processAddMembers(ctx, reqData)) + + for _, p := range published { + assert.NotEqual(t, subject.InboxMemberAdded("site-a"), p.subj, + "no local INBOX publish should occur when actualAccounts is empty") + } +} + +func TestHandler_processRemoveIndividual_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + const ( + roomID = "r1" + account = "bob" + requester = "alice" + siteID = "site-a" + ) + + userResult := &UserWithMembership{ + User: model.User{ID: "u-bob", Account: account, SiteID: siteID}, + HasOrgMembership: false, + } + + store.EXPECT().GetUserWithMembership(gomock.Any(), roomID, account).Return(userResult, nil) + store.EXPECT().DeleteRoomMember(gomock.Any(), roomID, model.RoomMemberIndividual, "u-bob").Return(nil) + store.EXPECT().DeleteSubscription(gomock.Any(), roomID, account).Return(int64(1), nil) + store.EXPECT().ReconcileMemberCounts(gomock.Any(), roomID).Return(nil) + + var published []publishedMsg + h := NewHandler(store, siteID, func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.RemoveMemberRequest{RoomID: roomID, Requester: requester, Account: account, Timestamp: 9999} + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-rm-individual-inbox") + require.NoError(t, h.processRemoveMember(ctx, reqData)) + + got := findInboxPublish(t, published, subject.InboxMemberRemoved(siteID)) + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outboxEvt)) + assert.Equal(t, "member_removed", outboxEvt.Type) + assert.Equal(t, siteID, outboxEvt.SiteID) + assert.Equal(t, siteID, outboxEvt.DestSiteID) + assert.Greater(t, outboxEvt.Timestamp, int64(0)) + + var inner model.MemberRemoveEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_removed", inner.Type, "admin-remove inner type is member_removed") + assert.Equal(t, roomID, inner.RoomID) + assert.Equal(t, []string{account}, inner.Accounts) + + wantMsgID := outboxDedupID(ctx, siteID, fmt.Sprintf("%s:%s:%d", roomID, account, req.Timestamp)) + assert.Equal(t, wantMsgID, got.msgID, "Nats-Msg-Id must follow outboxDedupID(ctx, siteID, payloadSeed)") +} + +// Self-leave: wrapper Type collapses to member_removed while inner Type +// stays member_left — matches the cross-site OUTBOX convention so +// search-sync-worker dispatches on a single MV op. +func TestHandler_processRemoveIndividual_SelfLeavePublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + const ( + roomID = "r1" + account = "alice" + siteID = "site-a" + ) + + userResult := &UserWithMembership{ + User: model.User{ID: "u1", Account: account, SiteID: siteID}, + HasOrgMembership: false, + } + + store.EXPECT().GetUserWithMembership(gomock.Any(), roomID, account).Return(userResult, nil) + store.EXPECT().DeleteRoomMember(gomock.Any(), roomID, model.RoomMemberIndividual, "u1").Return(nil) + store.EXPECT().DeleteSubscription(gomock.Any(), roomID, account).Return(int64(1), nil) + store.EXPECT().ReconcileMemberCounts(gomock.Any(), roomID).Return(nil) + + var published []publishedMsg + h := NewHandler(store, siteID, func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.RemoveMemberRequest{RoomID: roomID, Requester: account, Account: account, Timestamp: 1000} + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-rm-self-leave-inbox") + require.NoError(t, h.processRemoveMember(ctx, reqData)) + + got := findInboxPublish(t, published, subject.InboxMemberRemoved(siteID)) + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outboxEvt)) + assert.Equal(t, "member_removed", outboxEvt.Type, + "OutboxEvent wrapper Type must be member_removed even for self-leave") + + var inner model.MemberRemoveEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_left", inner.Type, + "inner MemberRemoveEvent.Type is preserved as member_left for self-leave") +} + +func TestHandler_processRemoveOrg_PublishesLocalInbox(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + const ( + roomID = "r1" + orgID = "org-1" + requester = "alice" + siteID = "site-a" + ) + + orgMembers := []OrgMemberStatus{ + {Account: "carol", SiteID: siteID, SectName: "Eng", HasIndividualMembership: false}, + {Account: "dave", SiteID: "site-b", SectName: "Eng", HasIndividualMembership: false}, + } + + store.EXPECT().GetOrgMembersWithIndividualStatus(gomock.Any(), roomID, orgID).Return(orgMembers, nil) + store.EXPECT(). + DeleteSubscriptionsByAccounts(gomock.Any(), roomID, gomock.InAnyOrder([]string{"carol", "dave"})). + Return(int64(2), nil) + store.EXPECT().DeleteRoomMember(gomock.Any(), roomID, model.RoomMemberOrg, orgID).Return(nil) + store.EXPECT().ReconcileMemberCounts(gomock.Any(), roomID).Return(nil) + + var published []publishedMsg + h := NewHandler(store, siteID, func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.RemoveMemberRequest{RoomID: roomID, Requester: requester, OrgID: orgID, Timestamp: 4242} + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-rm-org-inbox") + require.NoError(t, h.processRemoveMember(ctx, reqData)) + + got := findInboxPublish(t, published, subject.InboxMemberRemoved(siteID)) + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(got.data, &outboxEvt)) + assert.Equal(t, "member_removed", outboxEvt.Type) + assert.Equal(t, siteID, outboxEvt.SiteID) + assert.Equal(t, siteID, outboxEvt.DestSiteID) + + var inner model.MemberRemoveEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_removed", inner.Type) + assert.Equal(t, roomID, inner.RoomID) + assert.Equal(t, orgID, inner.OrgID) + assert.ElementsMatch(t, []string{"carol", "dave"}, inner.Accounts, + "local INBOX must carry every removed account regardless of site") + + wantMsgID := outboxDedupID(ctx, siteID, fmt.Sprintf("%s:%s:%d", roomID, orgID, req.Timestamp)) + assert.Equal(t, wantMsgID, got.msgID, "Nats-Msg-Id must follow outboxDedupID(ctx, siteID, payloadSeed)") +} + +func TestHandler_processRemoveOrg_NoLocalInboxOnZeroAccounts(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + const ( + roomID = "r1" + orgID = "org-1" + siteID = "site-a" + ) + + orgMembers := []OrgMemberStatus{ + {Account: "carol", SiteID: siteID, SectName: "Eng", HasIndividualMembership: true}, + } + + store.EXPECT().GetOrgMembersWithIndividualStatus(gomock.Any(), roomID, orgID).Return(orgMembers, nil) + store.EXPECT().DeleteRoomMember(gomock.Any(), roomID, model.RoomMemberOrg, orgID).Return(nil) + store.EXPECT().ReconcileMemberCounts(gomock.Any(), roomID).Return(nil) + + var published []publishedMsg + h := NewHandler(store, siteID, func(_ context.Context, subj string, data []byte, msgID string) error { + published = append(published, publishedMsg{subj: subj, data: data, msgID: msgID}) + return nil + }) + + req := model.RemoveMemberRequest{RoomID: roomID, Requester: "alice", OrgID: orgID, Timestamp: 1000} + reqData, _ := json.Marshal(req) + + ctx := natsutil.WithRequestID(context.Background(), "req-rm-org-empty") + require.NoError(t, h.processRemoveMember(ctx, reqData)) + + for _, p := range published { + assert.NotEqual(t, subject.InboxMemberRemoved(siteID), p.subj, + "no local INBOX publish when no accounts are actually removed") + } +} diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index e8d44e966..660972b3d 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -851,3 +851,123 @@ func TestProcessCreateRoomIdempotentRedelivery(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(2), subCount, "redelivery must not create duplicate subs") } + +func TestProcessAddMembers_PublishesLocalInbox_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ID: "u_alice", Account: "alice", SiteID: "site-A", + EngName: "Alice", ChineseName: "爱丽丝"}) + mustInsertUser(t, db, &model.User{ID: "u_charlie", Account: "charlie", SiteID: "site-A", + EngName: "Charlie", ChineseName: "查理"}) + mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", SiteID: "site-B", + EngName: "Bob", ChineseName: "鲍勃"}) + + roomID := idgen.GenerateID() + const roomName = "federated-room" + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Name: roomName, Type: model.RoomTypeChannel, + SiteID: "site-A", CreatedBy: "u_alice", + CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), + }) + mustInsertSub(t, db, &model.Subscription{ + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, SiteID: "site-A", Name: roomName, RoomType: model.RoomTypeChannel, + Roles: []model.Role{model.RoleOwner}, + JoinedAt: time.Now().UTC(), + }) + + cap := &publishCapture{} + h := NewHandler(store, "site-A", cap.fn()) + const reqID = "0193abcd-0193-7abc-89ab-aaaa00000001" + ctx = natsutil.WithRequestID(ctx, reqID) + + now := time.Now().UTC().UnixMilli() + body, err := json.Marshal(model.AddMembersRequest{ + RoomID: roomID, + Users: []string{"charlie", "bob"}, + RequesterID: "u_alice", + RequesterAccount: "alice", + History: model.HistoryConfig{Mode: model.HistoryModeAll}, + Timestamp: now, + }) + require.NoError(t, err) + require.NoError(t, h.processAddMembers(ctx, body)) + + pubs := cap.outboxOnPrefix(subject.InboxMemberAdded("site-A")) + require.Len(t, pubs, 1, "exactly one local INBOX member_added publish per add-members call") + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(pubs[0].data, &outboxEvt)) + assert.Equal(t, "member_added", outboxEvt.Type) + assert.Equal(t, "site-A", outboxEvt.SiteID) + assert.Equal(t, "site-A", outboxEvt.DestSiteID, "self-loop: dest must equal origin") + + var inner model.MemberAddEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_added", inner.Type) + assert.Equal(t, roomID, inner.RoomID) + assert.Equal(t, "site-A", inner.SiteID) + assert.ElementsMatch(t, []string{"charlie", "bob"}, inner.Accounts, + "local INBOX must carry full add set — same-site (charlie) + remote (bob)") + assert.Equal(t, reqID+":site-A", pubs[0].msgID, + "Nats-Msg-Id must be outboxDedupID(ctx, originSite, payloadSeed) so JetStream dedups self-loop replays") +} + +func TestProcessRemoveIndividual_PublishesLocalInbox_Integration(t *testing.T) { + ctx := context.Background() + db := setupMongo(t) + store := NewMongoStore(db) + + mustInsertUser(t, db, &model.User{ID: "u_alice", Account: "alice", SiteID: "site-A"}) + mustInsertUser(t, db, &model.User{ID: "u_bob", Account: "bob", SiteID: "site-B", + EngName: "Bob", ChineseName: "鲍勃"}) + + roomID := idgen.GenerateID() + mustInsertRoom(t, db, &model.Room{ + ID: roomID, Name: "fed-room", Type: model.RoomTypeChannel, SiteID: "site-A", + CreatedBy: "u_alice", CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), + }) + mustInsertSub(t, db, &model.Subscription{ + ID: idgen.GenerateUUIDv7(), User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}, + RoomID: roomID, SiteID: "site-A", Name: "fed-room", RoomType: model.RoomTypeChannel, + Roles: []model.Role{model.RoleMember}, JoinedAt: time.Now().UTC(), + }) + _, err := db.Collection("room_members").InsertOne(ctx, model.RoomMember{ + ID: idgen.GenerateUUIDv7(), RoomID: roomID, Ts: time.Now().UTC(), + Member: model.RoomMemberEntry{ID: "u_bob", Type: model.RoomMemberIndividual, Account: "bob"}, + }) + require.NoError(t, err) + + cap := &publishCapture{} + h := NewHandler(store, "site-A", cap.fn()) + const reqID = "0193abcd-0193-7abc-89ab-aaaa00000002" + ctx = natsutil.WithRequestID(ctx, reqID) + + body, err := json.Marshal(model.RemoveMemberRequest{ + RoomID: roomID, + Requester: "alice", + Account: "bob", + Timestamp: time.Now().UTC().UnixMilli(), + }) + require.NoError(t, err) + require.NoError(t, h.processRemoveMember(ctx, body)) + + pubs := cap.outboxOnPrefix(subject.InboxMemberRemoved("site-A")) + require.Len(t, pubs, 1) + + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(pubs[0].data, &outboxEvt)) + assert.Equal(t, "member_removed", outboxEvt.Type) + assert.Equal(t, "site-A", outboxEvt.SiteID) + assert.Equal(t, "site-A", outboxEvt.DestSiteID) + + var inner model.MemberRemoveEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &inner)) + assert.Equal(t, "member_removed", inner.Type, "admin-remove: inner type is member_removed") + assert.Equal(t, roomID, inner.RoomID) + assert.Equal(t, []string{"bob"}, inner.Accounts) + assert.Equal(t, reqID+":site-A", pubs[0].msgID) +} From 31febfd8b3a11f18326937e6e1cdfbd685496e35 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 7 May 2026 06:17:34 +0000 Subject: [PATCH 2/4] test(inbox-worker): extract setupNATS helper per CodeRabbit nitpick Aligns the new NATS-testcontainer test with the CLAUDE.md convention "Write setup(t *testing.T) helpers that start a container, register t.Cleanup, and return a connected client". No behavior change. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- inbox-worker/integration_test.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index 5decfc4cb..4c24f0d04 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -415,28 +415,38 @@ func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) { assert.False(t, bobSub.IsSubscribed, "DM does not set IsSubscribed=true") } -// TestInboxWorker_FilterScoping_Integration verifies the consumer filters -// out the local lane: a local-lane publish stays unreachable to inbox-worker. -func TestInboxWorker_FilterScoping_Integration(t *testing.T) { - const siteID = "site-filter" - +// setupNATS starts a NATS container with JetStream enabled and returns a +// JetStream client tied to the test's lifetime. +func setupNATS(t *testing.T) (context.Context, jetstream.JetStream) { + t.Helper() ctx := context.Background() - natsContainer, err := natsmod.Run(ctx, testimages.NATS) + + c, err := natsmod.Run(ctx, testimages.NATS) require.NoError(t, err) - t.Cleanup(func() { _ = natsContainer.Terminate(ctx) }) + t.Cleanup(func() { _ = c.Terminate(ctx) }) - natsURL, err := natsContainer.ConnectionString(ctx) + url, err := c.ConnectionString(ctx) require.NoError(t, err) - nc, err := nats.Connect(natsURL) + nc, err := nats.Connect(url) require.NoError(t, err) t.Cleanup(func() { nc.Close() }) js, err := jetstream.New(nc) require.NoError(t, err) + return ctx, js +} + +// TestInboxWorker_FilterScoping_Integration verifies the consumer filters +// out the local lane: a local-lane publish stays unreachable to inbox-worker. +func TestInboxWorker_FilterScoping_Integration(t *testing.T) { + const siteID = "site-filter" + + ctx, js := setupNATS(t) + inboxCfg := stream.Inbox(siteID) - _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: inboxCfg.Name, Subjects: inboxCfg.Subjects, }) From e0c0a31cb5db19c27b40dc4f4f02669fd6959230 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 7 May 2026 09:09:35 +0000 Subject: [PATCH 3/4] feat(searchengine): env-gated TLS skip verify for ES connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in tlsSkipVerify bool to searchengine.New, plumbed from each service's config: - search-service: SEARCH_TLS_SKIP_VERIFY (default false) - search-sync-worker: SEARCH_TLS_SKIP_VERIFY (default false) Default-off keeps prod safe; ops opts in per environment for self-signed/internal ES clusters. When false, the factory uses the standard ES client transport — same behavior as before this PR. When true, clones http.DefaultTransport (preserving ProxyFromEnvironment, dial/TLS-handshake timeouts, HTTP/2, idle-conn tuning) and overrides only TLSClientConfig with InsecureSkipVerify=true and MinVersion=TLS 1.2, guarding the type assertion on http.DefaultTransport so we error out cleanly if a middleware (e.g. OTel) has replaced it. Also enables gosec G402 narrowly in .golangci.yml so the //nolint:gosec annotation in pkg/oidc and pkg/searchengine actually suppresses a real rule, and any future unannotated InsecureSkipVerify is rejected at lint time. Includes a goimports-only struct alignment tweak in room-worker/integration_test.go picked up while running make fmt — no behavior change. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- .golangci.yml | 8 +++++++ pkg/searchengine/factory.go | 23 +++++++++++++++++--- room-worker/integration_test.go | 14 ++++++------ search-service/integration_test.go | 4 ++-- search-service/main.go | 7 +++--- search-sync-worker/inbox_integration_test.go | 10 ++++----- search-sync-worker/integration_test.go | 4 ++-- search-sync-worker/main.go | 19 ++++++++-------- 8 files changed, 58 insertions(+), 31 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index eecc8847b..0ca4c5742 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,6 +9,7 @@ linters: - nilerr - bodyclose - exhaustive + - gosec settings: exhaustive: default-signifies-exhaustive: true @@ -16,6 +17,13 @@ linters: enabled-tags: - diagnostic - performance + gosec: + # Narrowly scoped to G402 (TLS InsecureSkipVerify) so the existing + # //nolint:gosec annotations in pkg/oidc and pkg/searchengine are + # actually suppressing a real rule, and any new unannotated + # InsecureSkipVerify is rejected at lint time. + includes: + - G402 exclusions: presets: - std-error-handling diff --git a/pkg/searchengine/factory.go b/pkg/searchengine/factory.go index ba1620ea0..aa6000906 100644 --- a/pkg/searchengine/factory.go +++ b/pkg/searchengine/factory.go @@ -2,19 +2,36 @@ package searchengine import ( "context" + "crypto/tls" "fmt" + "net/http" "time" "github.com/elastic/go-elasticsearch/v8" ) // New creates a SearchEngine for the given backend ("elasticsearch" or "opensearch"). -// It verifies connectivity via Ping before returning. -func New(ctx context.Context, backend, url string) (SearchEngine, error) { +// It verifies connectivity via Ping before returning. When tlsSkipVerify is +// true, server certificate verification is disabled — intended for +// self-signed/internal ES clusters; MUST stay false in production. +func New(ctx context.Context, backend, url string, tlsSkipVerify bool) (SearchEngine, error) { var transport Transporter switch backend { case "elasticsearch": - client, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{url}}) + esCfg := elasticsearch.Config{Addresses: []string{url}} + if tlsSkipVerify { + dt, ok := http.DefaultTransport.(*http.Transport) + if !ok { + return nil, fmt.Errorf("create elasticsearch client: http.DefaultTransport is not *http.Transport") + } + httpTransport := dt.Clone() + httpTransport.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // intentional: opt-in via config for self-signed ES certs + MinVersion: tls.VersionTLS12, + } + esCfg.Transport = httpTransport + } + client, err := elasticsearch.NewClient(esCfg) if err != nil { return nil, fmt.Errorf("create elasticsearch client: %w", err) } diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index 660972b3d..a9375c6be 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -752,9 +752,9 @@ func TestProcessAddMembers_OutboxPerRemoteSite(t *testing.T) { }) // Owner sub. mustInsertSub(t, db, &model.Subscription{ - ID: idgen.GenerateUUIDv7(), - User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, - RoomID: roomID, SiteID: "site-A", Name: roomName, RoomType: model.RoomTypeChannel, + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, SiteID: "site-A", Name: roomName, RoomType: model.RoomTypeChannel, Roles: []model.Role{model.RoleOwner}, JoinedAt: time.Now().UTC(), }) @@ -872,9 +872,9 @@ func TestProcessAddMembers_PublishesLocalInbox_Integration(t *testing.T) { CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), }) mustInsertSub(t, db, &model.Subscription{ - ID: idgen.GenerateUUIDv7(), - User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, - RoomID: roomID, SiteID: "site-A", Name: roomName, RoomType: model.RoomTypeChannel, + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: "u_alice", Account: "alice"}, + RoomID: roomID, SiteID: "site-A", Name: roomName, RoomType: model.RoomTypeChannel, Roles: []model.Role{model.RoleOwner}, JoinedAt: time.Now().UTC(), }) @@ -933,7 +933,7 @@ func TestProcessRemoveIndividual_PublishesLocalInbox_Integration(t *testing.T) { mustInsertSub(t, db, &model.Subscription{ ID: idgen.GenerateUUIDv7(), User: model.SubscriptionUser{ID: "u_bob", Account: "bob"}, RoomID: roomID, SiteID: "site-A", Name: "fed-room", RoomType: model.RoomTypeChannel, - Roles: []model.Role{model.RoleMember}, JoinedAt: time.Now().UTC(), + Roles: []model.Role{model.RoleMember}, JoinedAt: time.Now().UTC(), }) _, err := db.Collection("room_members").InsertOne(ctx, model.RoomMember{ ID: idgen.GenerateUUIDv7(), RoomID: roomID, Ts: time.Now().UTC(), diff --git a/search-service/integration_test.go b/search-service/integration_test.go index 7a5a494d8..81362bc6e 100644 --- a/search-service/integration_test.go +++ b/search-service/integration_test.go @@ -90,9 +90,9 @@ func setupCCSFixture(t *testing.T) *ccsFixture { waitForRemoteConnected(t, localURL, "remote1", 120*time.Second) t.Logf("CCS fixture: remote1 connected") - localEngine, err := searchengine.New(ctx, "elasticsearch", localURL) + localEngine, err := searchengine.New(ctx, "elasticsearch", localURL, false) require.NoError(t, err, "build searchengine for local") - remoteEngine, err := searchengine.New(ctx, "elasticsearch", remoteURL) + remoteEngine, err := searchengine.New(ctx, "elasticsearch", remoteURL, false) require.NoError(t, err, "build searchengine for remote") t.Logf("CCS fixture: starting valkey") diff --git a/search-service/main.go b/search-service/main.go index 2e383b40d..7f1240d29 100644 --- a/search-service/main.go +++ b/search-service/main.go @@ -22,8 +22,9 @@ import ( // ESConfig bundles the search backend knobs. BACKEND is the key // `pkg/searchengine.New` reads to choose between elasticsearch/opensearch. type ESConfig struct { - URL string `env:"URL,required"` - Backend string `env:"BACKEND" envDefault:"elasticsearch"` + URL string `env:"URL,required"` + Backend string `env:"BACKEND" envDefault:"elasticsearch"` + TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"` } type ValkeyConfig struct { @@ -80,7 +81,7 @@ func main() { os.Exit(1) } - engine, err := searchengine.New(ctx, cfg.ES.Backend, cfg.ES.URL) + engine, err := searchengine.New(ctx, cfg.ES.Backend, cfg.ES.URL, cfg.ES.TLSSkipVerify) if err != nil { slog.Error("search engine connect failed", "error", err) os.Exit(1) diff --git a/search-sync-worker/inbox_integration_test.go b/search-sync-worker/inbox_integration_test.go index 8821e22b1..7ff291253 100644 --- a/search-sync-worker/inbox_integration_test.go +++ b/search-sync-worker/inbox_integration_test.go @@ -153,7 +153,7 @@ func TestSpotlightSync_Integration(t *testing.T) { indexName := "spotlight-site-spot-v1-chat" // --- ES template + index --- - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) @@ -245,7 +245,7 @@ func TestSpotlightSync_BulkInvite(t *testing.T) { siteID := "site-spot-bulk" indexName := "spotlight-site-spot-bulk-v1-chat" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) @@ -318,7 +318,7 @@ func TestUserRoomSync_Integration(t *testing.T) { siteID := "site-ur" indexName := "user-room-site-ur" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) @@ -440,7 +440,7 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { siteID := "site-ur-bulk" indexName := "user-room-site-ur-bulk" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) @@ -555,7 +555,7 @@ func TestUserRoomSync_LWWGuard(t *testing.T) { siteID := "site-lww" indexName := "user-room-site-lww" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) diff --git a/search-sync-worker/integration_test.go b/search-sync-worker/integration_test.go index 04f34b6f1..99d597498 100644 --- a/search-sync-worker/integration_test.go +++ b/search-sync-worker/integration_test.go @@ -310,7 +310,7 @@ func TestSearchSyncIntegration(t *testing.T) { // --- Setup search engine + template --- prefix := "msgs-inttest-v1" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err, "create search engine") // Wait for cluster to be green before creating indices. @@ -496,7 +496,7 @@ func TestCustomAnalyzer(t *testing.T) { ctx := context.Background() prefix := "analyzer-test-v1" - engine, err := searchengine.New(ctx, "elasticsearch", esURL) + engine, err := searchengine.New(ctx, "elasticsearch", esURL, false) require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) diff --git a/search-sync-worker/main.go b/search-sync-worker/main.go index 3881f0e2d..1b733cafd 100644 --- a/search-sync-worker/main.go +++ b/search-sync-worker/main.go @@ -40,14 +40,15 @@ type bootstrapConfig struct { } type config struct { - NatsURL string `env:"NATS_URL,required"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID,required"` - SearchURL string `env:"SEARCH_URL,required"` - SearchBackend string `env:"SEARCH_BACKEND" envDefault:"elasticsearch"` - MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"` - SpotlightIndex string `env:"SPOTLIGHT_INDEX" envDefault:""` - UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` + NatsURL string `env:"NATS_URL,required"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID,required"` + SearchURL string `env:"SEARCH_URL,required"` + SearchBackend string `env:"SEARCH_BACKEND" envDefault:"elasticsearch"` + SearchTLSSkipVerify bool `env:"SEARCH_TLS_SKIP_VERIFY" envDefault:"false"` + MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"` + SpotlightIndex string `env:"SPOTLIGHT_INDEX" envDefault:""` + UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` // FetchBatchSize is the maximum number of JetStream messages to pull // per Fetch() round-trip. Smaller values give lower latency per message @@ -118,7 +119,7 @@ func main() { os.Exit(1) } - engine, err := searchengine.New(ctx, cfg.SearchBackend, cfg.SearchURL) + engine, err := searchengine.New(ctx, cfg.SearchBackend, cfg.SearchURL, cfg.SearchTLSSkipVerify) if err != nil { slog.Error("search engine connect failed", "error", err) os.Exit(1) From 48143649b4f3b83d05b915ed1ec032fc98336658 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 03:51:51 +0000 Subject: [PATCH 4/4] feat(auth): support multiple allowed OIDC audiences MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the single-audience verifier with a multi-audience allow-list so a shared Keycloak realm that issues tokens for several client audiences (the common org pattern) can be served by one auth-service. pkg/oidc: - Config.Audience (string) → Config.Audiences ([]string). - Disable go-oidc's built-in single-audience check via oidc.Config{SkipClientIDCheck: true} and enforce the allow-list in Validate after cryptographic verification: a token is accepted when any of its `aud` entries matches any configured audience. - New ErrNoAudiences and ErrAudienceNotAllowed sentinels for clear error propagation; NewValidator fails fast on an empty Audiences list. - Single Verify call per request — no retry-on-mismatch loop. auth-service: - OIDC_AUDIENCE → OIDC_AUDIENCES (comma-separated; envSeparator:","). - Required-config check updated; deploy/.env.example and deploy/docker-compose.yml renamed to match. Tests: - pkg/oidc/oidc_test.go: table-driven tests for containsAudience (single/multi/empty cases) plus NewValidator empty-audiences guard. pkg/oidc had no tests before this commit. https://claude.ai/code/session_01UkLD7hpaypxjeh5zbEWTjp --- auth-service/deploy/.env.example | 3 +- auth-service/deploy/docker-compose.yml | 2 +- auth-service/main.go | 12 +++---- pkg/oidc/oidc.go | 45 +++++++++++++++++--------- pkg/oidc/oidc_test.go | 37 +++++++++++++++++++++ 5 files changed, 76 insertions(+), 23 deletions(-) create mode 100644 pkg/oidc/oidc_test.go diff --git a/auth-service/deploy/.env.example b/auth-service/deploy/.env.example index e9bdce2c8..2a401a023 100644 --- a/auth-service/deploy/.env.example +++ b/auth-service/deploy/.env.example @@ -13,7 +13,8 @@ AUTH_SIGNING_KEY=SAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX # Uses Docker service name (auth-service runs inside Docker network). # For host-based dev, change to http://localhost:9090/realms/chatapp OIDC_ISSUER_URL=http://keycloak:8080/realms/chatapp -OIDC_AUDIENCE=nats-chat +# Comma-separated list of acceptable `aud` values. +OIDC_AUDIENCES=nats-chat # Skip TLS cert verification for OIDC issuer (dev only — self-signed certs) TLS_SKIP_VERIFY=false diff --git a/auth-service/deploy/docker-compose.yml b/auth-service/deploy/docker-compose.yml index 494fb6eb7..6e943e2f9 100644 --- a/auth-service/deploy/docker-compose.yml +++ b/auth-service/deploy/docker-compose.yml @@ -16,7 +16,7 @@ services: - DEV_MODE=${DEV_MODE:-true} - NATS_JWT_EXPIRY=2h - OIDC_ISSUER_URL=http://keycloak:8080/realms/chatapp - - OIDC_AUDIENCE=nats-chat + - OIDC_AUDIENCES=nats-chat - TLS_SKIP_VERIFY=false networks: - chat-local diff --git a/auth-service/main.go b/auth-service/main.go index 05a549816..650a1403c 100644 --- a/auth-service/main.go +++ b/auth-service/main.go @@ -23,9 +23,9 @@ type config struct { NATSJWTExpiry time.Duration `env:"NATS_JWT_EXPIRY" envDefault:"2h"` // OIDC settings — required when DEV_MODE is false. - OIDCIssuerURL string `env:"OIDC_ISSUER_URL"` - OIDCAudience string `env:"OIDC_AUDIENCE"` - TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"` + OIDCIssuerURL string `env:"OIDC_ISSUER_URL"` + OIDCAudiences []string `env:"OIDC_AUDIENCES" envSeparator:","` + TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"` } func main() { @@ -56,14 +56,14 @@ func run() error { slog.Info("dev mode enabled — OIDC validation disabled") handler = NewAuthHandler(nil, signingKP, cfg.NATSJWTExpiry, true) } else { - if cfg.OIDCIssuerURL == "" || cfg.OIDCAudience == "" { - return fmt.Errorf("OIDC_ISSUER_URL and OIDC_AUDIENCE are required when DEV_MODE is false") + if cfg.OIDCIssuerURL == "" || len(cfg.OIDCAudiences) == 0 { + return fmt.Errorf("OIDC_ISSUER_URL and OIDC_AUDIENCES are required when DEV_MODE is false") } // Initialize OIDC validator — connects to issuer and fetches JWKS keys. oidcValidator, err := pkgoidc.NewValidator(ctx, pkgoidc.Config{ IssuerURL: cfg.OIDCIssuerURL, - Audience: cfg.OIDCAudience, + Audiences: cfg.OIDCAudiences, TLSSkipVerify: cfg.TLSSkipVerify, }) if err != nil { diff --git a/pkg/oidc/oidc.go b/pkg/oidc/oidc.go index 78302d6f6..39fe0daa1 100644 --- a/pkg/oidc/oidc.go +++ b/pkg/oidc/oidc.go @@ -5,7 +5,9 @@ import ( "crypto/tls" "errors" "fmt" + "log/slog" "net/http" + "slices" "time" "github.com/coreos/go-oidc/v3/oidc" @@ -25,13 +27,17 @@ type Claims struct { Extra map[string]interface{} } -// ErrTokenExpired is returned when the SSO token has passed its expiry time. -var ErrTokenExpired = fmt.Errorf("oidc: token has expired") +var ( + ErrTokenExpired = errors.New("oidc: token has expired") + ErrNoAudiences = errors.New("oidc: at least one allowed audience is required") + ErrAudienceNotAllowed = errors.New("oidc: token audience not in allowed list") +) // Config controls how the OIDC validator behaves. type Config struct { - IssuerURL string - Audience string + IssuerURL string + // A token is accepted when any of its `aud` claim entries appears here. + Audiences []string TLSSkipVerify bool } @@ -39,14 +45,17 @@ type Config struct { type Validator struct { verifier *oidc.IDTokenVerifier httpClient *http.Client - audience string + audiences []string } const issuerDiscoveryTimeout = 10 * time.Second // NewValidator connects to the OIDC issuer and fetches its JWKS keys. -// Fails fast if the issuer is unreachable. func NewValidator(ctx context.Context, cfg Config) (*Validator, error) { + if len(cfg.Audiences) == 0 { + return nil, ErrNoAudiences + } + var httpClient *http.Client if cfg.TLSSkipVerify { @@ -62,7 +71,6 @@ func NewValidator(ctx context.Context, cfg Config) (*Validator, error) { ctx = oidc.ClientContext(ctx, httpClient) } - // Ensure issuer discovery cannot hang indefinitely. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, issuerDiscoveryTimeout) @@ -74,22 +82,19 @@ func NewValidator(ctx context.Context, cfg Config) (*Validator, error) { return nil, fmt.Errorf("connect to oidc issuer %q: %w", cfg.IssuerURL, err) } - oidcConfig := &oidc.Config{ - ClientID: cfg.Audience, - } + // SkipClientIDCheck: we enforce a multi-audience allow-list ourselves below. + oidcConfig := &oidc.Config{SkipClientIDCheck: true} return &Validator{ verifier: provider.Verifier(oidcConfig), httpClient: httpClient, - audience: cfg.Audience, + audiences: cfg.Audiences, }, nil } // Validate verifies the raw OIDC token string and extracts user claims. -// Returns ErrTokenExpired if the token's exp claim is in the past — expiry -// is enforced by go-oidc's Verifier, we just translate its sentinel error. +// Returns ErrTokenExpired when go-oidc reports an expired exp claim. func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, error) { - // Re-attach the custom HTTP client so JWKS fetches also use TLSSkipVerify. if v.httpClient != nil { ctx = oidc.ClientContext(ctx, v.httpClient) } @@ -103,6 +108,11 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro return Claims{}, fmt.Errorf("oidc token verification failed: %w", err) } + if !containsAudience(idToken.Audience, v.audiences) { + slog.Warn("oidc audience mismatch", "token_aud", idToken.Audience, "allowed", v.audiences) + return Claims{}, ErrAudienceNotAllowed + } + var tokenClaims struct { Email string `json:"email"` Name string `json:"name"` @@ -118,7 +128,6 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro return Claims{}, fmt.Errorf("parse oidc token claims: %w", err) } - // Parse all claims into Extra for custom fields (roles, groups, etc.) var allClaims map[string]interface{} if err := idToken.Claims(&allClaims); err != nil { return Claims{}, fmt.Errorf("parse oidc extra claims: %w", err) @@ -145,3 +154,9 @@ func (v *Validator) Validate(ctx context.Context, rawToken string) (Claims, erro Extra: allClaims, }, nil } + +func containsAudience(tokenAud, allowed []string) bool { + return slices.ContainsFunc(tokenAud, func(t string) bool { + return slices.Contains(allowed, t) + }) +} diff --git a/pkg/oidc/oidc_test.go b/pkg/oidc/oidc_test.go new file mode 100644 index 000000000..a9ede0ee2 --- /dev/null +++ b/pkg/oidc/oidc_test.go @@ -0,0 +1,37 @@ +package oidc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestContainsAudience(t *testing.T) { + cases := []struct { + name string + tokenAud []string + allowed []string + wantMatch bool + }{ + {"single token aud matches single allowed", []string{"a"}, []string{"a"}, true}, + {"token aud matches one of many allowed", []string{"b"}, []string{"a", "b", "c"}, true}, + {"one of many token auds matches allowed", []string{"x", "b"}, []string{"a", "b"}, true}, + {"no match", []string{"x"}, []string{"a", "b"}, false}, + {"empty token aud", nil, []string{"a"}, false}, + {"empty allowed", []string{"a"}, nil, false}, + {"both empty", nil, nil, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.wantMatch, containsAudience(tc.tokenAud, tc.allowed)) + }) + } +} + +func TestNewValidator_RejectsEmptyAudiences(t *testing.T) { + _, err := NewValidator(t.Context(), Config{ + IssuerURL: "http://example.invalid", + Audiences: nil, + }) + assert.ErrorIs(t, err, ErrNoAudiences) +}