diff --git a/inbox-worker/handler.go b/inbox-worker/handler.go index cd7d25c59..370edb1ae 100644 --- a/inbox-worker/handler.go +++ b/inbox-worker/handler.go @@ -78,8 +78,8 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) joinedAt := time.UnixMilli(event.JoinedAt).UTC() var historySharedSince *time.Time - if event.HistorySharedSince > 0 { - t := time.UnixMilli(event.HistorySharedSince).UTC() + if event.HistorySharedSince != nil && *event.HistorySharedSince > 0 { + t := time.UnixMilli(*event.HistorySharedSince).UTC() historySharedSince = &t } diff --git a/inbox-worker/handler_test.go b/inbox-worker/handler_test.go index d3413e747..b38f30104 100644 --- a/inbox-worker/handler_test.go +++ b/inbox-worker/handler_test.go @@ -166,13 +166,14 @@ func TestHandleEvent_MemberAdded(t *testing.T) { pub := &mockPublisher{} h := NewHandler(store, pub) + hssMillis := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli() change := model.MemberAddEvent{ Type: "member_added", RoomID: "room-1", Accounts: []string{"bob"}, SiteID: "site-b", JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), - HistorySharedSince: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), + HistorySharedSince: &hssMillis, Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), } changeData, err := json.Marshal(change) @@ -240,6 +241,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) { joinedAt := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC) historyShared := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC) + hssMillis := historyShared.UnixMilli() change := model.MemberAddEvent{ Type: "member_added", @@ -247,7 +249,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) { Accounts: []string{"carol"}, SiteID: "site-b", JoinedAt: joinedAt.UnixMilli(), - HistorySharedSince: historyShared.UnixMilli(), + HistorySharedSince: &hssMillis, Timestamp: joinedAt.UnixMilli(), } changeData, _ := json.Marshal(change) @@ -467,13 +469,14 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) { pub := &mockPublisher{} h := NewHandler(store, pub) + hssMillis := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli() change := model.MemberAddEvent{ Type: "member_added", RoomID: "room-1", Accounts: []string{"account-bob"}, SiteID: "site-b", JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), - HistorySharedSince: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), + HistorySharedSince: &hssMillis, Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), } changeData, err := json.Marshal(change) @@ -529,6 +532,7 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) { joinedAt := time.Date(2026, 4, 5, 10, 30, 0, 0, time.UTC) historyShared := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + hssMillis := historyShared.UnixMilli() change := model.MemberAddEvent{ Type: "member_added", @@ -536,7 +540,7 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) { Accounts: []string{"alice", "bob"}, SiteID: "site-b", JoinedAt: joinedAt.UnixMilli(), - HistorySharedSince: historyShared.UnixMilli(), + HistorySharedSince: &hssMillis, Timestamp: joinedAt.UnixMilli(), } changeData, _ := json.Marshal(change) @@ -609,13 +613,13 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) { h := NewHandler(store, pub) change := model.MemberAddEvent{ - Type: "member_added", - RoomID: "room-1", - Accounts: []string{"dave"}, - SiteID: "site-b", - JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), - HistorySharedSince: 0, // means "all history" - Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), + Type: "member_added", + RoomID: "room-1", + Accounts: []string{"dave"}, + SiteID: "site-b", + JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), + // HistorySharedSince nil → "all history" + Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(), } changeData, _ := json.Marshal(change) diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index c449b998d..c33ac06c4 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -71,10 +71,11 @@ func TestInboxWorker_MemberAdded_Integration(t *testing.T) { } // Create outbox event for member_added + hssMillis := time.Now().UTC().UnixMilli() change := model.MemberAddEvent{ Type: "member_added", RoomID: "r1", Accounts: []string{"u2"}, SiteID: "site-b", JoinedAt: time.Now().UTC().UnixMilli(), - HistorySharedSince: time.Now().UTC().UnixMilli(), + HistorySharedSince: &hssMillis, Timestamp: time.Now().UTC().UnixMilli(), } changeData, _ := json.Marshal(change) diff --git a/message-worker/store_cassandra.go b/message-worker/store_cassandra.go index c9eb0b066..77b28b460 100644 --- a/message-worker/store_cassandra.go +++ b/message-worker/store_cassandra.go @@ -62,17 +62,17 @@ func NewCassandraStore(session *gocql.Session) *CassandraStore { // If either insert fails the error is returned immediately; JetStream will redeliver the message. func (s *CassandraStore) SaveMessage(ctx context.Context, msg *model.Message, sender *cassParticipant, siteID string) error { if err := s.cassSession.Query( - `INSERT INTO messages_by_room (room_id, created_at, message_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - msg.RoomID, msg.CreatedAt, msg.ID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, + `INSERT INTO messages_by_room (room_id, created_at, message_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data, tshow) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + msg.RoomID, msg.CreatedAt, msg.ID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, msg.TShow, ).WithContext(ctx).Exec(); err != nil { return fmt.Errorf("insert messages_by_room %s: %w", msg.ID, err) } if err := s.cassSession.Query( - `INSERT INTO messages_by_id (message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, + `INSERT INTO messages_by_id (message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data, tshow) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, msg.TShow, ).WithContext(ctx).Exec(); err != nil { return fmt.Errorf("insert messages_by_id %s: %w", msg.ID, err) } @@ -89,10 +89,10 @@ func (s *CassandraStore) SaveThreadMessage(ctx context.Context, msg *model.Messa if err := s.cassSession.Query( `INSERT INTO messages_by_id (message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions, - thread_room_id, thread_parent_id, thread_parent_created_at, type, sys_msg_data) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + thread_room_id, thread_parent_id, thread_parent_created_at, type, sys_msg_data, tshow) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), - threadRoomID, msg.ThreadParentMessageID, msg.ThreadParentMessageCreatedAt, msg.Type, msg.SysMsgData, + threadRoomID, msg.ThreadParentMessageID, msg.ThreadParentMessageCreatedAt, msg.Type, msg.SysMsgData, msg.TShow, ).WithContext(ctx).Exec(); err != nil { return fmt.Errorf("insert messages_by_id %s: %w", msg.ID, err) } diff --git a/pkg/model/event.go b/pkg/model/event.go index 9d7fd2767..861f01c56 100644 --- a/pkg/model/event.go +++ b/pkg/model/event.go @@ -44,18 +44,20 @@ type UpdateRoleRequest struct { // search-sync-worker. One event represents a bulk add/remove of N Accounts // against a single room; downstream consumers fan out per-account. // -// HistorySharedSince is a single event-level flag shared by all accounts in -// the bulk: when non-zero, the room is history-restricted and consumers MUST -// skip indexing the entire event (the search service handles restricted rooms -// via DB+cache at query time). JoinedAt is only meaningful on add events and -// omitted on removes. +// HistorySharedSince == nil means the entire bulk is unrestricted; non-nil +// means all Accounts in the bulk are restricted from that timestamp. The +// user-room collection routes these into restrictedRooms{}; the spotlight +// collection skips non-nil events entirely for MVP. Publishers MUST emit nil +// for unrestricted rooms — never &0 and never a non-positive timestamp — so +// the Go↔painless boundary sentinel (hss <= 0 → unrestricted) stays sound. +// JoinedAt is only meaningful on add events and omitted on removes. type InboxMemberEvent struct { RoomID string `json:"roomId"` RoomName string `json:"roomName"` RoomType RoomType `json:"roomType"` SiteID string `json:"siteId"` Accounts []string `json:"accounts"` - HistorySharedSince int64 `json:"historySharedSince,omitempty"` + HistorySharedSince *int64 `json:"historySharedSince,omitempty"` JoinedAt int64 `json:"joinedAt,omitempty"` Timestamp int64 `json:"timestamp" bson:"timestamp"` } @@ -99,7 +101,7 @@ type MemberAddEvent struct { Accounts []string `json:"accounts" bson:"accounts"` SiteID string `json:"siteId" bson:"siteId"` JoinedAt int64 `json:"joinedAt" bson:"joinedAt"` - HistorySharedSince int64 `json:"historySharedSince" bson:"historySharedSince"` + HistorySharedSince *int64 `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"` Timestamp int64 `json:"timestamp" bson:"timestamp"` } diff --git a/pkg/model/message.go b/pkg/model/message.go index fcca753c1..e331dade6 100644 --- a/pkg/model/message.go +++ b/pkg/model/message.go @@ -12,6 +12,7 @@ type Message struct { CreatedAt time.Time `json:"createdAt" bson:"createdAt"` ThreadParentMessageID string `json:"threadParentMessageId,omitempty" bson:"threadParentMessageId,omitempty"` ThreadParentMessageCreatedAt *time.Time `json:"threadParentMessageCreatedAt,omitempty" bson:"threadParentMessageCreatedAt,omitempty"` + TShow bool `json:"tshow,omitempty" bson:"tshow,omitempty"` Type string `json:"type,omitempty" bson:"type,omitempty"` SysMsgData []byte `json:"sysMsgData,omitempty" bson:"sysMsgData,omitempty"` } diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index f39368532..582e95315 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -187,6 +187,39 @@ func TestMessageJSON(t *testing.T) { _, hasSysMsgData := raw["sysMsgData"] assert.False(t, hasSysMsgData, "sysMsgData should be omitted when nil") }) + + t.Run("tshow round-trips when true", func(t *testing.T) { + m := model.Message{ + ID: "m1", RoomID: "r1", UserID: "u1", UserAccount: "alice", + Content: "thread reply shown in main feed", + CreatedAt: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC), + TShow: true, + } + data, err := json.Marshal(&m) + require.NoError(t, err) + + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + assert.Equal(t, true, raw["tshow"]) + + var dst model.Message + require.NoError(t, json.Unmarshal(data, &dst)) + assert.True(t, dst.TShow) + }) + + t.Run("tshow omitted on the wire when false", func(t *testing.T) { + m := model.Message{ + ID: "m1", RoomID: "r1", UserID: "u1", UserAccount: "alice", + Content: "plain message", + CreatedAt: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC), + } + data, err := json.Marshal(&m) + require.NoError(t, err) + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + _, present := raw["tshow"] + assert.False(t, present, "tshow should be omitted when false") + }) } func TestSendMessageRequestJSON(t *testing.T) { @@ -619,24 +652,30 @@ func TestInboxMemberEventJSON(t *testing.T) { }) t.Run("add event, restricted room carries HistorySharedSince", func(t *testing.T) { + hss := int64(1735689500000) src := model.InboxMemberEvent{ RoomID: "r1", RoomName: "engineering", RoomType: model.RoomTypeGroup, SiteID: "site-a", Accounts: []string{"alice"}, - HistorySharedSince: 1735689500000, + HistorySharedSince: &hss, JoinedAt: 1735689600000, Timestamp: 1735689600000, } data, err := json.Marshal(&src) require.NoError(t, err) + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + assert.EqualValues(t, hss, raw["historySharedSince"]) + var dst model.InboxMemberEvent require.NoError(t, json.Unmarshal(data, &dst)) - assert.Equal(t, src, dst) + require.NotNil(t, dst.HistorySharedSince) + assert.Equal(t, hss, *dst.HistorySharedSince) }) - t.Run("remove event omits HistorySharedSince and JoinedAt when zero", func(t *testing.T) { + t.Run("remove event omits HistorySharedSince and JoinedAt when nil/zero", func(t *testing.T) { src := model.InboxMemberEvent{ RoomID: "r1", RoomName: "engineering", @@ -650,9 +689,13 @@ func TestInboxMemberEventJSON(t *testing.T) { var raw map[string]any require.NoError(t, json.Unmarshal(data, &raw)) _, hasHSS := raw["historySharedSince"] - assert.False(t, hasHSS, "historySharedSince should be omitted when zero") + assert.False(t, hasHSS, "historySharedSince should be omitted when nil") _, hasJoinedAt := raw["joinedAt"] assert.False(t, hasJoinedAt, "joinedAt should be omitted when zero") + + var dst model.InboxMemberEvent + require.NoError(t, json.Unmarshal(data, &dst)) + assert.Nil(t, dst.HistorySharedSince, "unrestricted event must decode HistorySharedSince as nil") }) } @@ -879,20 +922,51 @@ func TestMembersAddedJSON(t *testing.T) { } func TestMemberAddEventJSON(t *testing.T) { - src := model.MemberAddEvent{ - Type: "member_added", - RoomID: "r1", - Accounts: []string{"alice", "bob"}, - SiteID: "site-a", - JoinedAt: 1735689600000, - HistorySharedSince: 1735689600000, - Timestamp: 1735689600000, - } - data, err := json.Marshal(src) - require.NoError(t, err) - var dst model.MemberAddEvent - require.NoError(t, json.Unmarshal(data, &dst)) - assert.Equal(t, src, dst) + t.Run("restricted room round-trips HistorySharedSince pointer", func(t *testing.T) { + hss := int64(1735689600000) + src := model.MemberAddEvent{ + Type: "member_added", + RoomID: "r1", + Accounts: []string{"alice", "bob"}, + SiteID: "site-a", + JoinedAt: 1735689600000, + HistorySharedSince: &hss, + Timestamp: 1735689600000, + } + data, err := json.Marshal(src) + require.NoError(t, err) + + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + assert.EqualValues(t, hss, raw["historySharedSince"]) + + var dst model.MemberAddEvent + require.NoError(t, json.Unmarshal(data, &dst)) + require.NotNil(t, dst.HistorySharedSince) + assert.Equal(t, hss, *dst.HistorySharedSince) + }) + + t.Run("unrestricted room omits historySharedSince on the wire", func(t *testing.T) { + src := model.MemberAddEvent{ + Type: "member_added", + RoomID: "r1", + Accounts: []string{"alice"}, + SiteID: "site-a", + JoinedAt: 1735689600000, + Timestamp: 1735689600000, + } + data, err := json.Marshal(src) + require.NoError(t, err) + + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + _, hasHSS := raw["historySharedSince"] + assert.False(t, hasHSS, "historySharedSince must be omitted when nil") + + var dst model.MemberAddEvent + require.NoError(t, json.Unmarshal(data, &dst)) + assert.Nil(t, dst.HistorySharedSince) + }) } func TestListRoomMembersRequestJSON(t *testing.T) { diff --git a/room-worker/handler.go b/room-worker/handler.go index bc7cb63cd..c4be0eedd 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -620,9 +620,19 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error { } // 8. Publish MemberAddEvent (actualAccounts was built above alongside subs) - var historySharedSince int64 + // Never emit &0 — the painless sentinel `hss <= 0` would misroute the + // restricted room into `rooms[]`. If the upstream request is malformed + // (restricted mode but missing timestamp), leave the pointer nil + log — + // we can't silently translate to &0. + var historySharedSincePtr *int64 if req.History.Mode == model.HistoryModeNone { - historySharedSince = req.Timestamp + if req.Timestamp > 0 { + v := req.Timestamp + historySharedSincePtr = &v + } else { + slog.Error("restricted history with missing timestamp, emitting nil", + "roomID", req.RoomID, "mode", req.History.Mode) + } } memberAddEvt := model.MemberAddEvent{ Type: "member_added", @@ -630,7 +640,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error { Accounts: actualAccounts, SiteID: room.SiteID, JoinedAt: req.Timestamp, - HistorySharedSince: historySharedSince, + HistorySharedSince: historySharedSincePtr, Timestamp: now.UnixMilli(), } memberAddData, _ := json.Marshal(memberAddEvt) @@ -681,7 +691,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error { Accounts: accounts, SiteID: room.SiteID, JoinedAt: req.Timestamp, - HistorySharedSince: historySharedSince, + HistorySharedSince: historySharedSincePtr, Timestamp: now.UnixMilli(), } siteEvtData, _ := json.Marshal(siteEvt) diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index 0f72558d1..9a8fb2379 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -688,6 +688,120 @@ func TestHandler_ProcessAddMembers_HistoryAll(t *testing.T) { require.NoError(t, err) } +// findMemberAddEvent returns the decoded MemberAddEvent published locally on +// RoomMemberEvent(roomID). Fails the test if no such publish occurred. +func findMemberAddEvent(t *testing.T, published []publishedMsg, roomID string) (model.MemberAddEvent, []byte) { + t.Helper() + want := subject.RoomMemberEvent(roomID) + for _, p := range published { + if p.subj != want { + continue + } + var evt model.MemberAddEvent + require.NoError(t, json.Unmarshal(p.data, &evt)) + return evt, p.data + } + t.Fatalf("no MemberAddEvent published to %s (got %d messages)", want, len(published)) + return model.MemberAddEvent{}, nil +} + +// TestHandler_ProcessAddMembers_RestrictedPropagatesPointer verifies that a +// restricted room (HistoryModeNone) emits a MemberAddEvent whose +// HistorySharedSince is a non-nil pointer equal to the request timestamp, +// both for the same-site RoomMemberEvent publish and for the batched +// cross-site outbox event. +func TestHandler_ProcessAddMembers_RestrictedPropagatesPointer(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + var published []publishedMsg + publish := func(_ context.Context, subj string, data []byte) error { + published = append(published, publishedMsg{subj: subj, data: data}) + return nil + } + h := NewHandler(store, "site-a", publish) + + store.EXPECT().GetRoom(gomock.Any(), "r1").Return(&model.Room{ID: "r1", SiteID: "site-a"}, nil) + store.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"bob", "charlie"}).Return([]model.User{ + {ID: "u2", Account: "bob", SiteID: "site-a"}, + {ID: "u3", Account: "charlie", SiteID: "site-b"}, + }, nil) + store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + store.EXPECT().IncrementUserCount(gomock.Any(), "r1", 2).Return(nil) + store.EXPECT().HasOrgRoomMembers(gomock.Any(), "r1").Return(false, nil) + + const reqTS int64 = 1744300000000 + req := model.AddMembersRequest{ + RoomID: "r1", Users: []string{"bob", "charlie"}, + History: model.HistoryConfig{Mode: model.HistoryModeNone}, + Timestamp: reqTS, + } + reqData, _ := json.Marshal(req) + require.NoError(t, h.processAddMembers(context.Background(), reqData)) + + // Local RoomMemberEvent: HSS must be a non-nil pointer equal to request ts. + memberAddEvt, _ := findMemberAddEvent(t, published, "r1") + require.NotNil(t, memberAddEvt.HistorySharedSince, + "restricted room must publish non-nil HistorySharedSince") + assert.Equal(t, reqTS, *memberAddEvt.HistorySharedSince) + + // Batched outbox to site-b: same HSS pointer on the payload. + var foundOutbox bool + for _, p := range published { + if !strings.Contains(p.subj, "outbox") { + continue + } + foundOutbox = true + var outboxEvt model.OutboxEvent + require.NoError(t, json.Unmarshal(p.data, &outboxEvt)) + var change model.MemberAddEvent + require.NoError(t, json.Unmarshal(outboxEvt.Payload, &change)) + require.NotNil(t, change.HistorySharedSince, + "outbox restricted payload must carry HistorySharedSince") + assert.Equal(t, reqTS, *change.HistorySharedSince) + } + assert.True(t, foundOutbox, "expected a batched outbox publish for site-b") +} + +// TestHandler_ProcessAddMembers_UnrestrictedOmitsFieldFromWire verifies that +// an unrestricted room (HistoryModeAll) produces a MemberAddEvent whose JSON +// wire form does NOT contain the "historySharedSince" key. This is the +// documented publisher contract. +func TestHandler_ProcessAddMembers_UnrestrictedOmitsFieldFromWire(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockSubscriptionStore(ctrl) + + var published []publishedMsg + publish := func(_ context.Context, subj string, data []byte) error { + published = append(published, publishedMsg{subj: subj, data: data}) + return nil + } + h := NewHandler(store, "site-a", publish) + + store.EXPECT().GetRoom(gomock.Any(), "r1").Return(&model.Room{ID: "r1", SiteID: "site-a"}, nil) + store.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"bob"}).Return([]model.User{ + {ID: "u2", Account: "bob", SiteID: "site-a"}, + }, nil) + store.EXPECT().BulkCreateSubscriptions(gomock.Any(), gomock.Any()).Return(nil) + store.EXPECT().IncrementUserCount(gomock.Any(), "r1", 1).Return(nil) + store.EXPECT().HasOrgRoomMembers(gomock.Any(), "r1").Return(false, nil) + + req := model.AddMembersRequest{ + RoomID: "r1", Users: []string{"bob"}, + History: model.HistoryConfig{Mode: model.HistoryModeAll}, + } + reqData, _ := json.Marshal(req) + require.NoError(t, h.processAddMembers(context.Background(), reqData)) + + evt, raw := findMemberAddEvent(t, published, "r1") + assert.Nil(t, evt.HistorySharedSince, "unrestricted event must decode HSS as nil") + + var rawMap map[string]any + require.NoError(t, json.Unmarshal(raw, &rawMap)) + _, present := rawMap["historySharedSince"] + assert.False(t, present, "unrestricted event must omit historySharedSince on the wire") +} + func TestHandler_ProcessAddMembers_WithOrgs(t *testing.T) { ctrl := gomock.NewController(t) store := NewMockSubscriptionStore(ctrl) diff --git a/search-sync-worker/inbox_integration_test.go b/search-sync-worker/inbox_integration_test.go index 663d3056f..5f7ef1b6e 100644 --- a/search-sync-worker/inbox_integration_test.go +++ b/search-sync-worker/inbox_integration_test.go @@ -36,13 +36,13 @@ func createInboxStream(t *testing.T, ctx context.Context, js jetstream.JetStream } // buildInboxMemberEvent constructs an InboxMemberEvent payload for tests. -// `historySharedSince` of 0 means unrestricted; any non-zero value marks the -// entire bulk as restricted and search-sync should skip the whole event. +// `historySharedSince` is nil for unrestricted; non-nil marks the bulk as +// restricted — user-room routes it into restrictedRooms{}, spotlight skips. // `joinedAt` is only meaningful on add events; pass 0 for removes. func buildInboxMemberEvent( roomID, roomName, siteID string, accounts []string, - historySharedSince int64, + historySharedSince *int64, joinedAt int64, timestamp int64, ) model.InboxMemberEvent { @@ -58,6 +58,10 @@ func buildInboxMemberEvent( } } +func int64Ptr(v int64) *int64 { + return &v +} + // publishInboxMemberEvent wraps an InboxMemberEvent inside an OutboxEvent // with the given Type and publishes it to `subj`. Caller picks `subj` // (local vs. aggregate, added vs. removed) via pkg/subject builders. @@ -175,25 +179,25 @@ func TestSpotlightSync_Integration(t *testing.T) { // Local member_added: alice joins engineering publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r-eng", "engineering", siteID, []string{"alice"}, 0, joinedAt, 1000), + buildInboxMemberEvent("r-eng", "engineering", siteID, []string{"alice"}, nil, joinedAt, 1000), ) // Local member_added: alice joins platform publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r-platform", "platform", siteID, []string{"alice"}, 0, joinedAt, 1100), + buildInboxMemberEvent("r-platform", "platform", siteID, []string{"alice"}, nil, joinedAt, 1100), ) // Federated (aggregate) member_added: bob joins engineering via a cross-site event publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAddedAggregate(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r-eng", "engineering", siteID, []string{"bob"}, 0, joinedAt, 1200), + buildInboxMemberEvent("r-eng", "engineering", siteID, []string{"bob"}, nil, joinedAt, 1200), ) // Federated (aggregate) member_removed: alice leaves platform publishInboxMemberEvent(t, ctx, js, subject.InboxMemberRemovedAggregate(siteID), model.OutboxMemberRemoved, - buildInboxMemberEvent("r-platform", "platform", siteID, []string{"alice"}, 0, 0, 1300), + buildInboxMemberEvent("r-platform", "platform", siteID, []string{"alice"}, nil, 0, 1300), ) drainConsumer(t, ctx, cons, handler, 4) @@ -263,7 +267,7 @@ func TestSpotlightSync_BulkInvite(t *testing.T) { // One bulk-invite event adds 3 users to r-platform at once. payload := buildInboxMemberEvent("r-platform", "platform", siteID, - []string{"dave", "erin", "frank"}, 0, joinedAt, 5000) + []string{"dave", "erin", "frank"}, nil, joinedAt, 5000) publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, payload) @@ -292,7 +296,7 @@ func TestSpotlightSync_BulkInvite(t *testing.T) { t.Run("bulk remove evicts all three docs", func(t *testing.T) { // Same 3 accounts, now removed in one event. remove := buildInboxMemberEvent("r-platform", "platform", siteID, - []string{"dave", "erin", "frank"}, 0, 0, 6000) + []string{"dave", "erin", "frank"}, nil, 0, 6000) publishInboxMemberEvent(t, ctx, js, subject.InboxMemberRemoved(siteID), model.OutboxMemberRemoved, remove) drainConsumer(t, ctx, cons, handler, 1) @@ -339,42 +343,42 @@ func TestUserRoomSync_Integration(t *testing.T) { // alice joins 3 rooms via local events publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r1", "general", siteID, []string{"alice"}, 0, joinedAt, 1000)) + buildInboxMemberEvent("r1", "general", siteID, []string{"alice"}, nil, joinedAt, 1000)) publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r2", "random", siteID, []string{"alice"}, 0, joinedAt, 1100)) + buildInboxMemberEvent("r2", "random", siteID, []string{"alice"}, nil, joinedAt, 1100)) publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r3", "eng", siteID, []string{"alice"}, 0, joinedAt, 1200)) + buildInboxMemberEvent("r3", "eng", siteID, []string{"alice"}, nil, joinedAt, 1200)) // bob joins r1 via a federated (aggregate) event publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAddedAggregate(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r1", "general", siteID, []string{"bob"}, 0, joinedAt, 1300)) + buildInboxMemberEvent("r1", "general", siteID, []string{"bob"}, nil, joinedAt, 1300)) // alice leaves r2 via a local event publishInboxMemberEvent(t, ctx, js, subject.InboxMemberRemoved(siteID), model.OutboxMemberRemoved, - buildInboxMemberEvent("r2", "random", siteID, []string{"alice"}, 0, 0, 1400)) + buildInboxMemberEvent("r2", "random", siteID, []string{"alice"}, nil, 0, 1400)) - // alice joins a restricted room → whole event should be SKIPPED by user- - // room-sync (event-level HistorySharedSince != 0). The message still - // arrives at the consumer; the handler acks it without buffering. + // alice joins a restricted room. user-room now stores it in + // `restrictedRooms{}` instead of skipping. + const restrictedHSS int64 = 1743984000000 publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, - buildInboxMemberEvent("r-restricted", "archives", siteID, []string{"alice"}, 1743984000000, joinedAt, 1500)) + buildInboxMemberEvent("r-restricted", "archives", siteID, []string{"alice"}, int64Ptr(restrictedHSS), joinedAt, 1500)) drainConsumer(t, ctx, cons, handler, 6) refreshIndex(t, esURL, indexName) // --- Verify --- - t.Run("alice rooms reflect adds and one remove", func(t *testing.T) { + t.Run("alice rooms reflect unrestricted adds and one remove", func(t *testing.T) { doc := getDoc(t, esURL, indexName, "alice") require.NotNil(t, doc, "alice user-room doc should exist") rooms := toStringSlice(t, doc["rooms"]) assert.ElementsMatch(t, []string{"r1", "r3"}, rooms, - "alice should be in r1, r3 (r2 removed, r-restricted skipped)") + "alice rooms[] should hold unrestricted adds minus the remove") }) t.Run("bob created via federated event", func(t *testing.T) { @@ -384,7 +388,7 @@ func TestUserRoomSync_Integration(t *testing.T) { assert.ElementsMatch(t, []string{"r1"}, rooms) }) - t.Run("roomTimestamps retained after remove", func(t *testing.T) { + t.Run("roomTimestamps retained after remove and for restricted", func(t *testing.T) { doc := getDoc(t, esURL, indexName, "alice") require.NotNil(t, doc) rts, ok := doc["roomTimestamps"].(map[string]any) @@ -398,16 +402,21 @@ func TestUserRoomSync_Integration(t *testing.T) { "r2 timestamp should be bumped to the remove's event timestamp, not deleted") assert.Equal(t, float64(1200), rts["r3"]) - // Restricted room was never applied → no timestamp entry. - assert.NotContains(t, rts, "r-restricted") + // Restricted-room adds also stamp roomTimestamps so LWW guards both + // paths uniformly. + assert.Equal(t, float64(1500), rts["r-restricted"]) }) - t.Run("restricted room not present in rooms array", func(t *testing.T) { + t.Run("restricted room lands in restrictedRooms map", func(t *testing.T) { doc := getDoc(t, esURL, indexName, "alice") require.NotNil(t, doc) rooms := toStringSlice(t, doc["rooms"]) assert.NotContains(t, rooms, "r-restricted", - "restricted rooms are skipped by user-room-sync") + "restricted rooms must NOT appear in rooms[]") + restricted, ok := doc["restrictedRooms"].(map[string]any) + require.True(t, ok, "restrictedRooms should be a flattened map") + assert.EqualValues(t, restrictedHSS, restricted["r-restricted"], + "restrictedRooms[r-restricted] should equal the event HSS") }) t.Run("createdAt and updatedAt stamped from upsert", func(t *testing.T) { @@ -455,16 +464,15 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, buildInboxMemberEvent("r-platform", "platform", siteID, - []string{"dave", "erin", "frank"}, 0, joinedAt, 5000)) + []string{"dave", "erin", "frank"}, nil, joinedAt, 5000)) - // Restricted bulk: 3 users to r-archives with non-zero HistorySharedSince - // → whole event skipped, no docs written. The message still arrives at - // the consumer (it matches the FilterSubjects) so drainConsumer below - // counts it. + // Restricted bulk: 3 users to r-archives with non-nil HistorySharedSince. + // User-room now routes these into `restrictedRooms{}` instead of skipping. + const archivesHSS int64 = 1743984000000 publishInboxMemberEvent(t, ctx, js, subject.InboxMemberAdded(siteID), model.OutboxMemberAdded, buildInboxMemberEvent("r-archives", "archives", siteID, - []string{"heidi", "ivan", "judy"}, 1743984000000, joinedAt, 5100)) + []string{"heidi", "ivan", "judy"}, int64Ptr(archivesHSS), joinedAt, 5100)) drainConsumer(t, ctx, cons, handler, 2) refreshIndex(t, esURL, indexName) @@ -475,13 +483,21 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { require.NotNil(t, doc, "%s should be upserted", account) rooms := toStringSlice(t, doc["rooms"]) assert.ElementsMatch(t, []string{"r-platform"}, rooms) + assert.Empty(t, doc["restrictedRooms"], + "%s must not have any restricted rooms", account) } }) - t.Run("restricted bulk writes no docs", func(t *testing.T) { + t.Run("restricted bulk routes all three users into restrictedRooms", func(t *testing.T) { for _, account := range []string{"heidi", "ivan", "judy"} { - assert.Nil(t, getDoc(t, esURL, indexName, account), - "restricted %s must not be upserted", account) + doc := getDoc(t, esURL, indexName, account) + require.NotNil(t, doc, "restricted %s should still be upserted", account) + assert.Empty(t, toStringSlice(t, doc["rooms"]), + "%s rooms[] must not contain the restricted room", account) + restricted, ok := doc["restrictedRooms"].(map[string]any) + require.True(t, ok, "%s must have restrictedRooms map", account) + assert.EqualValues(t, archivesHSS, restricted["r-archives"], + "%s restrictedRooms[r-archives] should equal event HSS", account) } }) @@ -489,7 +505,7 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { publishInboxMemberEvent(t, ctx, js, subject.InboxMemberRemoved(siteID), model.OutboxMemberRemoved, buildInboxMemberEvent("r-platform", "platform", siteID, - []string{"dave", "erin", "frank"}, 0, 0, 6000)) + []string{"dave", "erin", "frank"}, nil, 0, 6000)) drainConsumer(t, ctx, cons, handler, 1) refreshIndex(t, esURL, indexName) @@ -500,6 +516,24 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { "%s rooms should be empty after bulk remove", account) } }) + + t.Run("bulk remove evicts rooms from restricted users", func(t *testing.T) { + publishInboxMemberEvent(t, ctx, js, + subject.InboxMemberRemoved(siteID), model.OutboxMemberRemoved, + buildInboxMemberEvent("r-archives", "archives", siteID, + []string{"heidi", "ivan", "judy"}, int64Ptr(archivesHSS), 0, 6100)) + drainConsumer(t, ctx, cons, handler, 1) + refreshIndex(t, esURL, indexName) + + for _, account := range []string{"heidi", "ivan", "judy"} { + doc := getDoc(t, esURL, indexName, account) + require.NotNil(t, doc, "%s user doc should still exist (ghost)", account) + restricted, _ := doc["restrictedRooms"].(map[string]any) + _, stillHas := restricted["r-archives"] + assert.False(t, stillHas, + "%s restrictedRooms[r-archives] should be evicted after remove", account) + } + }) } // --- User-room LWW guard integration test --- @@ -547,7 +581,7 @@ func TestUserRoomSync_LWWGuard(t *testing.T) { j = joinedAt } publishInboxMemberEvent(t, ctx, js, subj, eventType, - buildInboxMemberEvent(roomID, "room "+roomID, siteID, []string{"charlie"}, 0, j, ts)) + buildInboxMemberEvent(roomID, "room "+roomID, siteID, []string{"charlie"}, nil, j, ts)) } getCharlieState := func() ([]string, map[string]any) { diff --git a/search-sync-worker/inbox_stream.go b/search-sync-worker/inbox_stream.go index 528a0e3cf..0ccf13981 100644 --- a/search-sync-worker/inbox_stream.go +++ b/search-sync-worker/inbox_stream.go @@ -81,9 +81,9 @@ func (b *inboxMemberCollection) FilterSubjects(siteID string) []string { // InboxMemberEvent payload and validates the common preconditions shared by // all inbox-member collections. // -// Callers are responsible for the event-level restricted-room short-circuit: -// when payload.HistorySharedSince != 0 the entire event should be skipped -// (the search service handles restricted rooms via DB+cache at query time). +// Callers decide how to handle the event-level restricted-room flag: +// - spotlight keeps the MVP skip on `payload.HistorySharedSince != nil` +// - user-room routes the event into `restrictedRooms{}` on the doc func parseMemberEvent(data []byte) (*model.OutboxEvent, *model.InboxMemberEvent, error) { var evt model.OutboxEvent if err := json.Unmarshal(data, &evt); err != nil { diff --git a/search-sync-worker/messages.go b/search-sync-worker/messages.go index aada5e1f7..600a7b938 100644 --- a/search-sync-worker/messages.go +++ b/search-sync-worker/messages.go @@ -83,6 +83,7 @@ type MessageSearchIndex struct { CreatedAt time.Time `json:"createdAt" es:"date"` ThreadParentID string `json:"threadParentMessageId,omitempty" es:"keyword"` ThreadParentCreatedAt *time.Time `json:"threadParentMessageCreatedAt,omitempty" es:"date"` + TShow bool `json:"tshow,omitempty" es:"boolean"` } // newMessageSearchIndex maps a MessageEvent to a search index document. @@ -97,6 +98,7 @@ func newMessageSearchIndex(evt *model.MessageEvent) MessageSearchIndex { CreatedAt: evt.Message.CreatedAt, ThreadParentID: evt.Message.ThreadParentMessageID, ThreadParentCreatedAt: evt.Message.ThreadParentMessageCreatedAt, + TShow: evt.Message.TShow, } } diff --git a/search-sync-worker/messages_test.go b/search-sync-worker/messages_test.go index b683b2f0e..9196132a3 100644 --- a/search-sync-worker/messages_test.go +++ b/search-sync-worker/messages_test.go @@ -41,6 +41,8 @@ func TestMessageCollection_TemplateBody(t *testing.T) { assert.Contains(t, props, "userAccount") assert.Contains(t, props, "content") assert.Contains(t, props, "createdAt") + assert.Contains(t, props, "tshow") + assert.Equal(t, "boolean", props["tshow"].(map[string]any)["type"]) assert.Equal(t, false, mappings["dynamic"]) settings := tmpl["settings"].(map[string]any) @@ -191,6 +193,7 @@ func TestNewMessageSearchIndex(t *testing.T) { Content: "hello", CreatedAt: ts, ThreadParentMessageID: "parent-1", ThreadParentMessageCreatedAt: &parentTS, + TShow: true, }, SiteID: "site-a", } @@ -205,6 +208,30 @@ func TestNewMessageSearchIndex(t *testing.T) { assert.Equal(t, "parent-1", doc.ThreadParentID) require.NotNil(t, doc.ThreadParentCreatedAt) assert.Equal(t, parentTS, *doc.ThreadParentCreatedAt) + assert.True(t, doc.TShow) +} + +// TestNewMessageSearchIndex_TShowOmittedWhenFalse verifies that a message with +// the default TShow (false) marshals without a `tshow` key so unmarked thread +// replies don't bloat the index and so range/term queries on `tshow` only +// match explicitly-flagged docs. +func TestNewMessageSearchIndex_TShowOmittedWhenFalse(t *testing.T) { + evt := &model.MessageEvent{ + Message: model.Message{ + ID: "msg-1", RoomID: "r1", UserID: "u1", UserAccount: "alice", + Content: "hello", CreatedAt: time.Date(2026, 1, 15, 10, 30, 0, 0, time.UTC), + }, + SiteID: "site-a", + } + doc := newMessageSearchIndex(evt) + assert.False(t, doc.TShow) + + data, err := json.Marshal(doc) + require.NoError(t, err) + var raw map[string]any + require.NoError(t, json.Unmarshal(data, &raw)) + _, present := raw["tshow"] + assert.False(t, present, "tshow should be omitted when false") } func TestMessageCollection_BuildAction(t *testing.T) { diff --git a/search-sync-worker/spotlight.go b/search-sync-worker/spotlight.go index 7e0f8509f..f0d0ec20d 100644 --- a/search-sync-worker/spotlight.go +++ b/search-sync-worker/spotlight.go @@ -44,9 +44,10 @@ func (c *spotlightCollection) TemplateBody() json.RawMessage { // event is redelivered, every action 409s uniformly and is treated as a // successful idempotent replay. // -// Restricted rooms (HistorySharedSince != 0 on the event) short-circuit the -// entire bulk and return an empty slice — the search service handles -// restricted rooms via DB+cache at query time, so nothing needs to be indexed. +// Restricted rooms (HistorySharedSince != nil on the event) short-circuit the +// entire bulk and return an empty slice. The spotlight index keeps the MVP +// skip for restricted rooms; user-room (not spotlight) stores restricted-room +// membership on the per-user ES doc via `restrictedRooms{}`. func (c *spotlightCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { evt, payload, err := parseMemberEvent(data) if err != nil { @@ -55,11 +56,10 @@ func (c *spotlightCollection) BuildAction(data []byte) ([]searchengine.BulkActio if payload.RoomID == "" { return nil, fmt.Errorf("build spotlight action: missing roomId") } - // Event-level restricted-room short-circuit runs BEFORE account - // validation so restricted events with any payload shape (including an - // empty Accounts slice) are uniformly skipped per the InboxMemberEvent - // contract — no surprise error on an otherwise-valid "skip me" event. - if payload.HistorySharedSince != 0 { + // Spotlight skips restricted rooms (MVP); user-room stores them. + // Mirror user_room.go's `hss > 0` sentinel so a leaked &0 is treated as + // unrestricted by both indices. + if payload.HistorySharedSince != nil && *payload.HistorySharedSince > 0 { return nil, nil } if len(payload.Accounts) == 0 { diff --git a/search-sync-worker/spotlight_test.go b/search-sync-worker/spotlight_test.go index a567c3085..ea7e2f955 100644 --- a/search-sync-worker/spotlight_test.go +++ b/search-sync-worker/spotlight_test.go @@ -163,8 +163,9 @@ func TestSpotlightCollection_BuildAction_RestrictedRoomSkipped(t *testing.T) { payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob"} // Event-level HistorySharedSince short-circuits the entire bulk — - // restricted rooms are handled by the search service via DB+cache. - payload.HistorySharedSince = 1735689500000 + // spotlight keeps MVP skip for restricted rooms; user-room stores them. + hss := int64(1735689500000) + payload.HistorySharedSince = &hss data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, 100) diff --git a/search-sync-worker/user_room.go b/search-sync-worker/user_room.go index 0c02c58cf..aa16da4e5 100644 --- a/search-sync-worker/user_room.go +++ b/search-sync-worker/user_room.go @@ -48,13 +48,33 @@ func (c *userRoomCollection) TemplateBody() json.RawMessage { // on (user, room) using `params.ts` (OutboxEvent.Timestamp in millis). Stale // events short-circuit via `ctx.op = 'none'` which tells ES to skip the write // entirely — no version bump, no disk I/O. +// +// addRoomScript additionally routes by `params.hss`: +// - hss > 0 → rid lives in `restrictedRooms{rid: hss}` and is removed from `rooms[]` +// - hss <= 0 → rid lives in `rooms[]` and is removed from `restrictedRooms{}` +// +// This makes admin-driven restriction transitions atomic inside a single +// update: the rid always ends up in exactly one of the two slots. +// +// Painless lacks nullable primitives in script params, so the Go side passes +// `hss = 0` for unrestricted and `hss = *event.HistorySharedSince` otherwise. +// The Go↔painless contract: publishers MUST emit nil for unrestricted rooms +// on the wire — a `&0` is treated as unrestricted by this script and would be +// a silent contract violation. const ( addRoomScript = `if (ctx._source.roomTimestamps == null) { ctx._source.roomTimestamps = [:]; } ` + `if (ctx._source.rooms == null) { ctx._source.rooms = []; } ` + + `if (ctx._source.restrictedRooms == null) { ctx._source.restrictedRooms = [:]; } ` + `long stored = ctx._source.roomTimestamps.containsKey(params.rid) ` + `? ((Number)ctx._source.roomTimestamps.get(params.rid)).longValue() : 0L; ` + `if (params.ts > stored) { ` + + `if (params.hss > 0) { ` + + `ctx._source.restrictedRooms[params.rid] = params.hss; ` + + `ctx._source.rooms.removeIf(r -> r == params.rid); ` + + `} else { ` + `if (!ctx._source.rooms.contains(params.rid)) { ctx._source.rooms.add(params.rid); } ` + + `ctx._source.restrictedRooms.remove(params.rid); ` + + `} ` + `ctx._source.roomTimestamps.put(params.rid, params.ts); ` + `ctx._source.updatedAt = params.now; ` + `} else { ctx.op = 'none'; }` @@ -66,6 +86,7 @@ const ( `if (ctx._source.rooms != null) { ` + `int idx = ctx._source.rooms.indexOf(params.rid); ` + `if (idx >= 0) { ctx._source.rooms.remove(idx); } } ` + + `if (ctx._source.restrictedRooms != null) { ctx._source.restrictedRooms.remove(params.rid); } ` + `ctx._source.roomTimestamps.put(params.rid, params.ts); ` + `} else { ctx.op = 'none'; }` ) @@ -75,10 +96,9 @@ const ( // user-room doc updates from a single event (each account touches a // different user's doc, keyed by account). // -// Restricted rooms (HistorySharedSince != 0 on the event) short-circuit the -// entire bulk and return an empty slice — the search service handles -// restricted rooms via DB+cache at query time, so nothing needs to be -// indexed. The handler then acks the source message without touching ES. +// Restricted rooms (HistorySharedSince != nil on the event) are routed into +// `restrictedRooms{}` on the user-room doc — the search service reads both +// `rooms[]` and `restrictedRooms{}` directly from ES at query time. func (c *userRoomCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { evt, payload, err := parseMemberEvent(data) if err != nil { @@ -87,19 +107,17 @@ func (c *userRoomCollection) BuildAction(data []byte) ([]searchengine.BulkAction if payload.RoomID == "" { return nil, fmt.Errorf("build user-room action: missing roomId") } - // Event-level restricted-room short-circuit runs BEFORE account - // validation so restricted events with any payload shape (including an - // empty Accounts slice) are uniformly skipped per the InboxMemberEvent - // contract — no surprise error on an otherwise-valid "skip me" event. - if payload.HistorySharedSince != 0 { - return nil, nil - } if len(payload.Accounts) == 0 { return nil, fmt.Errorf("build user-room action: empty accounts") } ts := evt.Timestamp roomID := payload.RoomID + // Translate *int64 → painless-safe int64 (sentinel contract lives on addRoomScript). + var hss int64 + if payload.HistorySharedSince != nil { + hss = *payload.HistorySharedSince + } actions := make([]searchengine.BulkAction, 0, len(payload.Accounts)) for i, account := range payload.Accounts { if account == "" { @@ -108,7 +126,7 @@ func (c *userRoomCollection) BuildAction(data []byte) ([]searchengine.BulkAction switch evt.Type { case model.OutboxMemberAdded: - body, err := buildAddRoomUpdateBody(account, roomID, ts) + body, err := buildAddRoomUpdateBody(account, roomID, ts, hss) if err != nil { return nil, err } @@ -137,18 +155,41 @@ func (c *userRoomCollection) BuildAction(data []byte) ([]searchengine.BulkAction } // userRoomUpsertDoc is the full document inserted when the user has no prior -// user-room entry (i.e., the first time a room is added for this user). The -// `RoomTimestamps` map seeds the per-room timestamp guard. +// user-room entry (i.e., the first time a room is added for this user). +// +// Rooms holds unrestricted room IDs; RestrictedRooms maps rid → +// historySharedSince (millis) for rooms the user joined with a history +// restriction. RoomTimestamps seeds the per-room LWW timestamp guard used +// uniformly across both paths. type userRoomUpsertDoc struct { - UserAccount string `json:"userAccount"` - Rooms []string `json:"rooms"` - RoomTimestamps map[string]int64 `json:"roomTimestamps"` - CreatedAt string `json:"createdAt"` - UpdatedAt string `json:"updatedAt"` + UserAccount string `json:"userAccount"` + Rooms []string `json:"rooms"` + RestrictedRooms map[string]int64 `json:"restrictedRooms"` + RoomTimestamps map[string]int64 `json:"roomTimestamps"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` } -func buildAddRoomUpdateBody(account, roomID string, ts int64) (json.RawMessage, error) { +func buildAddRoomUpdateBody(account, roomID string, ts, hss int64) (json.RawMessage, error) { now := time.UnixMilli(ts).UTC().Format(time.RFC3339Nano) + + // Seed the upsert document so the first-insert shape matches the + // painless-updated shape: restricted rooms go straight into + // restrictedRooms{}, unrestricted rooms go straight into rooms[]. + upsert := userRoomUpsertDoc{ + UserAccount: account, + Rooms: []string{}, + RestrictedRooms: map[string]int64{}, + RoomTimestamps: map[string]int64{roomID: ts}, + CreatedAt: now, + UpdatedAt: now, + } + if hss > 0 { + upsert.RestrictedRooms[roomID] = hss + } else { + upsert.Rooms = []string{roomID} + } + body := map[string]any{ "script": map[string]any{ "source": addRoomScript, @@ -156,16 +197,11 @@ func buildAddRoomUpdateBody(account, roomID string, ts int64) (json.RawMessage, "params": map[string]any{ "rid": roomID, "ts": ts, + "hss": hss, "now": now, }, }, - "upsert": userRoomUpsertDoc{ - UserAccount: account, - Rooms: []string{roomID}, - RoomTimestamps: map[string]int64{roomID: ts}, - CreatedAt: now, - UpdatedAt: now, - }, + "upsert": upsert, } data, err := json.Marshal(body) if err != nil { @@ -217,9 +253,14 @@ func userRoomTemplateBody(indexName string) json.RawMessage { "keyword": map[string]any{"type": "keyword", "ignore_above": 256}, }, }, - "roomTimestamps": map[string]any{"type": "flattened"}, - "createdAt": map[string]any{"type": "date"}, - "updatedAt": map[string]any{"type": "date"}, + // restrictedRooms is a rid → historySharedSince (millis) + // map. `flattened` keeps the mapping stable regardless of + // how many restricted rids show up — same approach as + // roomTimestamps. + "restrictedRooms": map[string]any{"type": "flattened"}, + "roomTimestamps": map[string]any{"type": "flattened"}, + "createdAt": map[string]any{"type": "date"}, + "updatedAt": map[string]any{"type": "date"}, }, }, }, diff --git a/search-sync-worker/user_room_test.go b/search-sync-worker/user_room_test.go index 3fee0302d..a3c18cb68 100644 --- a/search-sync-worker/user_room_test.go +++ b/search-sync-worker/user_room_test.go @@ -53,6 +53,7 @@ func TestUserRoomCollection_TemplateBody(t *testing.T) { assert.Contains(t, props, "userAccount") assert.Contains(t, props, "rooms") + assert.Contains(t, props, "restrictedRooms") assert.Contains(t, props, "roomTimestamps") assert.Contains(t, props, "createdAt") assert.Contains(t, props, "updatedAt") @@ -60,6 +61,9 @@ func TestUserRoomCollection_TemplateBody(t *testing.T) { rt := props["roomTimestamps"].(map[string]any) assert.Equal(t, "flattened", rt["type"]) + rr := props["restrictedRooms"].(map[string]any) + assert.Equal(t, "flattened", rr["type"]) + rooms := props["rooms"].(map[string]any) assert.Equal(t, "text", rooms["type"]) } @@ -91,11 +95,15 @@ func TestUserRoomCollection_BuildAction_MemberAdded(t *testing.T) { assert.Contains(t, src, "ctx._source.rooms.add") assert.Contains(t, src, "ctx.op = 'none'") assert.Contains(t, src, "roomTimestamps") + assert.Contains(t, src, "restrictedRooms") assert.Contains(t, src, "params.ts") + assert.Contains(t, src, "params.hss") params := script["params"].(map[string]any) assert.Equal(t, "r-eng", params["rid"]) assert.Equal(t, float64(ts), params["ts"]) + assert.Equal(t, float64(0), params["hss"], + "unrestricted event must translate to hss=0 on the painless boundary") assert.NotEmpty(t, params["now"]) upsert, ok := body["upsert"].(map[string]any) @@ -105,11 +113,54 @@ func TestUserRoomCollection_BuildAction_MemberAdded(t *testing.T) { require.Len(t, rooms, 1) assert.Equal(t, "r-eng", rooms[0]) + // Unrestricted upsert seeds an empty restrictedRooms map so the shape is + // consistent with the script-updated shape. + restricted := upsert["restrictedRooms"].(map[string]any) + assert.Empty(t, restricted) + roomTimestamps := upsert["roomTimestamps"].(map[string]any) assert.Equal(t, float64(ts), roomTimestamps["r-eng"]) assert.NotEmpty(t, upsert["createdAt"]) } +func TestUserRoomCollection_BuildAction_MemberAdded_Restricted(t *testing.T) { + coll := newUserRoomCollection("user-room-site-a") + payload := baseInboxMemberEvent() + const ts int64 = 1735689700000 + const hssVal int64 = 1735689500000 + hss := hssVal + payload.HistorySharedSince = &hss + data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, ts) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1, + "restricted add must still produce an action — user-room now stores it") + + action := actions[0] + require.NotNil(t, action.Doc) + + var body map[string]any + require.NoError(t, json.Unmarshal(action.Doc, &body)) + + params := body["script"].(map[string]any)["params"].(map[string]any) + assert.Equal(t, float64(hssVal), params["hss"], + "restricted event must pass hss through to the painless params") + + upsert := body["upsert"].(map[string]any) + rooms := upsert["rooms"].([]any) + assert.Empty(t, rooms, + "restricted upsert must NOT seed rooms[] with the rid") + + restricted := upsert["restrictedRooms"].(map[string]any) + assert.Equal(t, float64(hssVal), restricted["r-eng"], + "restricted upsert must seed restrictedRooms[rid] with the HSS") + + roomTimestamps := upsert["roomTimestamps"].(map[string]any) + assert.Equal(t, float64(ts), roomTimestamps["r-eng"], + "LWW timestamp guard applies to restricted path too") +} + func TestUserRoomCollection_BuildAction_MemberRemoved(t *testing.T) { coll := newUserRoomCollection("user-room-site-a") payload := baseInboxMemberEvent() @@ -146,21 +197,56 @@ func TestUserRoomCollection_BuildAction_MemberRemoved(t *testing.T) { assert.False(t, hasUpsert, "remove update body must not contain upsert") } -// TestUserRoomCollection_BuildAction_RestrictedRoomSkipped verifies the -// event-level restricted-room short-circuit: when HistorySharedSince is -// non-zero on the event, every account in the bulk is skipped (no actions). -// The search service handles restricted rooms via DB+cache at query time. -func TestUserRoomCollection_BuildAction_RestrictedRoomSkipped(t *testing.T) { +// TestUserRoomCollection_BuildAction_BulkMixed_AllRestricted verifies that a +// restricted bulk fans out: every account in the bulk gets its own upsert +// seeded with `restrictedRooms[rid] = hss` and an empty `rooms[]`. All +// actions share the same HSS (event-level field). +func TestUserRoomCollection_BuildAction_BulkMixed_AllRestricted(t *testing.T) { coll := newUserRoomCollection("user-room-site-a") payload := baseInboxMemberEvent() - payload.Accounts = []string{"alice", "bob"} - payload.HistorySharedSince = 1735689500000 + payload.Accounts = []string{"alice", "bob", "carol"} + const hssVal int64 = 1735689500000 + hss := hssVal + payload.HistorySharedSince = &hss data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, 100) actions, err := coll.BuildAction(data) require.NoError(t, err) - assert.Empty(t, actions, "restricted-room event should produce no actions") + require.Len(t, actions, 3, "restricted bulk must fan out per account") + + for _, action := range actions { + var body map[string]any + require.NoError(t, json.Unmarshal(action.Doc, &body)) + params := body["script"].(map[string]any)["params"].(map[string]any) + assert.Equal(t, float64(hssVal), params["hss"]) + + upsert := body["upsert"].(map[string]any) + assert.Empty(t, upsert["rooms"].([]any)) + restricted := upsert["restrictedRooms"].(map[string]any) + assert.Equal(t, float64(hssVal), restricted["r-eng"]) + } +} + +// TestUserRoomCollection_BuildAction_RemoveScriptEvictsBoth verifies the +// remove body touches both rooms[] and restrictedRooms{} so a member_removed +// event works regardless of which slot currently holds the rid. +func TestUserRoomCollection_BuildAction_RemoveScriptEvictsBoth(t *testing.T) { + coll := newUserRoomCollection("user-room-site-a") + payload := baseInboxMemberEvent() + data := makeInboxMemberEvent(t, model.OutboxMemberRemoved, payload, 200) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + + var body map[string]any + require.NoError(t, json.Unmarshal(actions[0].Doc, &body)) + src := body["script"].(map[string]any)["source"].(string) + assert.Contains(t, src, "ctx._source.rooms.remove", + "remove script must evict from rooms[]") + assert.Contains(t, src, "ctx._source.restrictedRooms.remove", + "remove script must evict from restrictedRooms{}") } // TestUserRoomCollection_BuildAction_BulkInvite verifies fan-out: one event