message-worker: replicate thread subscriptions via outbox/inbox federation#141
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (15)
✅ Files skipped from review due to trivial changes (4)
🚧 Files skipped from review as they are similar to previous changes (5)
📝 WalkthroughWalkthroughAdds OUTBOX/INBOX federation for ThreadSubscription upserts: model wire-format (Participant.SiteID, new outbox type), message-worker publish wiring with deterministic JetStream dedup IDs and owner-site resolution, and inbox-worker dispatch plus Mongo upsert that monotically merges hasMention; includes unit/integration tests and design docs. ChangesThread-subscription OUTBOX → INBOX federation
Sequence Diagram(s)sequenceDiagram
participant Sender as message-worker
participant Jet as JetStream/NATS
participant Inbox as inbox-worker
participant Mongo as MongoDB
Sender->>Sender: build ThreadSubscription (SiteID = room home site)
Sender->>Sender: resolve ownerSiteID (replier/parent/mention participant)
alt ownerSiteID != local && ownerSiteID != ""
Sender->>Jet: Publish Outbox Event (subject outbox.{home}.to.{owner}.thread_subscription_upserted, msgID dedup)
end
Jet->>Inbox: Deliver Outbox Event
Inbox->>Inbox: unmarshal ThreadSubscription
Inbox->>Mongo: Upsert filter=(threadRoomId,userId) with $setOnInsert, $set updatedAt, $max hasMention
Mongo-->>Inbox: ack
Inbox-->>Jet: ack
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (4)
inbox-worker/integration_test.go (1)
239-243: Avoid silently discarding marshal errors in the new integration tests.Several new payload/event marshals ignore
error(e.g., Line 239, Line 284, Line 296, Line 319). Userequire.NoErrorso fixture setup failures are explicit.Proposed patch pattern
- subData, _ := json.Marshal(sub) - evtData, _ := json.Marshal(model.OutboxEvent{ + subData, err := json.Marshal(sub) + require.NoError(t, err) + evtData, err := json.Marshal(model.OutboxEvent{ Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b", Payload: subData, Timestamp: now.UnixMilli(), }) + require.NoError(t, err) - mentionData, _ := json.Marshal(mentionSub) - mentionEvt, _ := json.Marshal(model.OutboxEvent{ + mentionData, err := json.Marshal(mentionSub) + require.NoError(t, err) + mentionEvt, err := json.Marshal(model.OutboxEvent{ Type: "thread_subscription_upserted", SiteID: "site-a", DestSiteID: "site-b", Payload: mentionData, Timestamp: now.UnixMilli(), }) + require.NoError(t, err)As per coding guidelines: "Never ignore errors silently — comment if intentionally discarded."
Also applies to: 283-287, 295-299, 318-322
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@inbox-worker/integration_test.go` around lines 239 - 243, Several json.Marshal calls in the test (e.g., creating subData and evtData for model.OutboxEvent and the other payload marshals referenced) are discarding errors; update each marshal call to capture the error and assert success with require.NoError so test setup failures fail fast. For example, change "subData, _ := json.Marshal(sub)" and "evtData, _ := json.Marshal(model.OutboxEvent{...})" to "subData, err := json.Marshal(...); require.NoError(t, err)" and do the same for the other marshals that produce payload/event bytes referenced in the diff (the subData, evtData and the other payload marshal variables).message-worker/main.go (1)
95-101: Wrap publish errors with context in the injected closure.At Line 97 and Line 100, raw errors are returned without operation/subject context. Please wrap both branches for better diagnostics and guideline compliance.
Proposed patch
handler := NewHandler(store, us, threadStore, cfg.SiteID, func(ctx context.Context, subj string, data []byte, msgID string) error { if msgID == "" { - return nc.Publish(ctx, subj, data) + if err := nc.Publish(ctx, subj, data); err != nil { + return fmt.Errorf("publish nats message to %s: %w", subj, err) + } + return nil } - _, err := js.Publish(ctx, subj, data, jetstream.WithMsgID(msgID)) - return err + if _, err := js.Publish(ctx, subj, data, jetstream.WithMsgID(msgID)); err != nil { + return fmt.Errorf("publish jetstream message to %s with msgID %s: %w", subj, msgID, err) + } + return nil })As per coding guidelines: "Always wrap errors with context using
fmt.Errorf(\"short description: %w\", err)... Never return bareerr."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/main.go` around lines 95 - 101, The injected closure passed into NewHandler returns raw errors from nc.Publish and js.Publish; update it to wrap those errors with context using fmt.Errorf so callers receive descriptive diagnostics (e.g., include operation and subj and msgID where relevant). Locate the anonymous func(ctx context.Context, subj string, data []byte, msgID string) error in the NewHandler call and change the two return points (the nc.Publish branch and the js.Publish branch) to return fmt.Errorf("publish to %s failed: %w", subj, err) and include msgID in the message for the jetstream branch (e.g., "jetstream publish to %s msgID=%s failed: %w"). Ensure fmt is imported if not already.docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md (2)
45-47: Add language specifier to fenced code block.The code block is missing a language specifier, which triggers a markdownlint warning.
📝 Proposed fix
-``` +```go const OutboxThreadSubscriptionUpserted OutboxEventType = "thread_subscription_upserted"</details> <details> <summary>🤖 Prompt for AI Agents</summary>Verify each finding against the current code and only fix it if needed.
In
@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md
around lines 45 - 47, The fenced code block containing the constant declaration
for OutboxThreadSubscriptionUpserted (type OutboxEventType) lacks a language
tag; update the markdown fence to include the language specifier (e.g., "go") so
the block reads as a Go code block, ensuring proper linting and syntax
highlighting for the OutboxThreadSubscriptionUpserted and OutboxEventType
declaration.</details> --- `119-127`: **Add language specifier to fenced code block.** The Mongo update document block is missing a language specifier. <details> <summary>📝 Proposed fix</summary> ```diff -``` +```javascript filter: { threadRoomId: sub.ThreadRoomID, userId: sub.UserID } update: $setOnInsert: { _id, parentMessageId, roomId, threadRoomId, userId, userAccount, siteId, createdAt, lastSeenAt: null } $set: { updatedAt } $bit: { hasMention: { or: <1 if sub.HasMention else 0> } } opts: upsert: true</details> <details> <summary>🤖 Prompt for AI Agents</summary>Verify each finding against the current code and only fix it if needed.
In
@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md
around lines 119 - 127, Update the fenced code block to include a language
specifier (e.g., change the openingtojavascript) for the Mongo update
document containing filter, update, $setOnInsert, $set, $bit and opts so syntax
highlighting is applied; locate the block that starts with "filter: {
threadRoomId: sub.ThreadRoomID, userId: sub.UserID }" and prepend the language
name to the opening fence.</details> </blockquote></details> </blockquote></details> <details> <summary>🤖 Prompt for all review comments with AI agents</summary>Verify each finding against the current code and only fix it if needed.
Inline comments:
In@inbox-worker/handler.go:
- Around line 176-186: The function handleThreadSubscriptionUpserted currently
handles unmarshalling and error cases but does not return on success; add an
explicit "return nil" as the final statement in handleThreadSubscriptionUpserted
immediately after the successful call to h.store.UpsertThreadSubscription so the
function returns a nil error on success (keep the existing json.Unmarshal and
error-wrapping logic for UpsertThreadSubscription).
Nitpick comments:
In
@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md:
- Around line 45-47: The fenced code block containing the constant declaration
for OutboxThreadSubscriptionUpserted (type OutboxEventType) lacks a language
tag; update the markdown fence to include the language specifier (e.g., "go") so
the block reads as a Go code block, ensuring proper linting and syntax
highlighting for the OutboxThreadSubscriptionUpserted and OutboxEventType
declaration.- Around line 119-127: Update the fenced code block to include a language
specifier (e.g., change the openingtojavascript) for the Mongo update
document containing filter, update, $setOnInsert, $set, $bit and opts so syntax
highlighting is applied; locate the block that starts with "filter: {
threadRoomId: sub.ThreadRoomID, userId: sub.UserID }" and prepend the language
name to the opening fence.In
@inbox-worker/integration_test.go:
- Around line 239-243: Several json.Marshal calls in the test (e.g., creating
subData and evtData for model.OutboxEvent and the other payload marshals
referenced) are discarding errors; update each marshal call to capture the error
and assert success with require.NoError so test setup failures fail fast. For
example, change "subData, _ := json.Marshal(sub)" and "evtData, _ :=
json.Marshal(model.OutboxEvent{...})" to "subData, err := json.Marshal(...);
require.NoError(t, err)" and do the same for the other marshals that produce
payload/event bytes referenced in the diff (the subData, evtData and the other
payload marshal variables).In
@message-worker/main.go:
- Around line 95-101: The injected closure passed into NewHandler returns raw
errors from nc.Publish and js.Publish; update it to wrap those errors with
context using fmt.Errorf so callers receive descriptive diagnostics (e.g.,
include operation and subj and msgID where relevant). Locate the anonymous
func(ctx context.Context, subj string, data []byte, msgID string) error in the
NewHandler call and change the two return points (the nc.Publish branch and the
js.Publish branch) to return fmt.Errorf("publish to %s failed: %w", subj, err)
and include msgID in the message for the jetstream branch (e.g., "jetstream
publish to %s msgID=%s failed: %w"). Ensure fmt is imported if not already.</details> <details> <summary>🪄 Autofix (Beta)</summary> Fix all unresolved CodeRabbit comments on this PR: - [ ] <!-- {"checkboxId": "4b0d0e0a-96d7-4f10-b296-3a18ea78f0b9"} --> Push a commit to this branch (recommended) - [ ] <!-- {"checkboxId": "ff5b1114-7d8c-49e6-8ac1-43f82af23a33"} --> Create a new PR with the fixes </details> --- <details> <summary>ℹ️ Review info</summary> <details> <summary>⚙️ Run configuration</summary> **Configuration used**: defaults **Review profile**: CHILL **Plan**: Pro **Run ID**: `2481949c-1341-41c5-90f2-113f67d1c317` </details> <details> <summary>📥 Commits</summary> Reviewing files that changed from the base of the PR and between 1cc8af2bd6c18d3e963fd9e78402df5905708f9b and 727b9898430af65c5ef24d916775cbf3ea662501. </details> <details> <summary>📒 Files selected for processing (14)</summary> * `docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md` * `docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md` * `inbox-worker/handler.go` * `inbox-worker/handler_test.go` * `inbox-worker/integration_test.go` * `inbox-worker/main.go` * `message-worker/handler.go` * `message-worker/handler_test.go` * `message-worker/integration_test.go` * `message-worker/main.go` * `pkg/mention/mention.go` * `pkg/mention/mention_test.go` * `pkg/model/event.go` * `pkg/model/model_test.go` </details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
727b989 to
7833243
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md`:
- Around line 45-47: The fenced code blocks in the spec are missing language
identifiers (causing MD040 warnings); update the blocks that contain the Go
constant declaration and the YAML-like outbox filter/update snippet to include
appropriate language tags (e.g., change ``` to ```go for the line declaring
const OutboxThreadSubscriptionUpserted OutboxEventType =
"thread_subscription_upserted" and to ```yaml for the outbox filter/update
block), and apply the same change to the other affected fenced block (the one
around lines showing the outbox filter/update at the other location) so all code
fences specify a language.
- Around line 72-79: The spec's description of ThreadSubscription.SiteID
conflicts with current code/tests: update the spec (or adjust code/tests) so
ThreadSubscription.SiteID semantics match the implemented behavior used by
buildThreadSubscription and inbox-worker; specifically ensure
ThreadSubscription.SiteID is documented to represent the subscription owner's
home site (to align with Subscription.SiteID) or, if the implementation is
intended to use the room's site, change buildThreadSubscription, tests, and
inbox-worker handling to use the room site and update Subscription.SiteID
accordingly; mention ThreadSubscription.SiteID, buildThreadSubscription,
Subscription.SiteID, RoomID and inbox-worker in your changes so readers can
reconcile contract across spec and code.
In `@inbox-worker/main.go`:
- Around line 115-120: Change the collection name used by inbox-worker from
"threadSubscriptions" to the canonical "thread_subscriptions" everywhere so data
isn't split; specifically update the code that creates the index (the call
around s.threadSubCol.Indexes().CreateOne and any other places constructing or
referencing the collection, including the similar index creation at the other
occurrence referenced) to obtain the Mongo collection named
"thread_subscriptions" (lowercase plural per guidelines) before creating indexes
or performing reads/writes.
In `@message-worker/handler.go`:
- Around line 164-176: The code currently skips persisting the parent author's
thread subscription when lookupOwnerSiteID returns an empty string; change the
flow in the parent subscription handling so that you always build the
subscription via buildThreadSubscription and always call
threadStore.InsertThreadSubscription (and in the other branch,
UpsertThreadSubscription) regardless of parentOwnerSite, but only call
publishThreadSubOutboxIfRemote when parentOwnerSite != "" (i.e., gate only the
outbox publish on the presence of parentOwnerSite); apply the same reshape to
the UpsertThreadSubscription path referenced in the review.
- Around line 369-370: The dedup ID currently omits the per-subscription seed so
multiple subscriptions to the same destination in one request collide; include
the per-subscription seed (payloadSeed built from threadRoomID, sub.UserID and
msgID) when generating the dedup ID by changing the outboxDedupID usage (or its
implementation) to incorporate payloadSeed (e.g., include payloadSeed + ":" +
destSiteID in the returned dedup string) so each subscription publish has a
unique, stable dedup ID; update calls to outboxDedupID and its signature if
needed to accept and use the payloadSeed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e41909ec-88a2-4b21-b6c1-6da7355a0980
📒 Files selected for processing (15)
docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.mddocs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.mdinbox-worker/handler.goinbox-worker/handler_test.goinbox-worker/integration_test.goinbox-worker/main.gomessage-worker/handler.gomessage-worker/handler_test.gomessage-worker/integration_test.gomessage-worker/main.gopkg/mention/mention.gopkg/mention/mention_test.gopkg/model/event.gopkg/model/model_test.gopkg/model/threadsubscription.go
✅ Files skipped from review due to trivial changes (3)
- pkg/model/threadsubscription.go
- docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md
- message-worker/handler_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- pkg/mention/mention.go
- inbox-worker/handler.go
- pkg/model/model_test.go
There was a problem hiding this comment.
🧹 Nitpick comments (2)
docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md (1)
146-154: 💤 Low valueAlign MongoDB upsert syntax with the implemented solution.
The spec shows
$bit: { hasMention: { or: ... } }for the monotonic mention merge, but the implementation in Task 11 of the plan (line 1907) andinbox-worker/main.gouses$max: { hasMention: sub.HasMention }instead. While line 156 acknowledges both as equivalent, showing the actually-implemented operator reduces confusion.Suggested alignment
filter: { threadRoomId: sub.ThreadRoomID, userId: sub.UserID } update: $setOnInsert: { _id, parentMessageId, roomId, threadRoomId, userId, userAccount, siteId, createdAt, lastSeenAt: null } $set: { updatedAt } - $bit: { hasMention: { or: <1 if sub.HasMention else 0> } } + $max: { hasMention: sub.HasMention } opts: upsert: trueThen update line 156: ```diff -The `$bit:or` (or equivalent `$max` on a 0/1 int) makes the merge monotonic: +The `$max` operator makes the merge monotonic:🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md` around lines 146 - 154, Update the spec to match the implemented MongoDB upsert operator: replace the $bit-based monotonic merge example with the $max variant used in Task 11 and inbox-worker/main.go by showing update: { $setOnInsert: {...}, $set: { updatedAt }, $max: { hasMention: sub.HasMention } } (or clearly state the $max equivalence) so the spec's thread monotonic-mention merge (hasMention) aligns exactly with the implemented symbol names and operator used in the codebase (Task 11, inbox-worker/main.go).docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md (1)
2118-2147: ⚡ Quick winAdd a prominent warning at the document start directing readers to the correction.
The post-implementation correction section is comprehensive and well-documented, but it appears only at the end of a 2100+ line plan. Readers who begin executing tasks linearly will follow the incorrect
ThreadSubscription.SiteIDsemantics described in Task 5 (lines 560-902) before discovering the correction.📋 Suggested addition to document header
Add after line 11 (after the "Goal:" statement):
**Goal:** Replicate `ThreadSubscription` writes (parent author, replier, mentionees) from message-worker on the room's home site to each affected user's home site via the existing OUTBOX/INBOX federation. +> **⚠️ IMPORTANT:** This plan document preserves the original task descriptions for historical traceability, but contains a semantic correction applied during implementation. Task 5 and related sections describe `ThreadSubscription.SiteID` as the owner's home site, which is **incorrect**. See the "Post-implementation correction" section at the end of this document for the corrected design. The canonical design is in `docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md`. + **Architecture:** Mirror room-worker's pattern. Message-worker gains a `publish PublishFunc` field; after every local Insert/Upsert/MarkThreadSubscriptionMention it emits a `thread_subscription_upserted` outbox event when the subscription owner's site differs from the room's site. Inbox-worker dispatches the new event type to a new `UpsertThreadSubscription` store method that uses MongoDB `$max` on `hasMention` for monotonic mention-flag merge.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md` around lines 2118 - 2147, Add a prominent warning immediately after the "Goal:" statement directing readers to the corrected design (commit 1c3915c) and the canonical spec at docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md; clearly state in one or two sentences that ThreadSubscription.SiteID is the room's home site (not the owner's), and that routing by owner site is handled transiently via ownerSiteID in publishThreadSubOutboxIfRemote. Also call out the impacted symbols so readers can find the corrected behavior: buildThreadSubscription (always called with roomSite), publishThreadSubOutboxIfRemote (adds ownerSiteID param), handleFirstThreadReply / handleSubsequentThreadReply (gain eventSiteID), markThreadMentions (no empty-SiteID fallback), and test assertions which should expect sub.SiteID == roomSite.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.md`:
- Around line 2118-2147: Add a prominent warning immediately after the "Goal:"
statement directing readers to the corrected design (commit 1c3915c) and the
canonical spec at
docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md;
clearly state in one or two sentences that ThreadSubscription.SiteID is the
room's home site (not the owner's), and that routing by owner site is handled
transiently via ownerSiteID in publishThreadSubOutboxIfRemote. Also call out the
impacted symbols so readers can find the corrected behavior:
buildThreadSubscription (always called with roomSite),
publishThreadSubOutboxIfRemote (adds ownerSiteID param), handleFirstThreadReply
/ handleSubsequentThreadReply (gain eventSiteID), markThreadMentions (no
empty-SiteID fallback), and test assertions which should expect sub.SiteID ==
roomSite.
In
`@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md`:
- Around line 146-154: Update the spec to match the implemented MongoDB upsert
operator: replace the $bit-based monotonic merge example with the $max variant
used in Task 11 and inbox-worker/main.go by showing update: { $setOnInsert:
{...}, $set: { updatedAt }, $max: { hasMention: sub.HasMention } } (or clearly
state the $max equivalence) so the spec's thread monotonic-mention merge
(hasMention) aligns exactly with the implemented symbol names and operator used
in the codebase (Task 11, inbox-worker/main.go).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 070a9816-5044-47c9-ba54-df66d14828f6
📒 Files selected for processing (2)
docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.mddocs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md
mliu33
left a comment
There was a problem hiding this comment.
Great work! Just two minor comments and need to resolve conflicts, thanks!
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
message-worker/handler_test.go (1)
1170-1247: ⚡ Quick winAssert distinct dedup IDs in the same-destination cases.
The
"both remote, same site"scenarios only count publishes. If two publishes accidentally reuse the sameNats-Msg-Id, these tests still pass because the fake publisher records both calls and doesn't model JetStream dedup. Capturing the twomsgIDs and asserting they differ would lock in the exact regression this PR fixed.Also applies to: 1318-1400
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/handler_test.go` around lines 1170 - 1247, In the "both remote, same site" test(s) inside handler_test.go where you build the publishCall slice via NewHandler's publish func and call h.handleFirstThreadReply, capture the msgID values from each publishCall (publishCall.msgID) for publishes that share the same outer.DestSiteID and add an assertion that those msgIDs are distinct (e.g., require.NotEqual or assert.NotEqual) to ensure the Nats-Msg-Id is unique per publish; apply the same change to the similar test block around the 1318-1400 region so both tests validate deduplication behavior by comparing the two recorded msgIDs for the same destination and failing if they are equal.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@docs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.md`:
- Around line 303-307: In inbox-worker/main.go update the mongoInboxStore
initialization to use the actual collection name "thread_subscriptions": set the
threadSubCol *mongo.Collection by calling db.Collection("thread_subscriptions")
(replace the current db.Collection("threadSubscriptions") usage) so the
mongoInboxStore.threadSubCol matches the rollout spec and message-worker
collection name.
In `@message-worker/handler.go`:
- Around line 279-282: The log message around lookupOwnerSiteID is misleading:
when errors.Is(err, userstore.ErrUserNotFound) we only skip cross-site publish,
not the entire thread subscription. Update the slog.Warn call in the error
branch inside lookupOwnerSiteID handling (where ErrUserNotFound is checked and
the code returns "", nil) to state that the owner site lookup failed and
cross-site publish will be skipped (include userID and role), e.g. change "owner
user not found — skipping thread subscription" to something like "owner user not
found — skipping cross-site publish; local thread insert/upsert will continue"
so on-call debugging tracks the correct flow.
In `@message-worker/main.go`:
- Around line 95-99: The inline publish closure passed to NewHandler is calling
nc.Publish with a context parameter (in the anonymous func defined in main.go)
which mismatches the nats.Conn.Publish signature; update the closure used when
constructing NewHandler to call nc.Publish(subj, data) (remove the ctx argument)
and propagate any returned error as before (e.g., return fmt.Errorf("publish
nats message to %s: %w", subj, err)). Ensure you modify the anonymous function
where NewHandler(..., func(ctx context.Context, subj string, data []byte, msgID
string) error { ... }) is defined.
---
Nitpick comments:
In `@message-worker/handler_test.go`:
- Around line 1170-1247: In the "both remote, same site" test(s) inside
handler_test.go where you build the publishCall slice via NewHandler's publish
func and call h.handleFirstThreadReply, capture the msgID values from each
publishCall (publishCall.msgID) for publishes that share the same
outer.DestSiteID and add an assertion that those msgIDs are distinct (e.g.,
require.NotEqual or assert.NotEqual) to ensure the Nats-Msg-Id is unique per
publish; apply the same change to the similar test block around the 1318-1400
region so both tests validate deduplication behavior by comparing the two
recorded msgIDs for the same destination and failing if they are equal.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f52ce64c-f488-432e-b5d7-0006a5f9424c
📒 Files selected for processing (7)
docs/superpowers/plans/2026-04-28-message-worker-thread-subscription-outbox.mddocs/superpowers/specs/2026-04-28-message-worker-thread-subscription-outbox-design.mdinbox-worker/integration_test.goinbox-worker/main.gomessage-worker/handler.gomessage-worker/handler_test.gomessage-worker/main.go
🚧 Files skipped from review as they are similar to previous changes (2)
- inbox-worker/integration_test.go
- inbox-worker/main.go
| handler := NewHandler(store, us, threadStore, cfg.SiteID, func(ctx context.Context, subj string, data []byte, msgID string) error { | ||
| if msgID == "" { | ||
| if err := nc.Publish(ctx, subj, data); err != nil { | ||
| return fmt.Errorf("publish nats message to %s: %w", subj, err) | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In github.com/nats-io/nats.go v1.50.0, what is the signature of (*nats.Conn).Publish? Does it accept context.Context, or is it Publish(subject string, data []byte)?
💡 Result:
In github.com/nats-io/nats.go v1.50.0, the signature of (*nats.Conn).Publish is func (c *Conn) Publish(subject string, data []byte) error. It does not accept context.Context as the first parameter. It is Publish(subject string, data []byte). This is evidenced by multiple examples in the official repository at tag v1.50.0 showing usage as nc.Publish("foo", []byte("Hello World")), and internal calls like c.Conn.publish(subject, EMPTY, false, nil, b) from EncodedConn.Publish. Context support exists for other methods like RequestWithContext and NextMsgWithContext, and for JetStream's Publish (which uses Request under the hood), but not for the core Conn.Publish method, which is fire-and-forget and non-blocking.
Citations:
- 1: https://github.com/nats-io/nats.go/blob/v1.50.0/nats.go
- 2: https://github.com/nats-io/nats.go/blob/v1.50.0/js.go
- 3: https://github.com/nats-io/nats.go
- 4: https://github.com/nats-io/nats.go/tree/v1.51.0
- 5: https://github.com/nats-io/nats.go/blob/v1.50.0/enc.go
- 6: https://github.com/nats-io/nats.go/tree/v1.50.0
Fix NATS Publish call — remove context parameter that will cause compilation failure.
The core NATS (*nats.Conn).Publish method signature is Publish(subject string, data []byte) error and does not accept a context parameter. The current call at lines 97-98 will not compile:
Fix
if msgID == "" {
- if err := nc.Publish(ctx, subj, data); err != nil {
+ if err := nc.Publish(subj, data); err != nil {
return fmt.Errorf("publish nats message to %s: %w", subj, err)
}
return nil
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@message-worker/main.go` around lines 95 - 99, The inline publish closure
passed to NewHandler is calling nc.Publish with a context parameter (in the
anonymous func defined in main.go) which mismatches the nats.Conn.Publish
signature; update the closure used when constructing NewHandler to call
nc.Publish(subj, data) (remove the ctx argument) and propagate any returned
error as before (e.g., return fmt.Errorf("publish nats message to %s: %w", subj,
err)). Ensure you modify the anonymous function where NewHandler(..., func(ctx
context.Context, subj string, data []byte, msgID string) error { ... }) is
defined.
There was a problem hiding this comment.
False positive — nc here is *otelnats.Conn from github.com/Marz32onE/instrumentation-go/otel-nats/otelnats (returned by natsutil.Connect, see pkg/natsutil/connect.go:25), not *nats.Conn. The otelnats wrapper's Publish does accept context.Context as the first arg — it propagates the trace context through the NATS message headers.
Same call shape is used in room-worker/main.go:80 (nc.Publish(ctx, subj, data)) and has been compiling and running for months.
Verified on the current branch (HEAD 908d7be):
make lint—0 issuesmake test— green across all 30 packagesmake build SERVICE=message-worker— succeeds
The web query in your analysis was against github.com/nats-io/nats.go directly, but our codebase doesn't use that interface in the message-worker startup path — it uses the otel-nats wrapper everywhere.
Generated by Claude Code
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check natsutil.Connect return type and the otelnats wrapper's Publish signature
fd -i "connect.go" --search-path pkg/natsutil/ | xargs cat 2>/dev/null || fd "connect.go" | grep -i natsutil | xargs cat 2>/dev/null
# Check the otelnats wrapper's Publish method signature
rg -n "func.*Publish" --type go -C2
Length of output: 27458
@general-lex, you're absolutely right — I apologize for the false positive. The script confirms that natsutil.Connect returns *otelnats.Conn (from github.com/Marz32onE/instrumentation-go/otel-nats/otelnats), not a plain *nats.Conn. The otelnats wrapper's Publish method does accept context.Context as its first argument for trace propagation, as evidenced by history-service/internal/publisher/publisher.go:26 calling p.nc.Publish(ctx, subj, data) with the same pattern used throughout the codebase.
My web query was incorrectly scoped to the upstream nats.go library rather than the instrumented wrapper used here. I'm recording this to avoid the same mistake in future reviews.
✏️ Learnings added
Learnt from: general-lex
Repo: hmchangw/chat PR: 141
File: message-worker/main.go:95-99
Timestamp: 2026-05-04T03:30:38.678Z
Learning: In the hmchangw/chat repository, `natsutil.Connect` (pkg/natsutil/connect.go) returns `*otelnats.Conn` from `github.com/Marz32onE/instrumentation-go/otel-nats/otelnats`, not a plain `*nats.Conn`. The `otelnats.Conn.Publish` method signature accepts `context.Context` as the first argument (i.e., `Publish(ctx context.Context, subj string, data []byte) error`) for OpenTelemetry trace propagation via NATS message headers. Do NOT flag `nc.Publish(ctx, subj, data)` calls as incorrect — this is the standard and correct call shape throughout this codebase (used in message-worker, broadcast-worker, notification-worker, history-service, etc.).
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/roomkeysender/roomkeysender.go:28-30
Timestamp: 2026-04-08T08:25:32.108Z
Learning: In this repository’s Go code, follow the CLAUDE.md rule: don’t add defensive error handling, fallbacks, or nil/validation guards for conditions that are guaranteed not to occur. For internal functions/methods (e.g., when all callers are known to pass non-nil pointer parameters, such as pkg/roomkeysender/roomkeysender.go’s Send), the absence of nil-pointer guards is acceptable—do not flag it as a review issue. Reserve pointer validation and other safety checks for system boundaries (e.g., external inputs, API/HTTP requests, deserialization, or other untrusted entry points).
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/model/model_test.go:306-317
Timestamp: 2026-04-08T08:25:48.132Z
Learning: In Go, `time.Time` is `comparable` and therefore may be used as a type argument to generic constraints requiring `comparable` (e.g., `func roundTrip[T comparable](...)`). This is because `time.Time`’s underlying fields are all comparable (`uint64`, `int64`, and a pointer `*Location`, where pointers are comparable by address). As a result, any struct that contains or embeds a `time.Time` field (e.g., `model.Message`, or structs embedding it like `model.NotificationEvent`) should also satisfy `comparable` and should not be flagged as a compilation-error when used with `T comparable`.
Replicate ThreadSubscription writes (parent author, replier, mentionees) to each affected user's home site via OUTBOX/INBOX, mirroring room-worker's pattern. Single new outbox type (thread_subscription_upserted), payload is the existing ThreadSubscription, inbox-worker upserts with monotonic hasMention merge.
…ing) Add OutboxThreadSubscriptionUpserted constant, SiteID on Participant propagated from mention.Resolve, and siteID + PublishFunc fields on message-worker Handler.
…-site) Add publishThreadSubOutboxIfRemote helper with TDD coverage for same-site no-op, empty-SiteID skip, remote publish payload + dedup ID, and publish errors. Switch ThreadSubscription.SiteID to the owner's site, plumb the replier User through, and look up the parent's siteID via userStore with warn-and-skip on ErrUserNotFound.
…paths) Wire publishThreadSubOutboxIfRemote after each InsertThreadSubscription in handleFirstThreadReply and after each UpsertThreadSubscription in handleSubsequentThreadReply, with cross-site test matrices and error propagation for outbox publish failures.
… wiring) Wire publishThreadSubOutboxIfRemote after MarkThreadSubscriptionMention with test coverage for local/remote mentionees, @ALL skip, sender self-mention skip, and HasMention=true in the published payload. Verify message-worker main.go JetStream publish closure and confirm bootstrap.go stays unchanged for OUTBOX (ops/IaC owns it).
Add UpsertThreadSubscription to InboxStore interface and the thread_subscription_upserted dispatch case + handler in inbox-worker, with unit-test coverage for insert, monotonic hasMention merge, invalid payload, and store-error paths via an extended in-memory stub.
…ription) Add threadSubCol to mongoInboxStore, ensure (threadRoomId, userId) unique index at startup, and implement UpsertThreadSubscription using $setOnInsert for immutable fields, $set on updatedAt, and $max on hasMention so a non-mention event never clears a prior mention=true.
…n tests) Add integration tests covering the insert path and the monotonic hasMention merge across two and three events through the real Mongo testcontainer, asserting _id/createdAt are pinned by \$setOnInsert and updatedAt advances.
Add spec coverage map and execution-handoff section. Clean up Task 5 step 5.3 to present lookupOwnerSiteID with its final (string, error) signature directly instead of showing an incorrect (string, bool) version first and then correcting it.
- Add SiteID field to model.Participant struct with omitempty JSON/BSON tags - Extend TestParticipantJSON to verify SiteID round-trips and omits when empty - Propagate SiteID from User to Participant in mention.Resolve - Update mention tests to include SiteID in test fixtures and expectations - Update message-worker test fixture to expect SiteID in resolved Participant Downstream code can now determine each mentioned user's home site via Participant.SiteID, enabling cross-site outbox events for thread subscriptions. https://claude.ai/code/session_01J9Ht3dT6EzmvoipcVzz8NB
… siteID
- buildThreadSubscription now takes ownerSiteID (owner's home site, not
room's site) so subscriptions round-trip correctly through OUTBOX/INBOX
- handleThreadRoomAndSubscriptions/handleFirstThreadReply/handleSubsequent-
ThreadReply updated to accept replier *model.User and use replier.SiteID
- lookupOwnerSiteID helper resolves parent's home site via userStore.FindUserByID;
returns ("", nil) on ErrUserNotFound (warn-and-skip) so processing continues
- markThreadMentions uses Participant.SiteID for mentionee subscriptions
- processMessage passes user to handleThreadRoomAndSubscriptions
- Tests: added FindUserByID parent mock expectations to all thread test cases;
extended struct with extraUserStoreSetup/expectReplierInsert; added
warn-and-skip and DB-error edge-case tests for parent user lookup
https://claude.ai/code/session_01J9Ht3dT6EzmvoipcVzz8NB
Wire publishThreadSubOutboxIfRemote into markThreadMentions so that remote mentionees receive a thread_subscription_upserted outbox event with HasMention=true. Local mentionees short-circuit with no publish. https://claude.ai/code/session_01J9Ht3dT6EzmvoipcVzz8NB
…nt arg - Drop the now-removed Publisher arg from NewHandler calls in our 4 inbox-worker thread-subscription tests and 2 integration tests. - Drop the obsolete mockPublisher/publishRecord types; upstream removed publishing from inbox-worker and the existing tests no longer use them. - Add the new userAccount arg to two UpdateThreadRoomLastMessage mock expectations our tests added (PR #127 changed the signature).
…arately mliu33's review: Subscription.SiteID and ThreadSubscription.SiteID both mean "the home site of the room" — a back-reference, not a self-identifier of the owner. Across federation the field is constant on every replica. Revert the Task-5 semantic change. Subscription.SiteID is now built from eventSiteID (the room's site) at all three call sites (parent, replier, mentionee). publishThreadSubOutboxIfRemote takes ownerSiteID as an explicit parameter, used only to decide whether to publish and where (not stored on the subscription). - handler.go: buildThreadSubscription param renamed back to siteID; helper signature gains ownerSiteID; handleFirstThreadReply / handleSubsequent- ThreadReply gain eventSiteID and pass it for the sub while routing the outbox by parentOwnerSite or replier.SiteID; markThreadMentions stores eventSiteID on the sub and routes by Participant.SiteID. - pkg/model/threadsubscription.go: doc comment clarifying SiteID is the room's site, same semantic as Subscription.SiteID. - Tests updated to assert SiteID is the room's site after the round-trip and to call the new helper/handler signatures.
Spec doc rewritten in-place to describe the corrected design: ThreadSubscription.SiteID is the room's site (matching Subscription.SiteID), preserved across federation. Owner-site routing for the cross-site outbox publish is a separate, explicit ownerSiteID parameter to publishThreadSubOutboxIfRemote — not stored on the subscription. Plan doc gains a "Post-implementation correction" postscript pointing at commit 1c3915c that captures the divergence from the original Task 5 description. Plan body is left as-written for historical traceability. Skipping the optional rename to RoomSiteID per Subscription.SiteID's unprefixed convention; this is documented as a non-goal in the spec.
… on missing owner-site
Two bug fixes from PR review (CodeRabbit):
1. outboxDedupID was returning requestID + ":" + destSiteID and discarding
payloadSeed when an X-Request-ID was present in context. A single thread
reply that publishes for parent + replier (or multiple mentionees) on the
same destination site got identical dedup IDs, causing JetStream to dedup
away all but the first publish. Use payloadSeed (which already contains
threadRoomID + userID + msg.ID — stable across redeliveries and unique
per publish) directly: "{payloadSeed}:{destSiteID}".
2. handleFirstThreadReply / handleSubsequentThreadReply skipped the local
parent Insert/Upsert entirely when lookupOwnerSiteID returned ("", nil)
on userstore.ErrUserNotFound. The local subscription doesn't depend on
knowing the owner's site (SiteID is the room's site, which we have via
eventSiteID, and parentSender already supplies userID + account). Only
the cross-site outbox publish needs the owner site. Always persist the
parent subscription locally; gate only the outbox call.
Tests updated: the two warn-and-skip cases now assert the parent
Insert/Upsert still runs while the outbox publish is skipped.
…plan banner Spec: add language tags to two markdown fences (MD040). Replace the \$bit example with the implemented \$max operator and explain why \$max on a Go bool field works (BSON encodes false=0x00 < true=0x01 → monotonic merge). Plan: add a top-of-document warning right after the Goal line pointing at the post-implementation correction section + the canonical spec, so readers who follow the linear task list don't first encounter (and act on) the incorrect Task 5 SiteID semantic.
…ation tests
CodeRabbit nitpicks (CLAUDE.md compliance):
- message-worker/main.go: wrap nc.Publish / js.Publish errors from the
injected publish closure with fmt.Errorf so callers get descriptive
diagnostics ("publish nats message to <subj>" / "publish jetstream
message to <subj> with msgID <id>").
- inbox-worker/integration_test.go: capture errors from json.Marshal
in our 8 new thread-subscription test fixture builds and assert with
require.NoError instead of "_, := json.Marshal(...)" — fail fast on
fixture construction errors. Pre-existing _ := json.Marshal sites in
unrelated test cases left as-is.
…edup ID CodeRabbit + mliu33 review: - Inbox-worker was passing "threadSubscriptions" (camelCase) to db.Collection(...). The canonical collection-name string elsewhere in the codebase is "thread_subscriptions" (snake_case): message-worker/ store_mongo.go:32, all 11 sites in message-worker/integration_test.go, and history-service/internal/mongorepo/threadroom.go:28. This PR was splitting federated thread subscriptions into a separate collection invisible to message-worker reads. Fix: rename the collection string in inbox-worker/main.go and 5 sites in inbox-worker/integration_test.go. The Go struct field name (threadSubCol) is unchanged. - outboxDedupID's X-Request-ID branch was dead — the value was warned about but not used in the result. Inline the dedup-ID derivation into publishThreadSubOutboxIfRemote (single call site) and drop the helper + the now-unused natsutil import. Format unchanged: thread-sub-outbox:<threadRoomID>:<userID>:<msgID>:<destSiteID>.
Two CodeRabbit nits:
- spec implementation outline still showed db.Collection("threadSubscriptions")
in the inbox-worker/main.go section, contradicting the rest of the spec
(which uses thread_subscriptions) and the actual code after 8147b83. Fix
the example string and clarify why the Go field name and collection name
differ. Drop a stale \$bit:or reference at the same time (helper text was
also out of date — the implementation uses \$max).
- lookupOwnerSiteID's warn log on userstore.ErrUserNotFound said "skipping
thread subscription" but after the parent-Insert gate fix, only the
cross-site outbox publish is skipped — the local Insert/Upsert still
runs. Reword so on-call debugging tracks the correct flow.
main brought in TestHandler_QuotedMessage (PR #134, message quoting) that calls NewHandler with the old 3-arg signature. Update that one site to the post-Task-3 5-arg form (siteID + no-op publish closure), matching the pattern used by every other test in this file.
908d7be to
4838110
Compare
Summary
Implement cross-site replication of
ThreadSubscriptionwrites from message-worker to affected users' home sites via the existing OUTBOX/INBOX federation pattern. When a user replies in a thread, subscriptions for the parent author, replier, and mentionees are now published as outbox events to their respective home sites, enabling them to see thread participation in their local UI.Key Changes
OutboxThreadSubscriptionUpsertedevent type constant and extendParticipantwithSiteIDfield for mention resolutionsiteIDandPublishFuncfields toHandlerfor outbox publishingpublishThreadSubOutboxIfRemotehelper that publishes thread subscription events when the subscription owner's home site differs from the room's sitebuildThreadSubscriptionand lookup parent author's site viauserstore.FindUserByIDInsertThreadSubscription,UpsertThreadSubscription, andMarkThreadSubscriptionMentionoperationsthread_subscription_upsertedeventsUpsertThreadSubscriptionstore method using MongoDB$maxoperator for monotonichasMentionflag merge (never clear true→false)threadSubColcollection tomongoInboxStorewith unique index on(threadRoomId, userId)SiteIDfrom looked-upUserobjects ontoParticipantstructsImplementation Details
thread-sub-outbox:{threadRoomID}:{userID}:{msg.ID}, stable across redeliveries for JetStream stream-level dedupoutbox.{homeSite}.to.{destSite}.thread_subscription_upsertedThreadSubscription.SiteIDnow reflects the subscription owner's home site (not the room's site), enabling clean round-trip federation$maxonhasMentionto ensure mention flags never regress from true to false across redeliverieshttps://claude.ai/code/session_01J9Ht3dT6EzmvoipcVzz8NB
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests