Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions inbox-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent)

joinedAt := time.UnixMilli(event.JoinedAt).UTC()
var historySharedSince *time.Time
if event.HistorySharedSince > 0 {
t := time.UnixMilli(event.HistorySharedSince).UTC()
if event.HistorySharedSince != nil && *event.HistorySharedSince > 0 {
t := time.UnixMilli(*event.HistorySharedSince).UTC()
historySharedSince = &t
}

Expand Down
26 changes: 15 additions & 11 deletions inbox-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ func TestHandleEvent_MemberAdded(t *testing.T) {
pub := &mockPublisher{}
h := NewHandler(store, pub)

hssMillis := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli()
change := model.MemberAddEvent{
Type: "member_added",
RoomID: "room-1",
Accounts: []string{"bob"},
SiteID: "site-b",
JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
HistorySharedSince: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
HistorySharedSince: &hssMillis,
Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
}
changeData, err := json.Marshal(change)
Expand Down Expand Up @@ -240,14 +241,15 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) {

joinedAt := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC)
historyShared := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC)
hssMillis := historyShared.UnixMilli()

change := model.MemberAddEvent{
Type: "member_added",
RoomID: "room-2",
Accounts: []string{"carol"},
SiteID: "site-b",
JoinedAt: joinedAt.UnixMilli(),
HistorySharedSince: historyShared.UnixMilli(),
HistorySharedSince: &hssMillis,
Timestamp: joinedAt.UnixMilli(),
}
changeData, _ := json.Marshal(change)
Expand Down Expand Up @@ -467,13 +469,14 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) {
pub := &mockPublisher{}
h := NewHandler(store, pub)

hssMillis := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli()
change := model.MemberAddEvent{
Type: "member_added",
RoomID: "room-1",
Accounts: []string{"account-bob"},
SiteID: "site-b",
JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
HistorySharedSince: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
HistorySharedSince: &hssMillis,
Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
}
changeData, err := json.Marshal(change)
Expand Down Expand Up @@ -529,14 +532,15 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) {

joinedAt := time.Date(2026, 4, 5, 10, 30, 0, 0, time.UTC)
historyShared := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC)
hssMillis := historyShared.UnixMilli()

change := model.MemberAddEvent{
Type: "member_added",
RoomID: "room-99",
Accounts: []string{"alice", "bob"},
SiteID: "site-b",
JoinedAt: joinedAt.UnixMilli(),
HistorySharedSince: historyShared.UnixMilli(),
HistorySharedSince: &hssMillis,
Timestamp: joinedAt.UnixMilli(),
}
changeData, _ := json.Marshal(change)
Expand Down Expand Up @@ -609,13 +613,13 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) {
h := NewHandler(store, pub)

change := model.MemberAddEvent{
Type: "member_added",
RoomID: "room-1",
Accounts: []string{"dave"},
SiteID: "site-b",
JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
HistorySharedSince: 0, // means "all history"
Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
Type: "member_added",
RoomID: "room-1",
Accounts: []string{"dave"},
SiteID: "site-b",
JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
// HistorySharedSince nil → "all history"
Timestamp: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
}
changeData, _ := json.Marshal(change)

Expand Down
3 changes: 2 additions & 1 deletion inbox-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ func TestInboxWorker_MemberAdded_Integration(t *testing.T) {
}

// Create outbox event for member_added
hssMillis := time.Now().UTC().UnixMilli()
change := model.MemberAddEvent{
Type: "member_added", RoomID: "r1", Accounts: []string{"u2"}, SiteID: "site-b",
JoinedAt: time.Now().UTC().UnixMilli(),
HistorySharedSince: time.Now().UTC().UnixMilli(),
HistorySharedSince: &hssMillis,
Timestamp: time.Now().UTC().UnixMilli(),
}
changeData, _ := json.Marshal(change)
Expand Down
18 changes: 9 additions & 9 deletions message-worker/store_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ func NewCassandraStore(session *gocql.Session) *CassandraStore {
// If either insert fails the error is returned immediately; JetStream will redeliver the message.
func (s *CassandraStore) SaveMessage(ctx context.Context, msg *model.Message, sender *cassParticipant, siteID string) error {
if err := s.cassSession.Query(
`INSERT INTO messages_by_room (room_id, created_at, message_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
msg.RoomID, msg.CreatedAt, msg.ID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData,
`INSERT INTO messages_by_room (room_id, created_at, message_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data, tshow)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
msg.RoomID, msg.CreatedAt, msg.ID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, msg.TShow,
).WithContext(ctx).Exec(); err != nil {
return fmt.Errorf("insert messages_by_room %s: %w", msg.ID, err)
}

if err := s.cassSession.Query(
`INSERT INTO messages_by_id (message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData,
`INSERT INTO messages_by_id (message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions, type, sys_msg_data, tshow)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions), msg.Type, msg.SysMsgData, msg.TShow,
).WithContext(ctx).Exec(); err != nil {
return fmt.Errorf("insert messages_by_id %s: %w", msg.ID, err)
}
Expand All @@ -89,10 +89,10 @@ func (s *CassandraStore) SaveThreadMessage(ctx context.Context, msg *model.Messa
if err := s.cassSession.Query(
`INSERT INTO messages_by_id
(message_id, created_at, room_id, sender, msg, site_id, updated_at, mentions,
thread_room_id, thread_parent_id, thread_parent_created_at, type, sys_msg_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
thread_room_id, thread_parent_id, thread_parent_created_at, type, sys_msg_data, tshow)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
msg.ID, msg.CreatedAt, msg.RoomID, sender, msg.Content, siteID, msg.CreatedAt, toMentionSet(msg.Mentions),
threadRoomID, msg.ThreadParentMessageID, msg.ThreadParentMessageCreatedAt, msg.Type, msg.SysMsgData,
threadRoomID, msg.ThreadParentMessageID, msg.ThreadParentMessageCreatedAt, msg.Type, msg.SysMsgData, msg.TShow,
).WithContext(ctx).Exec(); err != nil {
return fmt.Errorf("insert messages_by_id %s: %w", msg.ID, err)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ type UpdateRoleRequest struct {
// search-sync-worker. One event represents a bulk add/remove of N Accounts
// against a single room; downstream consumers fan out per-account.
//
// HistorySharedSince is a single event-level flag shared by all accounts in
// the bulk: when non-zero, the room is history-restricted and consumers MUST
// skip indexing the entire event (the search service handles restricted rooms
// via DB+cache at query time). JoinedAt is only meaningful on add events and
// omitted on removes.
// HistorySharedSince == nil means the entire bulk is unrestricted; non-nil
// means all Accounts in the bulk are restricted from that timestamp. The
// user-room collection routes these into restrictedRooms{}; the spotlight
// collection skips non-nil events entirely for MVP. Publishers MUST emit nil
// for unrestricted rooms — never &0 and never a non-positive timestamp — so
// the Go↔painless boundary sentinel (hss <= 0 → unrestricted) stays sound.
// JoinedAt is only meaningful on add events and omitted on removes.
type InboxMemberEvent struct {
RoomID string `json:"roomId"`
RoomName string `json:"roomName"`
RoomType RoomType `json:"roomType"`
SiteID string `json:"siteId"`
Accounts []string `json:"accounts"`
HistorySharedSince int64 `json:"historySharedSince,omitempty"`
HistorySharedSince *int64 `json:"historySharedSince,omitempty"`
JoinedAt int64 `json:"joinedAt,omitempty"`
Timestamp int64 `json:"timestamp" bson:"timestamp"`
}
Expand Down Expand Up @@ -99,7 +101,7 @@ type MemberAddEvent struct {
Accounts []string `json:"accounts" bson:"accounts"`
SiteID string `json:"siteId" bson:"siteId"`
JoinedAt int64 `json:"joinedAt" bson:"joinedAt"`
HistorySharedSince int64 `json:"historySharedSince" bson:"historySharedSince"`
HistorySharedSince *int64 `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"`
Timestamp int64 `json:"timestamp" bson:"timestamp"`
}

Expand Down
1 change: 1 addition & 0 deletions pkg/model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Message struct {
CreatedAt time.Time `json:"createdAt" bson:"createdAt"`
ThreadParentMessageID string `json:"threadParentMessageId,omitempty" bson:"threadParentMessageId,omitempty"`
ThreadParentMessageCreatedAt *time.Time `json:"threadParentMessageCreatedAt,omitempty" bson:"threadParentMessageCreatedAt,omitempty"`
TShow bool `json:"tshow,omitempty" bson:"tshow,omitempty"`
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Type string `json:"type,omitempty" bson:"type,omitempty"`
SysMsgData []byte `json:"sysMsgData,omitempty" bson:"sysMsgData,omitempty"`
}
Expand Down
110 changes: 92 additions & 18 deletions pkg/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,39 @@ func TestMessageJSON(t *testing.T) {
_, hasSysMsgData := raw["sysMsgData"]
assert.False(t, hasSysMsgData, "sysMsgData should be omitted when nil")
})

t.Run("tshow round-trips when true", func(t *testing.T) {
m := model.Message{
ID: "m1", RoomID: "r1", UserID: "u1", UserAccount: "alice",
Content: "thread reply shown in main feed",
CreatedAt: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC),
TShow: true,
}
data, err := json.Marshal(&m)
require.NoError(t, err)

var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
assert.Equal(t, true, raw["tshow"])

var dst model.Message
require.NoError(t, json.Unmarshal(data, &dst))
assert.True(t, dst.TShow)
})

t.Run("tshow omitted on the wire when false", func(t *testing.T) {
m := model.Message{
ID: "m1", RoomID: "r1", UserID: "u1", UserAccount: "alice",
Content: "plain message",
CreatedAt: time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC),
}
data, err := json.Marshal(&m)
require.NoError(t, err)
var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
_, present := raw["tshow"]
assert.False(t, present, "tshow should be omitted when false")
})
}

func TestSendMessageRequestJSON(t *testing.T) {
Expand Down Expand Up @@ -619,24 +652,30 @@ func TestInboxMemberEventJSON(t *testing.T) {
})

t.Run("add event, restricted room carries HistorySharedSince", func(t *testing.T) {
hss := int64(1735689500000)
src := model.InboxMemberEvent{
RoomID: "r1",
RoomName: "engineering",
RoomType: model.RoomTypeGroup,
SiteID: "site-a",
Accounts: []string{"alice"},
HistorySharedSince: 1735689500000,
HistorySharedSince: &hss,
JoinedAt: 1735689600000,
Timestamp: 1735689600000,
}
data, err := json.Marshal(&src)
require.NoError(t, err)
var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
assert.EqualValues(t, hss, raw["historySharedSince"])

var dst model.InboxMemberEvent
require.NoError(t, json.Unmarshal(data, &dst))
assert.Equal(t, src, dst)
require.NotNil(t, dst.HistorySharedSince)
assert.Equal(t, hss, *dst.HistorySharedSince)
})

t.Run("remove event omits HistorySharedSince and JoinedAt when zero", func(t *testing.T) {
t.Run("remove event omits HistorySharedSince and JoinedAt when nil/zero", func(t *testing.T) {
src := model.InboxMemberEvent{
RoomID: "r1",
RoomName: "engineering",
Expand All @@ -650,9 +689,13 @@ func TestInboxMemberEventJSON(t *testing.T) {
var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
_, hasHSS := raw["historySharedSince"]
assert.False(t, hasHSS, "historySharedSince should be omitted when zero")
assert.False(t, hasHSS, "historySharedSince should be omitted when nil")
_, hasJoinedAt := raw["joinedAt"]
assert.False(t, hasJoinedAt, "joinedAt should be omitted when zero")

var dst model.InboxMemberEvent
require.NoError(t, json.Unmarshal(data, &dst))
assert.Nil(t, dst.HistorySharedSince, "unrestricted event must decode HistorySharedSince as nil")
})
}

Expand Down Expand Up @@ -879,20 +922,51 @@ func TestMembersAddedJSON(t *testing.T) {
}

func TestMemberAddEventJSON(t *testing.T) {
src := model.MemberAddEvent{
Type: "member_added",
RoomID: "r1",
Accounts: []string{"alice", "bob"},
SiteID: "site-a",
JoinedAt: 1735689600000,
HistorySharedSince: 1735689600000,
Timestamp: 1735689600000,
}
data, err := json.Marshal(src)
require.NoError(t, err)
var dst model.MemberAddEvent
require.NoError(t, json.Unmarshal(data, &dst))
assert.Equal(t, src, dst)
t.Run("restricted room round-trips HistorySharedSince pointer", func(t *testing.T) {
hss := int64(1735689600000)
src := model.MemberAddEvent{
Type: "member_added",
RoomID: "r1",
Accounts: []string{"alice", "bob"},
SiteID: "site-a",
JoinedAt: 1735689600000,
HistorySharedSince: &hss,
Timestamp: 1735689600000,
}
data, err := json.Marshal(src)
require.NoError(t, err)

var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
assert.EqualValues(t, hss, raw["historySharedSince"])

var dst model.MemberAddEvent
require.NoError(t, json.Unmarshal(data, &dst))
require.NotNil(t, dst.HistorySharedSince)
assert.Equal(t, hss, *dst.HistorySharedSince)
})

t.Run("unrestricted room omits historySharedSince on the wire", func(t *testing.T) {
src := model.MemberAddEvent{
Type: "member_added",
RoomID: "r1",
Accounts: []string{"alice"},
SiteID: "site-a",
JoinedAt: 1735689600000,
Timestamp: 1735689600000,
}
data, err := json.Marshal(src)
require.NoError(t, err)

var raw map[string]any
require.NoError(t, json.Unmarshal(data, &raw))
_, hasHSS := raw["historySharedSince"]
assert.False(t, hasHSS, "historySharedSince must be omitted when nil")

var dst model.MemberAddEvent
require.NoError(t, json.Unmarshal(data, &dst))
assert.Nil(t, dst.HistorySharedSince)
})
}

func TestListRoomMembersRequestJSON(t *testing.T) {
Expand Down
18 changes: 14 additions & 4 deletions room-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,17 +620,27 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error {
}

// 8. Publish MemberAddEvent (actualAccounts was built above alongside subs)
var historySharedSince int64
// Never emit &0 — the painless sentinel `hss <= 0` would misroute the
// restricted room into `rooms[]`. If the upstream request is malformed
// (restricted mode but missing timestamp), leave the pointer nil + log —
// we can't silently translate to &0.
var historySharedSincePtr *int64
if req.History.Mode == model.HistoryModeNone {
historySharedSince = req.Timestamp
if req.Timestamp > 0 {
v := req.Timestamp
historySharedSincePtr = &v
} else {
slog.Error("restricted history with missing timestamp, emitting nil",
"roomID", req.RoomID, "mode", req.History.Mode)
}
}
memberAddEvt := model.MemberAddEvent{
Type: "member_added",
RoomID: req.RoomID,
Accounts: actualAccounts,
SiteID: room.SiteID,
JoinedAt: req.Timestamp,
HistorySharedSince: historySharedSince,
HistorySharedSince: historySharedSincePtr,
Timestamp: now.UnixMilli(),
}
memberAddData, _ := json.Marshal(memberAddEvt)
Expand Down Expand Up @@ -681,7 +691,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error {
Accounts: accounts,
SiteID: room.SiteID,
JoinedAt: req.Timestamp,
HistorySharedSince: historySharedSince,
HistorySharedSince: historySharedSincePtr,
Timestamp: now.UnixMilli(),
}
siteEvtData, _ := json.Marshal(siteEvt)
Expand Down
Loading