Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
52109e2
docs: spec for message-worker thread subscription outbox events
claude Apr 28, 2026
d0decfd
docs: implementation plan chunk 1 (tasks 1-3 — model+mention scaffold…
claude Apr 29, 2026
f98f6fa
docs: implementation plan chunk 2 (tasks 4-5 — publish helper + owner…
claude Apr 29, 2026
4d06909
docs: implementation plan chunk 3 (tasks 6-7 — wire publish in reply …
claude Apr 29, 2026
834ae3c
docs: implementation plan chunk 4 (tasks 8-9 — mention publish + main…
claude Apr 29, 2026
088a21b
docs: implementation plan chunk 5a (task 10 — inbox-worker dispatch)
claude Apr 29, 2026
d839405
docs: implementation plan chunk 5b (task 11 — Mongo UpsertThreadSubsc…
claude Apr 29, 2026
d8e3f30
docs: implementation plan chunk 5c (task 12 — inbox-worker integratio…
claude Apr 29, 2026
af07716
docs: implementation plan chunk 5d (self-review + handoff)
claude Apr 29, 2026
3106993
model: add OutboxThreadSubscriptionUpserted event type constant
claude Apr 29, 2026
644ee81
model+mention: add SiteID to Participant, propagate from Resolve
claude Apr 29, 2026
3537507
message-worker: plumb siteID and PublishFunc into Handler
claude Apr 29, 2026
244aa1e
message-worker: add publishThreadSubOutboxIfRemote helper
claude Apr 29, 2026
71db87d
message-worker: ThreadSubscription.SiteID = owner site; lookup parent…
claude Apr 29, 2026
7e017c2
message-worker: emit outbox event on first-reply ThreadSubscription i…
claude Apr 29, 2026
6058b59
message-worker: wrap first-reply outbox publish errors with role context
claude Apr 29, 2026
118aec6
message-worker: emit outbox event on subsequent-reply ThreadSubscript…
claude Apr 29, 2026
d98abf7
message-worker: emit outbox event on mention-marked ThreadSubscriptions
claude Apr 29, 2026
c1efd78
inbox-worker: dispatch thread_subscription_upserted to UpsertThreadSu…
claude Apr 29, 2026
fc6ee18
inbox-worker: Mongo UpsertThreadSubscription with monotonic hasMentio…
claude Apr 29, 2026
65a0f22
inbox-worker: integration tests for UpsertThreadSubscription monotoni…
claude Apr 29, 2026
e414041
message-worker: add same-site subsequent-reply test case for symmetry
claude Apr 29, 2026
371a142
test+docs: subsequent-reply parent-not-found case + $max-on-bool comment
claude Apr 29, 2026
0d53aa0
test: post-rebase fixups for upstream Publisher removal and userAccou…
claude Apr 29, 2026
56f99c2
ThreadSubscription.SiteID stays as room site; route by owner site sep…
claude Apr 30, 2026
0c33b23
docs: align spec/plan with post-review SiteID semantic
claude Apr 30, 2026
0cfddda
message-worker: fix outbox dedup-ID collision + don't drop parent sub…
claude Apr 30, 2026
2cfc181
docs: address CodeRabbit doc nits — code-fence langs, \$max example, …
claude Apr 30, 2026
06cd97f
style: wrap publish-closure errors + assert on json.Marshal in integr…
claude Apr 30, 2026
15f233b
inbox-worker: collection name → thread_subscriptions; inline outbox d…
claude May 4, 2026
55ab474
docs+log: spec collection name; clarify lookupOwnerSiteID warn message
claude May 4, 2026
4838110
test: fixup upstream NewHandler call site after rebase
claude May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,158 changes: 2,158 additions & 0 deletions docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions inbox-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type InboxStore interface {
UpdateSubscriptionRoles(ctx context.Context, account, roomID string, roles []model.Role) error
DeleteSubscriptionsByAccounts(ctx context.Context, roomID string, accounts []string) error
FindUsersByAccounts(ctx context.Context, accounts []string) ([]model.User, error)
UpsertThreadSubscription(ctx context.Context, sub *model.ThreadSubscription) error
}

// Handler processes incoming cross-site OutboxEvent messages.
Expand Down Expand Up @@ -47,6 +48,8 @@ func (h *Handler) HandleEvent(ctx context.Context, data []byte) error {
return h.handleRoomSync(ctx, &evt)
case "role_updated":
return h.handleRoleUpdated(ctx, &evt)
case "thread_subscription_upserted":
return h.handleThreadSubscriptionUpserted(ctx, &evt)
default:
slog.Warn("unknown event type, skipping", "type", evt.Type)
return nil
Expand Down Expand Up @@ -163,3 +166,19 @@ func (h *Handler) handleRoleUpdated(ctx context.Context, evt *model.OutboxEvent)
}
return nil
}

// handleThreadSubscriptionUpserted upserts a ThreadSubscription on the local
// site when message-worker on another site reports that a user (parent author,
// replier, or mentionee) is participating in a thread. The Mongo store layer
// is responsible for the monotonic hasMention merge — see store impl.
func (h *Handler) handleThreadSubscriptionUpserted(ctx context.Context, evt *model.OutboxEvent) error {
var sub model.ThreadSubscription
if err := json.Unmarshal(evt.Payload, &sub); err != nil {
return fmt.Errorf("unmarshal thread_subscription_upserted payload: %w", err)
}
if err := h.store.UpsertThreadSubscription(ctx, &sub); err != nil {
return fmt.Errorf("upsert thread subscription (threadRoomID %q, userID %q): %w",
sub.ThreadRoomID, sub.UserID, err)
}
return nil
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
138 changes: 138 additions & 0 deletions inbox-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type stubInboxStore struct {
rooms []model.Room
roleUpdates []roleUpdate
users []model.User
threadSubs []model.ThreadSubscription
}

func (s *stubInboxStore) CreateSubscription(ctx context.Context, sub *model.Subscription) error {
Expand Down Expand Up @@ -128,6 +129,31 @@ func (s *stubInboxStore) BulkCreateSubscriptions(_ context.Context, subs []*mode
return nil
}

func (s *stubInboxStore) UpsertThreadSubscription(_ context.Context, sub *model.ThreadSubscription) error {
s.mu.Lock()
defer s.mu.Unlock()
for i := range s.threadSubs {
if s.threadSubs[i].ThreadRoomID == sub.ThreadRoomID && s.threadSubs[i].UserID == sub.UserID {
// Monotonic hasMention merge — never clear true→false.
if sub.HasMention {
s.threadSubs[i].HasMention = true
}
s.threadSubs[i].UpdatedAt = sub.UpdatedAt
return nil
}
}
s.threadSubs = append(s.threadSubs, *sub)
return nil
}

func (s *stubInboxStore) getThreadSubs() []model.ThreadSubscription {
s.mu.Lock()
defer s.mu.Unlock()
cp := make([]model.ThreadSubscription, len(s.threadSubs))
copy(cp, s.threadSubs)
return cp
}

// --- Tests ---

func TestHandleEvent_MemberAdded(t *testing.T) {
Expand Down Expand Up @@ -773,3 +799,115 @@ func TestHandleEvent_MemberRemoved_DeleteError(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "delete subscriptions")
}

func TestHandleEvent_ThreadSubscriptionUpserted_Insert(t *testing.T) {
store := &stubInboxStore{}
h := NewHandler(store)

now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
// SiteID is the room's home site (site-a), preserved across federation.
sub := model.ThreadSubscription{
ID: "sub-1",
ParentMessageID: "pm-1",
RoomID: "r1",
ThreadRoomID: "tr-1",
UserID: "u-bob",
UserAccount: "bob",
SiteID: "site-a",
HasMention: false,
CreatedAt: now,
UpdatedAt: now,
}
subData, err := json.Marshal(sub)
require.NoError(t, err)

evt := model.OutboxEvent{
Type: "thread_subscription_upserted",
SiteID: "site-a",
DestSiteID: "site-b",
Payload: subData,
Timestamp: now.UnixMilli(),
}
evtData, _ := json.Marshal(evt)

require.NoError(t, h.HandleEvent(context.Background(), evtData))

got := store.getThreadSubs()
require.Len(t, got, 1)
assert.Equal(t, sub, got[0])
}

func TestHandleEvent_ThreadSubscriptionUpserted_MonotonicHasMention(t *testing.T) {
store := &stubInboxStore{}
h := NewHandler(store)

now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
// SiteID is the room's home site (site-a), preserved across federation.
mentionSub := model.ThreadSubscription{
ID: "sub-1", ParentMessageID: "pm-1", RoomID: "r1", ThreadRoomID: "tr-1",
UserID: "u-bob", UserAccount: "bob", SiteID: "site-a",
HasMention: true, CreatedAt: now, UpdatedAt: now,
}
mentionData, _ := json.Marshal(mentionSub)
mentionEvt, _ := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: mentionData, Timestamp: now.UnixMilli(),
})
require.NoError(t, h.HandleEvent(context.Background(), mentionEvt))

// Second event for same (threadRoomID, userID) with HasMention=false must NOT clear it.
plainSub := mentionSub
plainSub.HasMention = false
plainSub.UpdatedAt = now.Add(time.Minute)
plainData, _ := json.Marshal(plainSub)
plainEvt, _ := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: plainData, Timestamp: plainSub.UpdatedAt.UnixMilli(),
})
require.NoError(t, h.HandleEvent(context.Background(), plainEvt))

got := store.getThreadSubs()
require.Len(t, got, 1)
assert.True(t, got[0].HasMention, "hasMention must remain true after a non-mention event")
}

func TestHandleEvent_ThreadSubscriptionUpserted_InvalidPayload(t *testing.T) {
store := &stubInboxStore{}
h := NewHandler(store)

evt := model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: []byte("not json"),
}
evtData, _ := json.Marshal(evt)

require.Error(t, h.HandleEvent(context.Background(), evtData))
assert.Empty(t, store.getThreadSubs())
}

func TestHandleEvent_ThreadSubscriptionUpserted_StoreError(t *testing.T) {
store := &errorThreadSubStore{stubInboxStore: &stubInboxStore{}}
h := NewHandler(store)

now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
sub := model.ThreadSubscription{
ID: "sub-1", ThreadRoomID: "tr-1", UserID: "u-bob", SiteID: "site-a",
CreatedAt: now, UpdatedAt: now,
}
subData, _ := json.Marshal(sub)
evtData, _ := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: subData, Timestamp: now.UnixMilli(),
})

err := h.HandleEvent(context.Background(), evtData)
require.Error(t, err)
}

type errorThreadSubStore struct {
*stubInboxStore
}

func (s *errorThreadSubStore) UpsertThreadSubscription(_ context.Context, _ *model.ThreadSubscription) error {
return fmt.Errorf("boom")
}
119 changes: 119 additions & 0 deletions inbox-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,122 @@ func TestInboxWorker_MemberRemoved_Integration(t *testing.T) {

// No publish — room-worker handles user notification via NATS supercluster.
}

func TestInboxWorker_ThreadSubscriptionUpserted_Insert_Integration(t *testing.T) {
db := setupMongo(t)
ctx := context.Background()

store := &mongoInboxStore{
subCol: db.Collection("subscriptions"),
roomCol: db.Collection("rooms"),
userCol: db.Collection("users"),
threadSubCol: db.Collection("thread_subscriptions"),
}
require.NoError(t, store.ensureIndexes(ctx))

handler := NewHandler(store)

now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
// Subscription.SiteID is the room's home site (site-a). Bob's home is site-b
// (where this inbox-worker instance lives), inferred from the document being
// stored on this site rather than from the field.
sub := model.ThreadSubscription{
ID: "sub-1", ParentMessageID: "pm-1", RoomID: "r1", ThreadRoomID: "tr-1",
UserID: "u-bob", UserAccount: "bob", SiteID: "site-a",
HasMention: false, CreatedAt: now, UpdatedAt: now,
}
subData, err := json.Marshal(sub)
require.NoError(t, err)
evtData, err := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: subData, Timestamp: now.UnixMilli(),
})
require.NoError(t, err)

require.NoError(t, handler.HandleEvent(ctx, evtData))

var got model.ThreadSubscription
require.NoError(t, db.Collection("thread_subscriptions").
FindOne(ctx, bson.M{"threadRoomId": "tr-1", "userId": "u-bob"}).
Decode(&got))
assert.Equal(t, "sub-1", got.ID)
assert.Equal(t, "site-a", got.SiteID, "SiteID is the room's site, preserved across federation")
assert.False(t, got.HasMention)
assert.True(t, got.CreatedAt.Equal(now))
assert.True(t, got.UpdatedAt.Equal(now))
}

func TestInboxWorker_ThreadSubscriptionUpserted_MonotonicMention_Integration(t *testing.T) {
db := setupMongo(t)
ctx := context.Background()

store := &mongoInboxStore{
subCol: db.Collection("subscriptions"),
roomCol: db.Collection("rooms"),
userCol: db.Collection("users"),
threadSubCol: db.Collection("thread_subscriptions"),
}
require.NoError(t, store.ensureIndexes(ctx))

handler := NewHandler(store)
now := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)

// First event: HasMention=true. Subscription.SiteID is the room's site (site-a).
mentionSub := model.ThreadSubscription{
ID: "sub-1", ParentMessageID: "pm-1", RoomID: "r1", ThreadRoomID: "tr-1",
UserID: "u-bob", UserAccount: "bob", SiteID: "site-a",
HasMention: true, CreatedAt: now, UpdatedAt: now,
}
mentionData, err := json.Marshal(mentionSub)
require.NoError(t, err)
mentionEvt, err := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: mentionData, Timestamp: now.UnixMilli(),
})
require.NoError(t, err)
require.NoError(t, handler.HandleEvent(ctx, mentionEvt))

