feat: real-time thread reply fan-out (broadcast-worker) + reply-count badge pipeline #245
ngangwar962 wants to merge 14 commits into
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Adds thread-reply event handling and metadata publishing across the platform. Broadcast-worker now routes hidden thread replies to dedicated handlers that compute recipient sets from thread followers and mentions, build and encrypt payloads, and publish per subscriber. Message-worker publishes thread-reply-added events with parent reply counts. History-service computes `tcount` via row counting instead of CAS, and room-service atomically removes thread reads. The event model gains thread event types and `ThreadMetadataUpdatedEvent`.
Changes
Thread Reply Fan-Out and Metadata Events
🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 3
🧹 Nitpick comments (2)
history-service/internal/publisher/publisher.go (1)
14-17: ⚡ Quick win
Rename `coreConn` to follow the single-method interface rule.
This is a single-method interface, so it should use an `-er` name such as `corePublisher`. As per coding guidelines, "Interfaces must use the `-er` suffix for single-method interfaces and `<Domain>Store` pattern for store interfaces".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/publisher/publisher.go` around lines 14 - 17, The interface currently named coreConn is a single-method interface and should be renamed to follow the `-er` convention; update the interface name from coreConn to corePublisher (keeping the method signature Publish(ctx context.Context, subject string, data []byte) error) and update all references/usages of coreConn in publisher.go and any callers to use corePublisher so the code compiles and adheres to naming guidelines.
history-service/internal/service/messages_test.go (1)
1509-1596: ⚡ Quick win
Add the `PublishCore` failure case for the new metadata path.
These tests lock in the success and `newTcount == nil` branches, but not the branch where `PublishCore` fails and `DeleteMessage` still succeeds. That best-effort contract is subtle enough to deserve an explicit test. As per coding guidelines, "Tests must cover: happy path, error paths, edge cases (empty collections, boundary conditions), and invalid input".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/messages_test.go` around lines 1509 - 1596, Add a new unit test mirroring TestHistoryService_DeleteMessage_ThreadReply_PublishesThreadMetadataEvent but exercise the PublishCore failure path: set up GetHistorySharedSince and GetMessageByID the same, have msgs.SoftDeleteMessage return a non-nil newTcount, expect pub.Publish for MsgCanonicalDeleted, and then expect pub.EXPECT().PublishCore(...).Return(errors.New("publish failure")) (or similar) to simulate a failing PublishCore; call svc.DeleteMessage(site, models.DeleteMessageRequest{MessageID: "..."} ) and assert it returns no error and the response is non-nil (i.e., DeleteMessage succeeds despite PublishCore failing). Ensure the test references the same symbols (svc.DeleteMessage, msgs.SoftDeleteMessage, pub.PublishCore) so the mock expectations align with the existing tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/superpowers/specs/2026-05-28-broadcast-worker-thread-handling-design.md`:
- Around line 55-56: Update the doc line that describes the covering index to
match the implementation: change the index tuple mentioned for the
thread_subscriptions query from (parentMessageId, userAccount) to
(parentMessageId, siteId) so the spec aligns with the code in
broadcast-worker/store_mongo.go; reference the collection name
thread_subscriptions and the fields parentMessageId and siteId in the updated
sentence.
In `@history-service/internal/cassrepo/write.go`:
- Around line 242-262: The messages_by_room mirror update must not turn a
successful delete into an RPC failure: in the read and casDecrement error
branches inside the parent CAS block (references: r.session.Query(...) scanning
tcount, casDecrement(...), parentID, newTcountByID), stop returning an error and
instead treat the mirror as best-effort by returning &newTcountByID (optionally
logging the failure). Keep the existing behavior for gocql.ErrNotFound, but
change the other error returns (both the read error path and the casDecrement
error path) to return &newTcountByID so the authoritative messages_by_id result
remains the outcome.
In `@message-worker/handler.go`:
- Around line 104-105: The slog.Warn call logging the failed publish of thread
metadata (the call that currently includes "error", err, "roomID",
evt.Message.RoomID, "messageID", evt.Message.ID) must include the propagated
request/correlation ID; add the key "request_id" with value
natsutil.RequestIDFromContext(ctx) to that structured log entry so it matches
the project's logging pattern and other occurrences (e.g., store_cassandra.go
usage).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 35b17d15-de9e-407e-9fb5-fbaa3a4509fc
📒 Files selected for processing (24)
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- broadcast-worker/integration_test.go
- broadcast-worker/main.go
- broadcast-worker/mock_store_test.go
- broadcast-worker/store.go
- broadcast-worker/store_mongo.go
- docs/superpowers/plans/2026-05-28-broadcast-worker-thread-handling.md
- docs/superpowers/specs/2026-05-28-broadcast-worker-thread-handling-design.md
- history-service/cmd/main.go
- history-service/internal/cassrepo/write.go
- history-service/internal/publisher/publisher.go
- history-service/internal/service/messages.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/service.go
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/mock_store_test.go
- message-worker/store.go
- message-worker/store_cassandra.go
- message-worker/store_cassandra_test.go
- pkg/model/event.go
- pkg/model/model_test.go
🧹 Nitpick comments (1)
history-service/internal/service/integration_test.go (1)
409-409: 💤 Low value
Consider asserting the exact `NewTCount` value.
The test seeds the parent with `tcount = 1` and deletes the single reply, so `NewTCount` should be exactly `0` after the decrement. The current assertion `GreaterOrEqual` validates the non-negative constraint but is looser than necessary.
📊 More precise assertion
- assert.GreaterOrEqual(t, evt.NewTCount, 0, "NewTCount must be non-negative after decrement")
+ assert.Equal(t, 0, evt.NewTCount, "NewTCount should be 0 after deleting the single reply")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/integration_test.go` at line 409, The assertion on evt.NewTCount is too loose; since the test seeds the parent with tcount = 1 and deletes the only reply, assert that evt.NewTCount equals 0 exactly. Replace the call to assert.GreaterOrEqual(t, evt.NewTCount, 0, "...") with assert.Equal(t, 0, evt.NewTCount, "NewTCount must be 0 after deleting the single reply") so the test verifies the precise expected decrement of evt.NewTCount.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 89c19b5e-a792-4c11-aedd-e61baaf1a82e
📒 Files selected for processing (9)
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- broadcast-worker/integration_test.go
- history-service/internal/cassrepo/write_integration_test.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- pkg/model/event.go
💤 Files with no reviewable changes (1)
- broadcast-worker/handler_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
- pkg/model/event.go
- message-worker/handler_test.go
- history-service/internal/service/messages_test.go
- broadcast-worker/handler.go
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
broadcast-worker/handler.go (1)
191-199: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Preserve mention metadata in thread-created fan-out.
This path parses mentions only to widen `fanOut`, but the emitted `RoomEvent` drops the mention signal entirely. Unlike the normal created-message paths, it never sets `MentionAll`, `Mentions`, or any per-recipient mention flag, so mentioned non-subscribers can receive the thread reply without the metadata clients use for mention badges/highlights.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@broadcast-worker/handler.go` around lines 191 - 199, The thread-created fan-out path builds roomEvt via buildRoomEvent(meta, clientMsg) but never populates mention metadata (MentionAll, Mentions, and per-recipient mention flags) before marshaling, so clients lose mention badges; update the code after building roomEvt (and before json.Marshal) to compute and set the same mention fields used by the normal created-message path: derive MentionAll and Mentions from clientMsg/parsed mention data (and mark per-recipient mention flags when computing fanOut) and assign them onto roomEvt (respecting the existing encJSON/message override logic) so the emitted RoomEvent contains the mention metadata.
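One possible shape for carrying the mention signal through the thread fan-out is sketched below. The `RoomEvent` struct here is a trimmed, hypothetical version of the real type (only the `MentionAll` and `Mentions` field names come from the review), `Mentions` is assumed to be a list of account names, and `applyMentions` and `recipientFlags` are illustrative helpers rather than functions from the PR.

```go
package main

import "fmt"

// RoomEvent is a trimmed, hypothetical version of the real event type.
type RoomEvent struct {
	MentionAll bool
	Mentions   []string
}

// applyMentions copies parsed mention data onto the event before marshaling.
func applyMentions(evt *RoomEvent, mentionAll bool, mentioned []string) {
	evt.MentionAll = mentionAll
	evt.Mentions = append([]string(nil), mentioned...)
}

// recipientFlags returns every fan-out target with a flag saying whether
// that account was explicitly mentioned.
func recipientFlags(followers, mentioned []string) map[string]bool {
	out := make(map[string]bool, len(followers)+len(mentioned))
	for _, f := range followers {
		out[f] = false
	}
	for _, m := range mentioned {
		out[m] = true // mentioned accounts are included even if not following
	}
	return out
}

func main() {
	evt := &RoomEvent{}
	applyMentions(evt, false, []string{"carol"})
	fmt.Println(evt.Mentions, recipientFlags([]string{"alice", "bob"}, []string{"carol"}))
}
```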
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@history-service/internal/service/integration_test.go`:
- Around line 401-402: The test currently only checks canonicalEvt.NewTCount is
non-negative; tighten it to assert the exact expected value 0 given the fixture
starts with one reply: after require.NotNil(t, canonicalEvt.NewTCount) replace
the GreaterOrEqual assertion with an equality check (e.g., assert.Equal(t, 0,
*canonicalEvt.NewTCount, "NewTCount should be 0 after deleting the sole reply"))
so the test fails if the decrement did not occur.
In `@notification-worker/handler.go`:
- Around line 42-46: The handler currently treats all EventCreated events as
room-wide notifications, causing actual thread replies (which arrive as
EventCreated with a non-empty evt.Message.ThreadParentMessageID) to notify the
whole room; modify the early filter in the handler to also skip messages that
are thread replies by checking evt.Message.ThreadParentMessageID (and/or the
TShow flag if present) before calling h.members.ListSubscriptions so only
top-level messages use room-wide subscription logic while thread replies follow
the thread-specific path used in broadcast-worker/handler.go.
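The early filter described above might look roughly like this. The types are simplified and hypothetical (the real `model` structs are not shown in this review), the string value of `EventCreated` is an assumption, and `isRoomWideNotification` is an illustrative name.

```go
package main

import "fmt"

// Simplified, hypothetical versions of the event types.
type Message struct {
	ID                    string
	ThreadParentMessageID string
}

type MessageEvent struct {
	Event   string
	Message Message
}

// EventCreated's string value is assumed for this sketch.
const EventCreated = "created"

// isRoomWideNotification reports whether the event should use room-wide
// subscription logic: only top-level created messages qualify.
func isRoomWideNotification(evt MessageEvent) bool {
	return evt.Event == EventCreated && evt.Message.ThreadParentMessageID == ""
}

func main() {
	top := MessageEvent{Event: EventCreated, Message: Message{ID: "m1"}}
	reply := MessageEvent{Event: EventCreated, Message: Message{ID: "m2", ThreadParentMessageID: "m1"}}
	fmt.Println(isRoomWideNotification(top), isRoomWideNotification(reply))
}
```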
In `@pkg/subject/subject.go`:
- Around line 164-166: The function MsgCanonicalThreadReply introduces a new
canonical subject shape; change it to use the documented MESSAGES_CANONICAL
family instead—return "chat.msg.canonical.%s.created" (or create a new
MsgCanonicalCreated(siteID) wrapper) and update callers to use the existing
canonical subject builder (e.g., the MESSAGES_CANONICAL constant or the
MsgCanonicalCreated helper) so canonical events stay under
chat.msg.canonical.{siteID}.created/.edited/.deleted; rename or remove
MsgCanonicalThreadReply and replace its usages with the corrected builder to
maintain hierarchical dot-delimited subjects.
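A minimal sketch of the suggested helper, assuming the subject pattern quoted in the comment (`chat.msg.canonical.{siteID}.created`). The name `MsgCanonicalCreated` is the one proposed by the reviewer; whether the package already exposes an equivalent builder is not confirmed here.

```go
package main

import "fmt"

// MsgCanonicalCreated builds the "created" subject inside the documented
// canonical family instead of introducing a new subject shape.
func MsgCanonicalCreated(siteID string) string {
	return fmt.Sprintf("chat.msg.canonical.%s.created", siteID)
}

func main() {
	fmt.Println(MsgCanonicalCreated("site-a")) // chat.msg.canonical.site-a.created
}
```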
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ff6682db-93d6-4a34-8dc0-2744ab63c787
📒 Files selected for processing (14)
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- broadcast-worker/integration_test.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/service.go
- message-worker/handler.go
- message-worker/handler_test.go
- notification-worker/handler.go
- notification-worker/handler_test.go
- pkg/model/event.go
- pkg/subject/subject.go
🚧 Files skipped from review as they are similar to previous changes (1)
- broadcast-worker/integration_test.go
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
message-worker/store_cassandra.go (1)
110-140: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Don't treat any existing `messages_by_id` row as a completed thread write.
If the LWT insert succeeds and the later `thread_messages_by_room` insert or `incrementParentTcount` fails, JetStream will redeliver this message. On retry, Lines 123-125 return success immediately, so the missing by-room row / parent `tcount` is never repaired and the message can be acked in a permanently partial state.
A fix here needs a completion marker or a repair path on `!applied` that can safely finish the remaining side effects instead of unconditionally returning `nil, nil`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@message-worker/store_cassandra.go` around lines 110 - 140, The LWT branch treats any existing messages_by_id row as fully complete on !applied (variable applied from MapScanCAS) which allows permanent partial writes if later inserts fail; update the handler so that when applied is false you detect whether the side-effects (thread_messages_by_room insert and incrementParentTcount) have completed and, if not, perform the missing work or mark the original row as completed; specifically, add a completion marker column to messages_by_id (or read the row and inspect a completion flag) and, on !applied, query messages_by_id for that flag and either perform the missing thread_messages_by_room insert + call incrementParentTcount (idempotently) or set the completion flag atomically so retries see a finished state; ensure you use the same identifiers (messages_by_id, thread_messages_by_room, incrementParentTcount, MapScanCAS/applied) and make all repair operations idempotent.
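The completion-marker idea can be illustrated with an in-memory model. Everything below is a hypothetical sketch: the maps stand in for the Cassandra tables, `failAfterInsert` simulates a crash between the LWT insert and the side effects, and recounting rows instead of incrementing is just one way to make the repair idempotent, not necessarily what the PR should adopt.

```go
package main

import (
	"errors"
	"fmt"
)

// store is an in-memory stand-in for the Cassandra tables.
type store struct {
	completed map[string]bool                // messages_by_id: message ID -> completion marker
	byRoom    map[string]map[string]struct{} // thread_messages_by_room: parent ID -> reply IDs
	tcount    map[string]int                 // parent ID -> reply count
}

func newStore() *store {
	return &store{
		completed: map[string]bool{},
		byRoom:    map[string]map[string]struct{}{},
		tcount:    map[string]int{},
	}
}

// saveThreadMessage skips work only when the completion marker is set;
// an existing but incomplete row triggers the repair path.
func (s *store) saveThreadMessage(msgID, parentID string, failAfterInsert bool) error {
	done, exists := s.completed[msgID]
	if exists && done {
		return nil // an earlier delivery finished every side effect
	}
	if !exists {
		s.completed[msgID] = false // first delivery: row inserted, marker unset
	}
	if failAfterInsert {
		return errors.New("simulated failure after messages_by_id insert")
	}
	if s.byRoom[parentID] == nil {
		s.byRoom[parentID] = map[string]struct{}{}
	}
	s.byRoom[parentID][msgID] = struct{}{}       // idempotent upsert
	s.tcount[parentID] = len(s.byRoom[parentID]) // recount rather than increment
	s.completed[msgID] = true
	return nil
}

func main() {
	s := newStore()
	fmt.Println(s.saveThreadMessage("m1", "p1", true))  // first delivery fails midway
	fmt.Println(s.saveThreadMessage("m1", "p1", false)) // redelivery repairs
	fmt.Println(s.tcount["p1"])
}
```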
🧹 Nitpick comments (5)
room-service/integration_test.go (1)
1870-1874: ⚡ Quick win
Seed each subtest independently.
The `last element removed...` case only passes because the previous subtest already mutated `sub-1` from `["t1","t2"]` to `["t2"]`. That makes these assertions order-dependent and brittle. Reinsert or reset the subscription inside each subtest instead of sharing the mutated document.
As per coding guidelines, "Each test must be fully independent — no shared mutable state between tests; never rely on test execution order."
Also applies to: 1881-1885
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@room-service/integration_test.go` around lines 1870 - 1874, The failing tests share mutable state: before calling store.UpdateSubscriptionThreadRead in the "removes specified threadID and returns remaining" subtest (and the sibling "last element removed..." subtest), reseed or recreate the subscription document so each subtest starts from the same initial unread list (e.g., recreate subscription "sub-1" with UnreadThreads ["t1","t2"] for user "alice" in the test setup), or call the existing test helper that inserts the subscription before invoking UpdateSubscriptionThreadRead; ensure each t.Run block independently seeds the store (or uses a fresh store/session) so tests do not rely on prior subtest mutations.
notification-worker/handler.go (1)
20-23: ⚡ Quick win
Rename `ThreadSubscriberLookup` to match the single-method interface rule.
This new interface is single-method, so it should use the repo's `-er` naming convention instead of `*Lookup`. Renaming it now avoids spreading the nonstandard name through the worker.
As per coding guidelines, "Interfaces must use the `-er` suffix for single-method interfaces and `<Domain>Store` pattern for store interfaces".
Also applies to: 32-38
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@notification-worker/handler.go` around lines 20 - 23, The interface ThreadSubscriberLookup is a single-method interface and should be renamed to follow the repo "-er" convention (e.g., ThreadSubscriptionLister or ThreadSubscriberLister); update the interface name and keep the method signature ListThreadSubscriptions(ctx context.Context, parentMessageID, siteID string) ([]model.ThreadSubscription, error). Replace all usages and references (including the other occurrence that mirrors this interface) to the new name and update any implementing types/functions so they satisfy the renamed interface (adjust imports/tests/constructors where the old type name was used).
room-service/handler_test.go (1)
3668-3672: ⚡ Quick win
Don't allow write expectations on the `GetUserSiteID` failure path.
`handleMessageThreadRead` returns before the write phase when `GetUserSiteID` fails, so these `.AnyTimes()` expectations make the test pass even if a future refactor starts mutating state too early. Leaving both methods un-expected here gives you a stronger regression check.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@room-service/handler_test.go` around lines 3668 - 3672, Remove the permissive write expectations on the GetUserSiteID failure path so the test fails if writes occur: in the test around handleMessageThreadRead remove the f.store.EXPECT().UpdateSubscriptionThreadRead(...).Return(...).AnyTimes() and f.store.EXPECT().UpdateThreadSubscriptionRead(...).Return(...).AnyTimes() lines (or replace them with explicit .Times(0) if you prefer), leaving no positive expectations for UpdateSubscriptionThreadRead and UpdateThreadSubscriptionRead when GetUserSiteID is supposed to fail.
notification-worker/main.go (1)
64-76: ⚡ Quick win
Wrap the Mongo lookup errors with query context.
`Find` and `cursor.All` currently return bare Mongo errors here, which makes failures hard to attribute once they hit worker logs. Please wrap them with the parent message/site context in this helper too.
Suggested fix
 func (m *mongoThreadSubLookup) ListThreadSubscriptions(ctx context.Context, parentMessageID, siteID string) ([]model.ThreadSubscription, error) {
 	filter := map[string]string{"parentMessageId": parentMessageID, "siteId": siteID}
 	cursor, err := m.col.Find(ctx, filter)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("find thread subscriptions for parent %q site %q: %w", parentMessageID, siteID, err)
 	}
 	defer cursor.Close(ctx)
 	var subs []model.ThreadSubscription
 	if err := cursor.All(ctx, &subs); err != nil {
-		return nil, err
+		return nil, fmt.Errorf("decode thread subscriptions for parent %q site %q: %w", parentMessageID, siteID, err)
 	}
 	return subs, nil
 }
As per coding guidelines, "Always wrap errors with context using `fmt.Errorf("short description: %w", err)` describing what the current function was doing".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@notification-worker/main.go` around lines 64 - 76, In ListThreadSubscriptions, wrap errors returned by m.col.Find and cursor.All with contextual messages including parentMessageID and siteID using fmt.Errorf("...: %w", err) so logs show the query context; update the error returns for the Find call to something like fmt.Errorf("find thread subscriptions parent=%s site=%s: %w", parentMessageID, siteID, err) and similarly wrap the cursor.All error with a message like fmt.Errorf("decode thread subscriptions parent=%s site=%s: %w", parentMessageID, siteID, err).
notification-worker/handler_test.go (1)
57-58: ⚡ Quick win
Make the thread-subscriber stub assert `siteID`.
The production lookup filters on both `parentMessageID` and `siteID`, but this stub ignores the site completely. That means these tests still pass if `HandleMessage` sends the wrong site. Key the stub on both fields, or fail when `siteID` is unexpected.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@notification-worker/handler_test.go` around lines 57 - 58, The stub ListThreadSubscriptions currently ignores siteID (in stubThreadSubLookup.ListThreadSubscriptions) causing tests to miss incorrect site values; update the stub to either key subscriptions by a composite key of parentMsgID+siteID (e.g., map[string][]model.ThreadSubscription using parentMsgID+"|"+siteID) or add an assertion that the passed siteID matches the expected value stored on the stub and return an error or fail the test when it doesn't; modify stubThreadSubLookup (its subs map and any setup in tests) so Lookups require both parentMsgID and siteID to match.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@message-worker/store_cassandra.go`:
- Around line 128-136: The INSERT into thread_messages_by_room is missing the
tshow column/value, causing msg.TShow to be dropped; update the Query in
store_cassandra.go (the s.cassSession.Query call that inserts into
thread_messages_by_room) to include the tshow column in the column list and pass
msg.TShow as the corresponding value in the VALUES arguments (mirror what
messages_by_id write does) so readers of thread_messages_by_room see the correct
TShow.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7b12dad2-7d47-4fe2-be22-61ea82e4cd3e
📒 Files selected for processing (13)
- history-service/internal/service/messages.go
- history-service/internal/service/messages_test.go
- message-worker/integration_test.go
- message-worker/store_cassandra.go
- notification-worker/handler.go
- notification-worker/handler_test.go
- notification-worker/main.go
- room-service/handler.go
- room-service/handler_test.go
- room-service/integration_test.go
- room-service/mock_store_test.go
- room-service/store.go
- room-service/store_mongo.go
✅ Files skipped from review due to trivial changes (1)
- room-service/mock_store_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- history-service/internal/service/messages.go
- history-service/internal/service/messages_test.go
…of-scope work
All notification-worker changes are dropped from this PR — a separate engineer owns that service. The handler is restored to its main-branch state. Two artifacts document what still needs to be built there:
- notification-worker/handler.go: NOT COVERED comment on HandleMessage listing the three things required (EventCreated filter, thread-subscriber routing, @-mention fan-out via EventThreadReplyAdded)
- docs/thread-reply-notifications.md: full design notes with file references for whoever picks up the work
https://claude.ai/code/session_0198s2kLNZM8cUVw5JLW28BK
Mark all plan tasks [x] done. Add "Implementation Notes" to the design spec covering the six additions that went beyond the original scope (DM thread handling, EventThreadReplyAdded badge pipeline, thread delete badge + mention fan-out, message-worker tcount publish, history-service tcount CAS + EventDeleted.Content). Documents notification-worker as intentionally out of scope and points to docs/thread-reply-notifications.md.
https://claude.ai/code/session_0198s2kLNZM8cUVw5JLW28BK
…t-review fixes
Spec: adds 8 missing broadcast-worker entries to the Implementation Notes block covering the 3 post-review correctness fixes (evt.Timestamp propagation, TShow=true badge on delete, history-service tcount best-effort), plus the simplification and defensive changes from the /simplify and /code-review passes (shouldUseThreadFanOut rename, buildEditRoomEvent/buildDeleteRoomEvent helpers, publishThreadBadge, default-branch return nil removal, nil guard).
Plan: adds a note in the status block pointing to a new "Post-Plan Fixes and Refactoring" section that documents all post-plan commits with their rationale, so the plan is a complete record of everything that landed in PR #245.
https://claude.ai/code/session_0198s2kLNZM8cUVw5JLW28BK
ca735d2 to
00a6434
Compare
2d51754 to
3f85333
Compare
- room-worker/handler.go: fix rangeValCopy lint (iterate by index)
- message-worker/integration_test.go: SaveThreadMessage returns (*int, error); assign both values instead of single-value require.NoError
- broadcast-worker/integration_test.go: MongoDB driver v2 decodes nested BSON documents as bson.D, not bson.M; fix EnsureIndexes index verification to use []bson.D iteration (matches room-service pattern)
- inbox-worker/handler.go: add UpdateSubscriptionNamesForRoom and ApplySubscriptionVisibility to InboxStore interface; implement handleRoomRenamed and handleRoomVisibilityChanged; wire room_renamed and room_restricted cases in HandleEvent switch
- inbox-worker/handler_test.go: add stub methods for new interface methods
- room-worker/integration_test.go: fix TestIntegration_ProcessRoomRename seed: carol's subscription must use SiteID=remoteSite so outbox fan-out to site-b fires correctly
https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
e61905b to
bc475d8
Compare
@coderabbitai review
Generated by Claude Code
🧠 Learnings used
✅ Action performed
Review finished.
@coderabbitai review
Generated by Claude Code
✅ Action performed
Review finished.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/client-api.md`:
- Around line 616-628: The example JSON for the "Rename Room" error in the
Rename Room section uses only an "error" field but must follow the canonical
Error envelope; update the example to include the required "code" property (and
add "reason" when applicable) so it matches the documented envelope referenced
by "Error envelope" (see the error list for the `"rename is only allowed in
channel rooms"` case) and ensure the example keys/order match other examples in
the document.
- Around line 936-1033: The file contains verbatim duplicated API sections
("Toggle Favorite", "Pin Message", "Unpin Message", "List Pinned Messages")
which risk contract drift; remove the duplicate blocks and keep a single
canonical copy for each named section. Locate the duplicate headings by the
exact section titles and ensure you preserve the complete content from one copy
(subjects, request/response tables, error cases, triggered events, and
cross-site behaviour) while deleting the other identical copy, then run a quick
search to confirm no other verbatim duplicates remain and adjust the table of
contents or internal anchors if necessary so links still resolve to the single
retained section.
- Around line 3137-3147: Update the documentation to make the `code` cardinality
consistent: pick whether `code` is one of 7 or one of 8 categories, then change
the conflicting textual occurrences so both state the same number and ensure the
enumerated list of categories exactly matches that number; update the sentence
that currently reads `"code": "<one of 8 generic categories>"` and the later
sentence that reads `One of the 7 categories below.` to the chosen cardinality
and verify the `code` enum in the specification (the list under the `code`
field) contains the same count.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 14e59bbc-2a5c-4a9c-86c1-c365739aad67
📒 Files selected for processing (34)
- .gitignore
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- broadcast-worker/integration_test.go
- broadcast-worker/main.go
- broadcast-worker/mock_store_test.go
- broadcast-worker/store.go
- broadcast-worker/store_mongo.go
- docs/client-api.md
- docs/superpowers/plans/2026-05-28-broadcast-worker-thread-handling.md
- docs/superpowers/plans/2026-06-04-tcount-count-based.md
- docs/superpowers/specs/2026-05-28-broadcast-worker-thread-handling-design.md
- docs/thread-reply-notifications.md
- history-service/internal/cassrepo/write.go
- history-service/internal/cassrepo/write_integration_test.go
- history-service/internal/publisher/publisher.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/service.go
- inbox-worker/handler.go
- inbox-worker/handler_test.go
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/mock_store_test.go
- message-worker/store.go
- message-worker/store_cassandra.go
- message-worker/store_cassandra_test.go
- pkg/model/event.go
- pkg/model/model_test.go
- room-service/handler.go
- room-service/handler_test.go
💤 Files with no reviewable changes (24)
- docs/thread-reply-notifications.md
- docs/superpowers/plans/2026-06-04-tcount-count-based.md
- history-service/internal/publisher/publisher.go
- message-worker/store.go
- message-worker/mock_store_test.go
- pkg/model/model_test.go
- message-worker/store_cassandra_test.go
- history-service/internal/service/mocks/mock_repository.go
- message-worker/handler.go
- inbox-worker/handler.go
- history-service/internal/service/messages.go
- history-service/internal/service/service.go
- history-service/internal/service/integration_test.go
- pkg/model/event.go
- docs/superpowers/specs/2026-05-28-broadcast-worker-thread-handling-design.md
- room-service/handler.go
- inbox-worker/handler_test.go
- room-service/handler_test.go
- history-service/internal/cassrepo/write_integration_test.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/store_cassandra.go
- history-service/internal/service/messages_test.go
- history-service/internal/cassrepo/write.go
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🚧 Files skipped from review as they are similar to previous changes (1)
- broadcast-worker/main.go
Caution
Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 3
🛑 Comments failed to post (3)
docs/client-api.md (3)
616-628: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Align Rename Room error example with the canonical error envelope.
The example `{ "error": "rename is only allowed in channel rooms" }` omits `code`, but this document states `code` is always present for client-facing errors. The example should include `code` (and `reason` if applicable).
936-1033: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Remove duplicated API sections to avoid contract drift.
`Toggle Favorite`, `Pin Message`, `Unpin Message`, and `List Pinned Messages` are duplicated verbatim in this file. This creates two sources of truth and increases the chance of clients implementing stale behavior.
Also applies to: 2073-2357
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 986-986: Multiple headings with the same content (MD024, no-duplicate-heading)
3137-3147: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Fix contradictory error-code cardinality in the envelope spec.
Line 3137 says `code` is “one of 8 generic categories,” while Line 3146 says “one of the 7 categories below.” This should be consistent to keep the contract unambiguous.
@coderabbitai review
Generated by Claude Code
🧠 Learnings used
✅ Action performed
Review finished.
5fe6146 to
155e097
Compare
| case model.RoomTypeDM, model.RoomTypeBotDM: | ||
| // DM thread replies are visible to all members, so @-mention badges are correct. | ||
| if len(resolved.Accounts) > 0 { | ||
| if err := h.store.SetSubscriptionMentions(ctx, meta.ID, resolved.Accounts); err != nil { |
Might need to double-check whether a thread message in a DM makes the frontend sidebar show the room name in bold with a red-dot badge.
| return fmt.Errorf("set subscription mentions: %w", err) | ||
| } | ||
| } | ||
| if err := h.store.UpdateRoomLastMessage(ctx, msg.RoomID, msg.ID, msg.CreatedAt, resolved.MentionAll); err != nil { |
Also need to check whether this aligns with the current behavior.
| return nil | ||
| } | ||
| if h.encrypt { | ||
| if err := h.encryptEditedContent(ctx, room.ID, &edit); err != nil { |
Thread message broadcasts are sent to each user's own subject, so we don't need to encrypt the content.
| if len(resolved.Participants) > 0 { | ||
| roomEvt.Mentions = resolved.Participants | ||
| } | ||
| if err := h.encryptRoomEvent(ctx, meta.ID, clientMsg, &roomEvt); err != nil { |
Thread messages are broadcast to each recipient's own subject, so we don't need to encrypt the content.
| RoomID: room.ID, | ||
| SiteID: room.SiteID, | ||
| Timestamp: time.Now().UTC().UnixMilli(), | ||
| Timestamp: evt.Timestamp, |
This Timestamp is meant for tracking the actual broadcast-worker publish time. For example, in the case of a JetStream redelivery, the frontend can use it to find out when broadcast-worker actually published the event.
We could add another field for the time of the triggering message event.
| siteEvtData, _ := json.Marshal(siteEvt) | ||
| outbox := model.OutboxEvent{ | ||
| Type: model.OutboxMemberAdded, SiteID: room.SiteID, DestSiteID: destSiteID, | ||
| Type: "member_added", SiteID: room.SiteID, DestSiteID: destSiteID, |
| payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp) | ||
| dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed) | ||
| if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), outboxData, dedupID); err != nil { | ||
| if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), outboxData, dedupID); err != nil { |
| continue | ||
| } | ||
| h.publishSubscriptionUpdate(ctx, sub.User.Account, data) | ||
| if err := h.publish(ctx, subject.SubscriptionUpdate(sub.User.Account), data, ""); err != nil { |
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("marshal rename outbox payload: %w", err) | ||
| } |
| // Thread-reply badge events carry only the parent's tcount, not a | ||
| // searchable document — skip them before the document-shape guards below, | ||
| // which would otherwise reject the sparse Message as a hard error. | ||
| if evt.Event == model.EventThreadReplyAdded { |
As in the previous comment, we should publish the tcount event to a separate subject instead of using the canonical message JetStream stream.
155e097 to
0d9a731
Compare
Adds end-to-end thread reply support across the chat pipeline:
- broadcast-worker: fan out thread reply create/edit/delete events to thread followers (replyAccounts) and @-mentions; fan out tcount badge via handleThreadTCountUpdated. DM/BotDM replies go to all human members.
- message-worker: persist thread replies with IF NOT EXISTS LWT + MapScanCAS idempotency; publish EventThreadReplyAdded with tcount from countAndSetParentTcount (COUNT-based, crash-safe, idempotent on redelivery).
- history-service: edit/delete events carry ThreadParentMessageID + TShow; delete path recomputes tcount via countAndSetParentTcount (COUNT → blind SET).
- search-sync-worker: skip EventThreadReplyAdded events (no searchable doc).
- room-service: UpdateSubscriptionThreadRead made atomic (returns new array + alert in a single MongoDB pipeline update).
- pkg/model: EventThreadReplyAdded, NewTCount, ThreadMetadataUpdatedEvent.
- pkg/subject: badge events travel on MsgCanonicalCreated (.created subject).
- docs: tcount COUNT-based implementation plan + client-api updates.
tcount approach: replaces CAS increment/decrement with a full partition scan of thread_messages_by_thread (COUNT non-deleted rows → blind SET on parent rows). Eliminates the 2PC crash window: any JetStream redelivery re-COUNTs and re-SETs, converging to the correct value. Known cost: O(N) scan per event. Follow-up PR will replace with a Cassandra COUNTER table + reconciliation job (see docs/superpowers/plans/2026-06-04-tcount-count-based.md §"Known Trade-offs and Future Work").
https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
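The COUNT → blind SET idea in this commit message can be illustrated with a minimal in-memory sketch. This is not the PR's code: `memStore`, `reply`, `saveReply`, and `newMemStore` are hypothetical stand-ins for the Cassandra tables, and only the name `countAndSetParentTcount` is taken from the commit. The point is that running the same step twice (as on a JetStream redelivery) yields the same stored value.

```go
package main

import "fmt"

// reply is a simplified stand-in for one row of thread_messages_by_thread.
type reply struct {
	ID      string
	Deleted bool
}

// memStore is a hypothetical in-memory substitute for the Cassandra tables.
type memStore struct {
	replies      map[string][]reply // keyed by parent message ID
	parentTcount map[string]int     // tcount stored on the parent row
}

func newMemStore() *memStore {
	return &memStore{replies: map[string][]reply{}, parentTcount: map[string]int{}}
}

// saveReply stores a reply unless a row with the same ID already exists,
// so a redelivered message does not create a second row.
func (s *memStore) saveReply(parentID string, r reply) {
	for _, existing := range s.replies[parentID] {
		if existing.ID == r.ID {
			return
		}
	}
	s.replies[parentID] = append(s.replies[parentID], r)
}

// countAndSetParentTcount counts the non-deleted replies and overwrites the
// parent's tcount with that number, regardless of the previous value.
func (s *memStore) countAndSetParentTcount(parentID string) int {
	n := 0
	for _, r := range s.replies[parentID] {
		if !r.Deleted {
			n++
		}
	}
	s.parentTcount[parentID] = n // blind SET: no read-modify-write
	return n
}

func main() {
	s := newMemStore()
	// First delivery, then a simulated redelivery of the same reply.
	for i := 0; i < 2; i++ {
		s.saveReply("parent-1", reply{ID: "reply-1"})
		fmt.Println("tcount:", s.countAndSetParentTcount("parent-1"))
	}
}
```

Because the stored value is recomputed from the rows each time rather than incremented, the sketch needs no compare-and-set; the trade-off, as the commit notes, is a scan proportional to the number of replies.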
The PR accidentally deleted the publishSubscriptionUpdate helper, inlined its three call sites, replaced model.OutboxMemberAdded constants with raw strings, deleted findRemoteSitesForAccounts, and replaced the full processRoomRename implementation with a stripped-down version that lost async-job-result publishing and request-ID validation. None of these changes are part of the thread-reply pipeline. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
…ollowers parentMessageId is the unique key in thread_rooms — one document per parent message — so the siteId filter was redundant. Removing it simplifies the query, the store interface, and all callers. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
Thread replies are published per-user to chat.user.{account}.room.event
subjects — each message goes to exactly one user's mailbox, so there is
no shared stream to protect. Encrypting per-user subjects adds overhead
with no security benefit.
Encryption is retained for the shared room channel stream
(chat.room.{roomID}.event) where all members subscribe.
https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
…lishes fail Previously errgroup returned the first error, causing JetStream to redeliver to all accounts including those who already received the event, producing duplicate thread replies on the frontend. Now partial failure is tolerated: only return an error (triggering redelivery) when every single publish failed. Individual failures are logged but do not cause redelivery for successful recipients. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
4ff3363 to
a950796
Compare
- history-service/internal/service/integration_test.go:349: TestDeleteMessage_Integration_ThreadReplyPublishesMetadataEvent called New() with 6 args; function now requires 8 (UserStore + CustomEmojiStore were added). Add nil, nil for the two store params.
- history-service/internal/cassrepo/reactions_integration_test.go: TestRepository_AddReaction_Pinned and TestRepository_RemoveReaction_Pinned inserted pinned_at into messages_by_room, which lacks that column per the Cassandra model doc. Remove the column from the INSERT statements.
- broadcast-worker/integration_test.go: TestBroadcastWorker_GetThreadFollowers called GetThreadFollowers with a siteID arg after commit 37b55b7 removed it from the Store interface. Update calls to 1-arg form and replace the siteId-isolation subtest (not applicable per-site deployment model) with a distinct-parentMessageId subtest.
…m test INSERTs/SELECTs TestRepository_AddReaction_Pinned and TestRepository_RemoveReaction_Pinned were inserting into pinned_messages_by_room using column name 'created_at' instead of 'pinned_at' (the actual clustering key), and the verification SELECTs had the same wrong column in the WHERE clause. https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
…parent lookup and re-publish tcount is persisted durably by countAndSetParentTcount on the first delete. Re-publishing EventDeleted on retry adds unnecessary Cassandra reads and failure modes (parent-lookup error → retry loop) without any benefit. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
JetStream MsgID dedup prevents double-delivery at the consumer level, so IF NOT EXISTS was adding 5-10x Paxos overhead for no benefit. tcount is derived from a COUNT + blind SET via countAndSetParentTcount, which is idempotent on redelivery without any CAS. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
…ThreadRead with app-side logic Use $pull to atomically remove the threadID, then check in Go whether threadUnread is now empty. If empty, a second UpdateOne clears alert and unsets the field. Avoids aggregation pipeline CPU overhead on MongoDB. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
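The two-step read-state update described in this commit can be sketched in memory as follows. This is an illustration only: `subscription`, `pullThread`, and `markThreadRead` are hypothetical names, the struct stands in for the MongoDB subscription document, and the sketch ignores concurrent writers between the two steps.

```go
package main

import "fmt"

// subscription is a simplified stand-in for the subscription document.
type subscription struct {
	ThreadUnread []string
	Alert        bool
}

// pullThread removes threadID from ThreadUnread and returns what remains.
// In the commit above this step is a single MongoDB $pull update.
func pullThread(sub *subscription, threadID string) []string {
	remaining := make([]string, 0, len(sub.ThreadUnread))
	for _, id := range sub.ThreadUnread {
		if id != threadID {
			remaining = append(remaining, id)
		}
	}
	sub.ThreadUnread = remaining
	return remaining
}

// markThreadRead pulls the thread, then decides in application code whether
// a second update is needed to clear the alert and unset the empty field.
func markThreadRead(sub *subscription, threadID string) {
	if remaining := pullThread(sub, threadID); len(remaining) == 0 {
		sub.ThreadUnread = nil // models unsetting the field
		sub.Alert = false
	}
}

func main() {
	sub := &subscription{ThreadUnread: []string{"t1", "t2"}, Alert: true}
	markThreadRead(sub, "t1")
	fmt.Println(sub.ThreadUnread, sub.Alert)
	markThreadRead(sub, "t2")
	fmt.Println(sub.ThreadUnread, sub.Alert)
}
```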
…amp for canonical event time Timestamp on each room event struct now records when broadcast-worker publishes the event, enabling clients to detect JetStream redeliveries. EventTimestamp carries the original canonical event time from message-worker for correlation. ReactRoomEvent already used publish time; all other event types (RoomEvent, EditRoomEvent, DeleteRoomEvent, PinRoomEvent, UnpinRoomEvent, ThreadMetadataUpdatedEvent) are updated. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
…rver-broadcast
Publishing EventThreadReplyAdded back to MESSAGES_CANONICAL polluted the
message CRUD event store with badge events and required a skip guard in
message-worker to avoid reprocessing its own publishes.
New flow: message-worker publishes via core NATS on
chat.server.broadcast.{siteID}.thread.tcount; broadcast-worker subscribes
to chat.server.broadcast.{siteID}.> with a queue group and handles the
event via HandleServerBroadcast. Badge events are best-effort
(fire-and-forget, no JetStream durability needed).
https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
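The subject layout in this commit can be sketched as two builders plus a matcher. The helper names below are hypothetical, and `subjectMatches` is a simplified re-implementation of NATS wildcard rules written for illustration (it is not a call into the NATS client): `*` matches exactly one token and `>` matches one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// threadTCountSubject builds the subject message-worker publishes badge events on.
func threadTCountSubject(siteID string) string {
	return fmt.Sprintf("chat.server.broadcast.%s.thread.tcount", siteID)
}

// serverBroadcastWildcard builds the subject broadcast-worker subscribes to.
func serverBroadcastWildcard(siteID string) string {
	return fmt.Sprintf("chat.server.broadcast.%s.>", siteID)
}

// subjectMatches reports whether subject matches pattern under the
// simplified wildcard rules described above.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	sub := serverBroadcastWildcard("site-a")
	pub := threadTCountSubject("site-a")
	fmt.Println(sub, "matches", pub, "=", subjectMatches(sub, pub))
}
```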
…cast events timestamp = when broadcast-worker published the event (allows clients to detect JetStream redeliveries). eventTimestamp = when message-worker published the canonical event (source-of-truth time for correlation). Updated DeleteRoomEvent and ThreadMetadataUpdatedEvent tables. https://claude.ai/code/session_01LjGg9QJU7QVZFU6pXXFbgb
New commits added EventTimestamp to DeleteRoomEvent, PinRoomEvent, UnpinRoomEvent, and ReactRoomEvent but left older fields misaligned. goimports now requires all fields in a struct to be column-aligned. https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
Summary
Adds real-time thread reply support to the chat system. The headline change is in broadcast-worker, which fans out thread reply create/edit/delete events, along with the parent message's reply-count badge (`tcount`), to the right audience in real time.
Although the feature centers on broadcast-worker, thread support is inherently an end-to-end pipeline: broadcast-worker can only fan out the reply-count badge; the number itself must be produced upstream (in Cassandra, by message-worker / history-service) and travel through the `MESSAGES_CANONICAL` event stream to reach it. That is why this PR touches several services. Each one is a station on the same pipeline.
Data flow (why each service is involved)
`pkg/model` and `pkg/subject` define the shared vocabulary (event types, the `NewTCount` field, the canonical subjects) that every service on the pipeline speaks.
What each service does, and why it's required
- broadcast-worker: `handleThreadCreated`/`Updated`/`Deleted` + `handleThreadTCountUpdated`, thread fan-out via `GetThreadFollowers` (`thread_rooms.replyAccounts`), badge fan-out
- pkg/model: `EventThreadReplyAdded`, `NewTCount`, `ThreadMetadataUpdatedEvent`
- pkg/subject: `MsgCanonicalCreated` (`.created`) with `EventThreadReplyAdded` discriminator in payload; no new subject namespace introduced. […] `chat.msg.canonical.{siteID}.*` family.
- message-worker: `SaveThreadMessage` returns `(*int, error)` (new tcount); `saveThreadMessageEncrypted` (encrypted path with same LWT pattern); `publishThreadReplyEvent`; Cassandra `IF NOT EXISTS` LWT guarded by `MapScanCAS`; `AddReplyAccounts` for fan-out tracking; `countAndSetParentTcount` (COUNT-based, idempotent)
- history-service: […] `ThreadParentMessageID` + `TShow`; delete recomputes tcount via `countAndSetParentTcount` (COUNT → blind SET, idempotent on redelivery)
- search-sync-worker: […] `EventThreadReplyAdded` in `BuildAction`. […] `MESSAGES_CANONICAL` stream search-sync consumes but carry no searchable document; they must be skipped or they'd error / corrupt the index.
- room-service: `UpdateSubscriptionThreadRead` made atomic (returns resulting array + alert)
- notification-worker: […] `docs/thread-reply-notifications.md`).
Implementation notes
- […] `ThreadParentMessageID` + `TShow=false`. `TShow=true` replies still flow through the normal room broadcast path.
- […] `thread_rooms.replyAccounts`: populated by message-worker on every reply with the parent author, replier, and @-mentioned accounts via `AddReplyAccounts` (MongoDB `$addToSet`, idempotent). broadcast-worker reads this with a single `FindOne` projection (`GetThreadFollowers`), matching the pattern PR feat(notification-worker): cache + mobile push overhaul #237 introduced in notification-worker. The old `thread_subscriptions` cursor scan (`ListThreadSubscriptions`) has been removed.
- […] `replyAccounts` ∪ @-mentioned-in-reply accounts, with sender and bot accounts excluded, deduplicated. Channel replies go to that set; DM/BotDM replies go to all human members.
- […] `EventThreadReplyAdded`) travel over `chat.msg.canonical.{siteID}.created`, the same subject as new messages. The `event` discriminator field in the JSON payload distinguishes them. `EventThreadReplyDeleted` (never published) and `MsgCanonicalThreadReply` (non-standard subject) were removed as dead code.
- […] `NewTCount` is derived by counting non-deleted rows in `thread_messages_by_thread` (`countThreadReplies`); clients set the badge directly, no client-side delta math. The COUNT-based approach makes every tcount write idempotent on JetStream redelivery.
- […] `messages_by_id` INSERT uses `IF NOT EXISTS` (LWT). When not applied (row exists from a prior delivery), Cassandra returns `[applied]=false` plus all existing row columns. `ScanCAS()` with no scan destinations fails with "not enough columns to scan into" in this case; the fix is `MapScanCAS(map)`, which absorbs all returned columns. After the LWT (whether applied or not), `countAndSetParentTcount` runs unconditionally: it COUNTs the non-deleted rows in `thread_messages_by_thread` and blindly SETs `tcount` on the parent row in both `messages_by_id` and `messages_by_room`. Because COUNT gives the ground truth and SET overwrites unconditionally, redelivery always converges to the correct value. The same LWT + MapScanCAS pattern is applied to `saveThreadMessageEncrypted`.
- […] `saveThreadMessageEncrypted` mirrors `SaveThreadMessage` with `at-rest` cipher: body fields (`msg`, `attachments`, `card`, `card_action`, `file`) are replaced with `enc_payload`/`enc_meta`; `quoted_parent_message` UDT body is stripped before storage. Both the LWT INSERT into `messages_by_id` and the plain INSERT into `thread_messages_by_thread` follow the same idempotency contract.
- […] `publishThreadReplyEvent` (message-worker) and `publishToThreadAccounts` (broadcast-worker) both return their error, and the history-service already-deleted retry returns an error rather than publishing a `NewTCount=nil` event that would permanently drop the badge update. Because `countAndSetParentTcount` is idempotent, redelivery is safe.
Known limitations (accepted for this PR)
- […] `replyAccounts`; only the parent author, repliers, and reply-mentionees are. They won't receive thread events. Subscribing them would require carrying the parent's mention list onto the reply event.
- […] `SaveThreadMessage` write-then-publish is not atomic. A crash between the Cassandra LWT INSERT succeeding and `countAndSetParentTcount` completing leaves tcount un-SET for this delivery; the subsequent JetStream redelivery runs `countAndSetParentTcount` again with the same COUNT result and the correct tcount is SET. The badge event may be delayed by one redelivery cycle.
- […] `countThreadReplies` performs a full scan of `thread_messages_by_thread` to count non-deleted rows, which is O(N) per add/delete event. This is the deliberate design choice that achieves crash-safety (COUNT gives ground truth at SET time; no 2PC needed). A follow-up PR will replace this with a Cassandra COUNTER table + periodic reconciliation job, making the hot path O(1). See `docs/superpowers/plans/2026-06-04-tcount-count-based.md` § "Known Trade-offs and Future Work" for the detailed plan.
Scope note for reviewers
The reply-broadcast + badge pipeline (broadcast-worker, message-worker, history-service, pkg/model, pkg/subject, search-sync-worker) is one indivisible unit: the badge cannot be fanned out unless it is produced upstream and the shared-stream consumers cooperate.
The room-service `UpdateSubscriptionThreadRead` change is the most separable piece: it's the thread read-state side and complements, rather than enables, broadcast-worker's fan-out. It could be split into its own PR if a tighter scope is preferred.
Post-review changes (addressed in this PR)
All 7 CodeRabbit findings have been resolved:
.thread.reply canonical subject | MsgCanonicalThreadReply and EventThreadReplyDeleted; badge events now use .created + EventThreadReplyAdded discriminator
thread_subscriptions scan in broadcast-worker | GetThreadFollowers reading thread_rooms.replyAccounts (single FindOne projection, same pattern as PR #237 notification-worker)
tshow in thread_messages_by_room INSERT | tshow column and value to the Cassandra INSERT
messages_by_room mirror update best-effort in SoftDeleteMessage
docs/thread-reply-notifications.md)
request_id in slog.Warn
>= 0 assertion in integration test | == 0 (single seeded reply must decrement to exactly zero)
Additional changes made after initial review:
- Replaced incrementParentTcount/readParentTcount (CAS increment + separate read) with countAndSetParentTcount (COUNT non-deleted rows → blind SET). Eliminates the 2PC crash window: any JetStream redelivery runs the same COUNT + blind SET and converges to the correct value. Applied in both message-worker (add-path) and history-service (delete-path). Known cost: O(N) partition scan (see Known limitations above).
- Replaced ScanCAS() with MapScanCAS(map) in both SaveThreadMessage and saveThreadMessageEncrypted. IF NOT EXISTS returns all existing row columns when not applied; ScanCAS() with no destinations fails with "not enough columns to scan into". MapScanCAS absorbs all returned columns correctly.
- saveThreadMessageEncrypted: the cipher-enabled counterpart to SaveThreadMessage, with the same LWT + MapScanCAS idempotency contract. Both messages_by_id and thread_messages_by_thread INSERTs use the encrypted form when the at-rest cipher is configured.
- SaveThreadMessage return type changed from error to (*int, error): returns the new tcount from countAndSetParentTcount on every delivery (nil when parent tcount is unavailable, e.g. ThreadParentMessageCreatedAt not set).
- AddReplyAccounts ThreadStore method using MongoDB $addToSet: appends the replier's account and the parent author's account to thread_rooms.replyAccounts idempotently. Called by handleFirstThreadReply, handleSubsequentThreadReply, and markThreadMentions.
- UpdateThreadRoomLastMessage signature replierAccount string → replyAccounts []string: passes both the replier and the parent author so broadcast-worker can fan out to both on the subsequent-reply path.
- TestCassandraStore_SaveThreadMessage_IdempotentOnRedelivery and TestSaveThreadMessage_EncryptedPath_SkipsTcountOnRedelivery: both call SaveThreadMessage twice with the same message and assert tcount=1 in both messages_by_id and messages_by_room after the second call.
Testing
make lint clean (0 issues); make sast gosec PASS (govulncheck/semgrep run in CI); unit suites green for all modified services.
https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi