Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions inbox-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/uuid"

"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/subject"
)

// InboxStore abstracts the data store operations needed by the inbox worker.
Expand All @@ -29,13 +30,14 @@ type Publisher interface {

// Handler processes incoming cross-site OutboxEvent messages.
type Handler struct {
store InboxStore
pub Publisher
store InboxStore
pub Publisher
siteID string
}

// NewHandler creates a Handler with the given store and publisher.
func NewHandler(store InboxStore, pub Publisher) *Handler {
return &Handler{store: store, pub: pub}
// NewHandler creates a Handler with the given store, publisher, and local site ID.
func NewHandler(store InboxStore, pub Publisher, siteID string) *Handler {
return &Handler{store: store, pub: pub, siteID: siteID}
}

// HandleEvent processes a single JetStream message payload.
Expand Down Expand Up @@ -108,9 +110,14 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent)
return fmt.Errorf("bulk create subscriptions: %w", err)
}

// No SubscriptionUpdateEvent is published here — room-worker already publishes
// to the user's subject and the NATS supercluster routes it to the user's
// home site.
// Re-publish the enriched MemberAddEvent to the local ROOMS stream so
// search-sync-worker on this (remote) site picks it up for spotlight +
// user-room indexing. The event already carries RoomName/RoomType from the
// source site's room-worker, so no additional lookup needed.
if err := h.pub.Publish(ctx, subject.RoomCanonicalMemberAdded(h.siteID), evt.Payload); err != nil {
return fmt.Errorf("re-publish member_added to local ROOMS: %w", err)
}

return nil
}

Expand All @@ -131,6 +138,12 @@ func (h *Handler) handleMemberRemoved(ctx context.Context, evt *model.OutboxEven
if err := h.store.DeleteSubscriptionsByAccounts(ctx, memberEvt.RoomID, memberEvt.Accounts); err != nil {
return fmt.Errorf("delete subscriptions for room %s: %w", memberEvt.RoomID, err)
}

// Re-publish to local ROOMS stream for search-sync-worker.
if err := h.pub.Publish(ctx, subject.RoomCanonicalMemberRemoved(h.siteID), evt.Payload); err != nil {
return fmt.Errorf("re-publish member_removed to local ROOMS: %w", err)
}

return nil
}

Expand Down
73 changes: 36 additions & 37 deletions inbox-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/subject"
)

// --- In-memory InboxStore stub ---
Expand Down Expand Up @@ -164,7 +165,7 @@ func TestHandleEvent_MemberAdded(t *testing.T) {
},
}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

change := model.MemberAddEvent{
Type: "member_added",
Expand Down Expand Up @@ -221,11 +222,10 @@ func TestHandleEvent_MemberAdded(t *testing.T) {
t.Error("subscription ID should be non-empty (generated UUID)")
}

// No SubscriptionUpdateEvent is published here — room-worker already
// publishes via the NATS supercluster to the user's home site.
// Re-publish to local ROOMS stream for search-sync-worker.
records := pub.getRecords()
if len(records) != 0 {
t.Fatalf("expected 0 publishes, got %d", len(records))
if len(records) != 1 {
t.Fatalf("expected 1 publish (ROOMS re-publish), got %d", len(records))
}
}

Expand All @@ -236,7 +236,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) {
},
}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

joinedAt := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC)
historyShared := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) {
func TestHandleEvent_RoomSync(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

room := model.Room{
ID: "room-1",
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestHandleEvent_RoomSync(t *testing.T) {
func TestHandleEvent_RoomSync_Upsert(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

// Insert initial room
room1 := model.Room{
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestHandleEvent_RoomSync_Upsert(t *testing.T) {
func TestHandleEvent_UnknownType(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

evt := model.OutboxEvent{
Type: "unknown_type",
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestHandleEvent_UnknownType(t *testing.T) {
func TestHandleEvent_InvalidJSON(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

err := h.HandleEvent(context.Background(), []byte("not json"))
if err == nil {
Expand All @@ -437,7 +437,7 @@ func TestHandleEvent_InvalidJSON(t *testing.T) {
func TestHandleEvent_MemberAdded_InvalidPayload(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

evt := model.OutboxEvent{
Type: "member_added",
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) {
},
}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

change := model.MemberAddEvent{
Type: "member_added",
Expand Down Expand Up @@ -510,10 +510,9 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) {
t.Errorf("subscription User.Account = %q, want %q", sub.User.Account, "account-bob")
}

// No SubscriptionUpdateEvent is published here — room-worker already
// publishes via the NATS supercluster to the user's home site.
if len(pub.getRecords()) != 0 {
t.Errorf("expected 0 publishes, got %d", len(pub.getRecords()))
// Re-publish to local ROOMS stream for search-sync-worker.
if len(pub.getRecords()) != 1 {
t.Errorf("expected 1 publish (ROOMS re-publish), got %d", len(pub.getRecords()))
}
}

Expand All @@ -525,7 +524,7 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) {
},
}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

joinedAt := time.Date(2026, 4, 5, 10, 30, 0, 0, time.UTC)
historyShared := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -592,10 +591,9 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) {
}
}

// No SubscriptionUpdateEvent is published here — room-worker already
// publishes via the NATS supercluster to the user's home site.
if len(pub.getRecords()) != 0 {
t.Fatalf("expected 0 publishes, got %d", len(pub.getRecords()))
// Re-publish to local ROOMS stream for search-sync-worker.
if len(pub.getRecords()) != 1 {
t.Fatalf("expected 1 publish (ROOMS re-publish), got %d", len(pub.getRecords()))
}
}

Expand All @@ -606,7 +604,7 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) {
},
}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

change := model.MemberAddEvent{
Type: "member_added",
Expand Down Expand Up @@ -643,7 +641,7 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) {
func TestHandleEvent_RoomSync_InvalidPayload(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

evt := model.OutboxEvent{
Type: "room_sync",
Expand All @@ -667,7 +665,7 @@ func TestHandleEvent_RoomSync_InvalidPayload(t *testing.T) {
func TestHandleEvent_RoleUpdated(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")
subEvt := model.SubscriptionUpdateEvent{
UserID: "u2",
Subscription: model.Subscription{
Expand Down Expand Up @@ -696,17 +694,17 @@ func TestHandleEvent_RoleUpdated(t *testing.T) {
if len(updates[0].roles) != 1 || updates[0].roles[0] != model.RoleOwner {
t.Errorf("role update roles = %v, want [owner]", updates[0].roles)
}
// No SubscriptionUpdateEvent publish — room-worker already handles that via NATS supercluster
// role_updated doesn't re-publish to ROOMS — only member_added/removed do.
records := pub.getRecords()
if len(records) != 0 {
t.Errorf("expected 0 publishes (room-worker handles notification), got %d", len(records))
t.Errorf("expected 0 publishes for role_updated, got %d", len(records))
}
}

func TestHandleEvent_RoleUpdated_InvalidPayload(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")
evt := model.OutboxEvent{
Type: "role_updated", SiteID: "site-a", DestSiteID: "site-b",
Payload: []byte("not valid json"),
Expand All @@ -724,7 +722,7 @@ func TestHandleEvent_RoleUpdated_InvalidPayload(t *testing.T) {
func TestHandleEvent_MemberRemoved(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

store.mu.Lock()
store.subscriptions = append(store.subscriptions, model.Subscription{
Expand All @@ -749,16 +747,16 @@ func TestHandleEvent_MemberRemoved(t *testing.T) {
subs := store.getSubscriptions()
assert.Empty(t, subs)

// No SubscriptionUpdateEvent is published — room-worker already publishes
// via the NATS supercluster to the user's home site.
// Re-publish to local ROOMS stream for search-sync-worker.
records := pub.getRecords()
assert.Empty(t, records)
require.Len(t, records, 1)
assert.Equal(t, subject.RoomCanonicalMemberRemoved("site-test"), records[0].subject)
}

func TestHandleEvent_MemberRemoved_InvalidPayload(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

evt := model.OutboxEvent{
Type: "member_removed", SiteID: "site-a", DestSiteID: "site-b",
Expand All @@ -773,7 +771,7 @@ func TestHandleEvent_MemberRemoved_InvalidPayload(t *testing.T) {
func TestHandleEvent_MemberRemoved_MultipleAccounts(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

// Pre-populate subscriptions for both accounts
store.mu.Lock()
Expand Down Expand Up @@ -803,15 +801,16 @@ func TestHandleEvent_MemberRemoved_MultipleAccounts(t *testing.T) {
subs := store.getSubscriptions()
assert.Empty(t, subs)

// No SubscriptionUpdateEvent publishes — room-worker handles that
// Re-publish to local ROOMS stream for search-sync-worker.
records := pub.getRecords()
assert.Empty(t, records)
require.Len(t, records, 1)
assert.Equal(t, subject.RoomCanonicalMemberRemoved("site-test"), records[0].subject)
}

func TestHandleEvent_MemberRemoved_EmptyAccountsNoOp(t *testing.T) {
store := &stubInboxStore{}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

memberEvt := model.MemberRemoveEvent{RoomID: "r1", Accounts: []string{}}
payload, _ := json.Marshal(memberEvt)
Expand All @@ -832,7 +831,7 @@ func (s *errorDeleteStore) DeleteSubscriptionsByAccounts(_ context.Context, _ st
func TestHandleEvent_MemberRemoved_DeleteError(t *testing.T) {
store := &errorDeleteStore{stubInboxStore: &stubInboxStore{}}
pub := &mockPublisher{}
h := NewHandler(store, pub)
h := NewHandler(store, pub, "site-test")

memberEvt := model.MemberRemoveEvent{RoomID: "r1", Accounts: []string{"alice"}}
payload, _ := json.Marshal(memberEvt)
Expand Down
4 changes: 2 additions & 2 deletions inbox-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func TestInboxWorker_MemberAdded_Integration(t *testing.T) {
// Create outbox event for member_added
change := model.MemberAddEvent{
Type: "member_added", RoomID: "r1", Accounts: []string{"u2"}, SiteID: "site-b",
JoinedAt: time.Now().UTC().UnixMilli(),
JoinedAt: time.Now().UTC().UnixMilli(),
HistorySharedSince: time.Now().UTC().UnixMilli(),
Timestamp: time.Now().UTC().UnixMilli(),
Timestamp: time.Now().UTC().UnixMilli(),
}
changeData, _ := json.Marshal(change)
evt := model.OutboxEvent{Type: "member_added", SiteID: "site-a", DestSiteID: "site-b", Payload: changeData}
Expand Down
2 changes: 1 addition & 1 deletion inbox-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func main() {
}

publisher := &natsPublisher{nc: nc}
handler := NewHandler(store, publisher)
handler := NewHandler(store, publisher, cfg.SiteID)

cctx, err := cons.Consume(func(m oteljetstream.Msg) {
if err := handler.HandleEvent(m.Context(), m.Data()); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type MemberAddEvent struct {
RoomID string `json:"roomId" bson:"roomId"`
Accounts []string `json:"accounts" bson:"accounts"`
SiteID string `json:"siteId" bson:"siteId"`
RoomName string `json:"roomName" bson:"roomName"`
RoomType RoomType `json:"roomType" bson:"roomType"`
JoinedAt int64 `json:"joinedAt" bson:"joinedAt"`
HistorySharedSince int64 `json:"historySharedSince" bson:"historySharedSince"`
Timestamp int64 `json:"timestamp" bson:"timestamp"`
Expand Down
42 changes: 42 additions & 0 deletions pkg/natsutil/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package natsutil

import "log/slog"

// Acker is the minimal JetStream message interface the Ack helper needs.
// Both `jetstream.Msg` (nats.go) and otel-wrapped variants
// (e.g. `oteljetstream.Msg`) satisfy it.
type Acker interface {
Ack() error
}

// Naker is the minimal JetStream message interface the Nak helper needs.
// Same compatibility story as Acker.
type Naker interface {
Nak() error
}

// Ack acks `msg` and logs any failure under a consistent structured-log
// shape (`reason` + `error`). `reason` is a short label describing WHY
// the message is being acked — e.g. "handler succeeded", "filtered",
// "malformed payload" — so operators can query logs by cause without
// parsing free-text phrases.
//
// Use this from every JetStream consumer in the repo rather than hand-rolling
// an `if err := msg.Ack(); err != nil { slog.Error(...) }` block. Consolidating
// the pattern gives us one place to add tracing spans, metrics counters, or
// delivery-context fields later, and keeps log keys consistent across services.
func Ack(msg Acker, reason string) {
if err := msg.Ack(); err != nil {
slog.Error("ack failed", "reason", reason, "error", err)
}
}

// Nak naks `msg` for redelivery and logs any failure under the same
// structured-log shape as Ack. `reason` describes WHY the message is being
// redelivered — e.g. "handler error", "bulk index failure", "transient
// downstream error".
func Nak(msg Naker, reason string) {
if err := msg.Nak(); err != nil {
slog.Error("nak failed", "reason", reason, "error", err)
}
}
Loading