// Second event: HasMention=false (later updatedAt). Must NOT clear the flag.
plainSub := mentionSub
plainSub.HasMention = false
later := now.Add(time.Minute)
plainSub.UpdatedAt = later
plainData, err := json.Marshal(plainSub)
require.NoError(t, err)
plainEvt, err := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: plainData, Timestamp: later.UnixMilli(),
})
require.NoError(t, err)
require.NoError(t, handler.HandleEvent(ctx, plainEvt))

var got model.ThreadSubscription
require.NoError(t, db.Collection("thread_subscriptions").
FindOne(ctx, bson.M{"threadRoomId": "tr-1", "userId": "u-bob"}).
Decode(&got))
assert.True(t, got.HasMention, "hasMention must remain true after a non-mention event")
assert.True(t, got.UpdatedAt.Equal(later), "updatedAt must advance to the later event's value")
// _id and createdAt come from $setOnInsert and must remain from the first event.
assert.Equal(t, "sub-1", got.ID)
assert.True(t, got.CreatedAt.Equal(now))

// Third event: HasMention=true again. Idempotent — still true, updatedAt advances.
thirdSub := plainSub
thirdSub.HasMention = true
evenLater := later.Add(time.Minute)
thirdSub.UpdatedAt = evenLater
thirdData, err := json.Marshal(thirdSub)
require.NoError(t, err)
thirdEvt, err := json.Marshal(model.OutboxEvent{
Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b",
Payload: thirdData, Timestamp: evenLater.UnixMilli(),
})
require.NoError(t, err)
require.NoError(t, handler.HandleEvent(ctx, thirdEvt))

require.NoError(t, db.Collection("thread_subscriptions").
FindOne(ctx, bson.M{"threadRoomId": "tr-1", "userId": "u-bob"}).
Decode(&got))
assert.True(t, got.HasMention)
assert.True(t, got.UpdatedAt.Equal(evenLater))
}
Loading
Loading