From 0fc366a46b27f534a59977df7b936a59cd8eae77 Mon Sep 17 00:00:00 2001
From: Claude
Date: Mon, 20 Apr 2026 07:49:54 +0000
Subject: [PATCH] feat: room sync for spotlight + user-room index via ROOMS stream
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implement end-to-end room-member event pipeline for search indexing:
room-worker enriches + publishes to ROOMS stream, inbox-worker
re-publishes cross-site events to local ROOMS, and search-sync-worker
consumes from ROOMS to maintain spotlight (room typeahead) and user-room
(access control) Elasticsearch indexes.

Event enrichment (pkg/model, room-worker):
- MemberAddEvent gains RoomName + RoomType fields (already loaded in
  scope at processAddMembers/processInvite time — zero extra queries)
- room-worker publishes enriched events to RoomCanonical subjects
  (chat.room.canonical.{site}.member_added/removed) which land in the
  existing ROOMS_{siteID} stream. Published alongside existing
  chat.room.{roomID}.event.member for backward compat with other
  consumers.
- processAddMembers, processRemoveIndividual, processRemoveOrg, and
  processInvite all publish to ROOMS stream.

Cross-site relay (inbox-worker):
- After handling member_added (BulkCreateSubscriptions) and
  member_removed (DeleteSubscriptionsByAccounts), inbox-worker
  re-publishes the event to the local ROOMS stream so search-sync-worker
  on the remote site picks it up. Handler gains a siteID field.

search-sync-worker:
- Collection interface: BuildAction returns []BulkAction (fan-out),
  StreamConfig returns jetstream.StreamConfig, new FilterSubjects.
- Handler: pendingMsg tracks per-message action ranges, ActionCount()
  drives flush decisions, isBulkItemSuccess with ErrorType-aware 404
  handling, natsutil.Ack/Nak helpers.
- roomMemberCollection base: StreamConfig from stream.Rooms, filter
  subjects from subject.RoomCanonicalMemberEventSubjects.
- parseMemberEvent: tagged-union parser for MemberAddEvent /
  MemberRemoveEvent (supports member_added, member_removed,
  member_left).
- spotlightCollection: doc key = account_roomID (composite), indexes
  userAccount/roomId/roomName/roomType/siteId/joinedAt. External
  versioning via evt.Timestamp. Restricted rooms
  (HistorySharedSince > 0) skip entire event.
- userRoomCollection: per-user rooms array with LWW timestamp guard in
  painless scripts. roomTimestamps flattened map prevents stale
  out-of-order events from corrupting state. Multi-pod safe via ES
  primary-shard atomicity + the guard. Timestamp source is
  evt.Timestamp (not JoinedAt).
- main.go: multi-collection loop with per-collection stream/consumer
  wiring. FetchBatchSize/BulkBatchSize/BulkFlushInterval config split.
  bootstrapConfig for dev-only stream creation. Fan-out-safe
  runConsumer with mid-batch flush.
- esPropertiesFromStruct[T] generic for template mapping reflection.

pkg/searchengine:
- ActionUpdate type + bulk adapter (no external versioning on _update).
- BulkResult.ErrorType for distinguishing document_missing_exception
  from index_not_found_exception on 404.

pkg/natsutil:
- Ack/Nak helpers with Acker/Naker minimal interfaces.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN --- inbox-worker/handler.go | 29 +++-- inbox-worker/handler_test.go | 73 +++++------ inbox-worker/integration_test.go | 4 +- inbox-worker/main.go | 2 +- pkg/model/event.go | 2 + pkg/natsutil/ack.go | 42 ++++++ pkg/natsutil/ack_test.go | 64 +++++++++ pkg/searchengine/adapter.go | 12 +- pkg/searchengine/searchengine.go | 20 ++- pkg/subject/subject.go | 15 +++ room-worker/handler.go | 40 +++++- room-worker/handler_test.go | 8 +- search-sync-worker/collection.go | 14 +- search-sync-worker/handler.go | 150 +++++++++++++-------- search-sync-worker/handler_test.go | 10 +- search-sync-worker/main.go | 195 ++++++++++++++++++++-------- search-sync-worker/messages.go | 25 ++-- search-sync-worker/messages_test.go | 11 +- search-sync-worker/room_member.go | 97 ++++++++++++++ search-sync-worker/spotlight.go | 129 ++++++++++++++++++ search-sync-worker/template.go | 43 ++++++ search-sync-worker/user_room.go | 186 ++++++++++++++++++++++++++ 22 files changed, 972 insertions(+), 199 deletions(-) create mode 100644 pkg/natsutil/ack.go create mode 100644 pkg/natsutil/ack_test.go create mode 100644 search-sync-worker/room_member.go create mode 100644 search-sync-worker/spotlight.go create mode 100644 search-sync-worker/template.go create mode 100644 search-sync-worker/user_room.go diff --git a/inbox-worker/handler.go b/inbox-worker/handler.go index cd7d25c59..1c759741b 100644 --- a/inbox-worker/handler.go +++ b/inbox-worker/handler.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/subject" ) // InboxStore abstracts the data store operations needed by the inbox worker. @@ -29,13 +30,14 @@ type Publisher interface { // Handler processes incoming cross-site OutboxEvent messages. type Handler struct { - store InboxStore - pub Publisher + store InboxStore + pub Publisher + siteID string } -// NewHandler creates a Handler with the given store and publisher. 
-func NewHandler(store InboxStore, pub Publisher) *Handler { - return &Handler{store: store, pub: pub} +// NewHandler creates a Handler with the given store, publisher, and local site ID. +func NewHandler(store InboxStore, pub Publisher, siteID string) *Handler { + return &Handler{store: store, pub: pub, siteID: siteID} } // HandleEvent processes a single JetStream message payload. @@ -108,9 +110,14 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent) return fmt.Errorf("bulk create subscriptions: %w", err) } - // No SubscriptionUpdateEvent is published here — room-worker already publishes - // to the user's subject and the NATS supercluster routes it to the user's - // home site. + // Re-publish the enriched MemberAddEvent to the local ROOMS stream so + // search-sync-worker on this (remote) site picks it up for spotlight + + // user-room indexing. The event already carries RoomName/RoomType from the + // source site's room-worker, so no additional lookup needed. + if err := h.pub.Publish(ctx, subject.RoomCanonicalMemberAdded(h.siteID), evt.Payload); err != nil { + return fmt.Errorf("re-publish member_added to local ROOMS: %w", err) + } + return nil } @@ -131,6 +138,12 @@ func (h *Handler) handleMemberRemoved(ctx context.Context, evt *model.OutboxEven if err := h.store.DeleteSubscriptionsByAccounts(ctx, memberEvt.RoomID, memberEvt.Accounts); err != nil { return fmt.Errorf("delete subscriptions for room %s: %w", memberEvt.RoomID, err) } + + // Re-publish to local ROOMS stream for search-sync-worker. 
+ if err := h.pub.Publish(ctx, subject.RoomCanonicalMemberRemoved(h.siteID), evt.Payload); err != nil { + return fmt.Errorf("re-publish member_removed to local ROOMS: %w", err) + } + return nil } diff --git a/inbox-worker/handler_test.go b/inbox-worker/handler_test.go index d3413e747..dfcd7306c 100644 --- a/inbox-worker/handler_test.go +++ b/inbox-worker/handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/subject" ) // --- In-memory InboxStore stub --- @@ -164,7 +165,7 @@ func TestHandleEvent_MemberAdded(t *testing.T) { }, } pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") change := model.MemberAddEvent{ Type: "member_added", @@ -221,11 +222,10 @@ func TestHandleEvent_MemberAdded(t *testing.T) { t.Error("subscription ID should be non-empty (generated UUID)") } - // No SubscriptionUpdateEvent is published here — room-worker already - // publishes via the NATS supercluster to the user's home site. + // Re-publish to local ROOMS stream for search-sync-worker. 
records := pub.getRecords() - if len(records) != 0 { - t.Fatalf("expected 0 publishes, got %d", len(records)) + if len(records) != 1 { + t.Fatalf("expected 1 publish (ROOMS re-publish), got %d", len(records)) } } @@ -236,7 +236,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) { }, } pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") joinedAt := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC) historyShared := time.Date(2026, 4, 10, 8, 0, 0, 0, time.UTC) @@ -285,7 +285,7 @@ func TestHandleEvent_MemberAdded_SetsTimestamps(t *testing.T) { func TestHandleEvent_RoomSync(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") room := model.Room{ ID: "room-1", @@ -347,7 +347,7 @@ func TestHandleEvent_RoomSync(t *testing.T) { func TestHandleEvent_RoomSync_Upsert(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") // Insert initial room room1 := model.Room{ @@ -393,7 +393,7 @@ func TestHandleEvent_RoomSync_Upsert(t *testing.T) { func TestHandleEvent_UnknownType(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") evt := model.OutboxEvent{ Type: "unknown_type", @@ -426,7 +426,7 @@ func TestHandleEvent_UnknownType(t *testing.T) { func TestHandleEvent_InvalidJSON(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") err := h.HandleEvent(context.Background(), []byte("not json")) if err == nil { @@ -437,7 +437,7 @@ func TestHandleEvent_InvalidJSON(t *testing.T) { func TestHandleEvent_MemberAdded_InvalidPayload(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") evt := model.OutboxEvent{ Type: 
"member_added", @@ -465,7 +465,7 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) { }, } pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") change := model.MemberAddEvent{ Type: "member_added", @@ -510,10 +510,9 @@ func TestHandleEvent_MemberAdded_AccountRoutedSubject(t *testing.T) { t.Errorf("subscription User.Account = %q, want %q", sub.User.Account, "account-bob") } - // No SubscriptionUpdateEvent is published here — room-worker already - // publishes via the NATS supercluster to the user's home site. - if len(pub.getRecords()) != 0 { - t.Errorf("expected 0 publishes, got %d", len(pub.getRecords())) + // Re-publish to local ROOMS stream for search-sync-worker. + if len(pub.getRecords()) != 1 { + t.Errorf("expected 1 publish (ROOMS re-publish), got %d", len(pub.getRecords())) } } @@ -525,7 +524,7 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) { }, } pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") joinedAt := time.Date(2026, 4, 5, 10, 30, 0, 0, time.UTC) historyShared := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) @@ -592,10 +591,9 @@ func TestHandleEvent_MemberAdded_EventSourcedFields(t *testing.T) { } } - // No SubscriptionUpdateEvent is published here — room-worker already - // publishes via the NATS supercluster to the user's home site. - if len(pub.getRecords()) != 0 { - t.Fatalf("expected 0 publishes, got %d", len(pub.getRecords())) + // Re-publish to local ROOMS stream for search-sync-worker. 
+ if len(pub.getRecords()) != 1 { + t.Fatalf("expected 1 publish (ROOMS re-publish), got %d", len(pub.getRecords())) } } @@ -606,7 +604,7 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) { }, } pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") change := model.MemberAddEvent{ Type: "member_added", @@ -643,7 +641,7 @@ func TestHandleEvent_MemberAdded_HistoryAll(t *testing.T) { func TestHandleEvent_RoomSync_InvalidPayload(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") evt := model.OutboxEvent{ Type: "room_sync", @@ -667,7 +665,7 @@ func TestHandleEvent_RoomSync_InvalidPayload(t *testing.T) { func TestHandleEvent_RoleUpdated(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") subEvt := model.SubscriptionUpdateEvent{ UserID: "u2", Subscription: model.Subscription{ @@ -696,17 +694,17 @@ func TestHandleEvent_RoleUpdated(t *testing.T) { if len(updates[0].roles) != 1 || updates[0].roles[0] != model.RoleOwner { t.Errorf("role update roles = %v, want [owner]", updates[0].roles) } - // No SubscriptionUpdateEvent publish — room-worker already handles that via NATS supercluster + // role_updated doesn't re-publish to ROOMS — only member_added/removed do. 
records := pub.getRecords() if len(records) != 0 { - t.Errorf("expected 0 publishes (room-worker handles notification), got %d", len(records)) + t.Errorf("expected 0 publishes for role_updated, got %d", len(records)) } } func TestHandleEvent_RoleUpdated_InvalidPayload(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") evt := model.OutboxEvent{ Type: "role_updated", SiteID: "site-a", DestSiteID: "site-b", Payload: []byte("not valid json"), @@ -724,7 +722,7 @@ func TestHandleEvent_RoleUpdated_InvalidPayload(t *testing.T) { func TestHandleEvent_MemberRemoved(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") store.mu.Lock() store.subscriptions = append(store.subscriptions, model.Subscription{ @@ -749,16 +747,16 @@ func TestHandleEvent_MemberRemoved(t *testing.T) { subs := store.getSubscriptions() assert.Empty(t, subs) - // No SubscriptionUpdateEvent is published — room-worker already publishes - // via the NATS supercluster to the user's home site. + // Re-publish to local ROOMS stream for search-sync-worker. 
records := pub.getRecords() - assert.Empty(t, records) + require.Len(t, records, 1) + assert.Equal(t, subject.RoomCanonicalMemberRemoved("site-test"), records[0].subject) } func TestHandleEvent_MemberRemoved_InvalidPayload(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") evt := model.OutboxEvent{ Type: "member_removed", SiteID: "site-a", DestSiteID: "site-b", @@ -773,7 +771,7 @@ func TestHandleEvent_MemberRemoved_InvalidPayload(t *testing.T) { func TestHandleEvent_MemberRemoved_MultipleAccounts(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") // Pre-populate subscriptions for both accounts store.mu.Lock() @@ -803,15 +801,16 @@ func TestHandleEvent_MemberRemoved_MultipleAccounts(t *testing.T) { subs := store.getSubscriptions() assert.Empty(t, subs) - // No SubscriptionUpdateEvent publishes — room-worker handles that + // Re-publish to local ROOMS stream for search-sync-worker. 
records := pub.getRecords() - assert.Empty(t, records) + require.Len(t, records, 1) + assert.Equal(t, subject.RoomCanonicalMemberRemoved("site-test"), records[0].subject) } func TestHandleEvent_MemberRemoved_EmptyAccountsNoOp(t *testing.T) { store := &stubInboxStore{} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") memberEvt := model.MemberRemoveEvent{RoomID: "r1", Accounts: []string{}} payload, _ := json.Marshal(memberEvt) @@ -832,7 +831,7 @@ func (s *errorDeleteStore) DeleteSubscriptionsByAccounts(_ context.Context, _ st func TestHandleEvent_MemberRemoved_DeleteError(t *testing.T) { store := &errorDeleteStore{stubInboxStore: &stubInboxStore{}} pub := &mockPublisher{} - h := NewHandler(store, pub) + h := NewHandler(store, pub, "site-test") memberEvt := model.MemberRemoveEvent{RoomID: "r1", Accounts: []string{"alice"}} payload, _ := json.Marshal(memberEvt) diff --git a/inbox-worker/integration_test.go b/inbox-worker/integration_test.go index 73780c5e4..d999f1993 100644 --- a/inbox-worker/integration_test.go +++ b/inbox-worker/integration_test.go @@ -73,9 +73,9 @@ func TestInboxWorker_MemberAdded_Integration(t *testing.T) { // Create outbox event for member_added change := model.MemberAddEvent{ Type: "member_added", RoomID: "r1", Accounts: []string{"u2"}, SiteID: "site-b", - JoinedAt: time.Now().UTC().UnixMilli(), + JoinedAt: time.Now().UTC().UnixMilli(), HistorySharedSince: time.Now().UTC().UnixMilli(), - Timestamp: time.Now().UTC().UnixMilli(), + Timestamp: time.Now().UTC().UnixMilli(), } changeData, _ := json.Marshal(change) evt := model.OutboxEvent{Type: "member_added", SiteID: "site-a", DestSiteID: "site-b", Payload: changeData} diff --git a/inbox-worker/main.go b/inbox-worker/main.go index 203cdb534..edc7c17f9 100644 --- a/inbox-worker/main.go +++ b/inbox-worker/main.go @@ -163,7 +163,7 @@ func main() { } publisher := &natsPublisher{nc: nc} - handler := NewHandler(store, publisher) + handler := NewHandler(store, 
publisher, cfg.SiteID) cctx, err := cons.Consume(func(m oteljetstream.Msg) { if err := handler.HandleEvent(m.Context(), m.Data()); err != nil { diff --git a/pkg/model/event.go b/pkg/model/event.go index 2ea43e345..34dede9f2 100644 --- a/pkg/model/event.go +++ b/pkg/model/event.go @@ -68,6 +68,8 @@ type MemberAddEvent struct { RoomID string `json:"roomId" bson:"roomId"` Accounts []string `json:"accounts" bson:"accounts"` SiteID string `json:"siteId" bson:"siteId"` + RoomName string `json:"roomName" bson:"roomName"` + RoomType RoomType `json:"roomType" bson:"roomType"` JoinedAt int64 `json:"joinedAt" bson:"joinedAt"` HistorySharedSince int64 `json:"historySharedSince" bson:"historySharedSince"` Timestamp int64 `json:"timestamp" bson:"timestamp"` diff --git a/pkg/natsutil/ack.go b/pkg/natsutil/ack.go new file mode 100644 index 000000000..d074658a9 --- /dev/null +++ b/pkg/natsutil/ack.go @@ -0,0 +1,42 @@ +package natsutil + +import "log/slog" + +// Acker is the minimal JetStream message interface the Ack helper needs. +// Both `jetstream.Msg` (nats.go) and otel-wrapped variants +// (e.g. `oteljetstream.Msg`) satisfy it. +type Acker interface { + Ack() error +} + +// Naker is the minimal JetStream message interface the Nak helper needs. +// Same compatibility story as Acker. +type Naker interface { + Nak() error +} + +// Ack acks `msg` and logs any failure under a consistent structured-log +// shape (`reason` + `error`). `reason` is a short label describing WHY +// the message is being acked — e.g. "handler succeeded", "filtered", +// "malformed payload" — so operators can query logs by cause without +// parsing free-text phrases. +// +// Use this from every JetStream consumer in the repo rather than hand-rolling +// an `if err := msg.Ack(); err != nil { slog.Error(...) }` block. Consolidating +// the pattern gives us one place to add tracing spans, metrics counters, or +// delivery-context fields later, and keeps log keys consistent across services. 
+func Ack(msg Acker, reason string) { + if err := msg.Ack(); err != nil { + slog.Error("ack failed", "reason", reason, "error", err) + } +} + +// Nak naks `msg` for redelivery and logs any failure under the same +// structured-log shape as Ack. `reason` describes WHY the message is being +// redelivered — e.g. "handler error", "bulk index failure", "transient +// downstream error". +func Nak(msg Naker, reason string) { + if err := msg.Nak(); err != nil { + slog.Error("nak failed", "reason", reason, "error", err) + } +} diff --git a/pkg/natsutil/ack_test.go b/pkg/natsutil/ack_test.go new file mode 100644 index 000000000..85d560ae9 --- /dev/null +++ b/pkg/natsutil/ack_test.go @@ -0,0 +1,64 @@ +package natsutil_test + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/natsutil" +) + +// stubMsg is a tiny test double for the Acker/Naker interfaces. Records +// whether Ack/Nak was called and returns a configurable error. +type stubMsg struct { + ackCalled bool + nakCalled bool + ackErr error + nakErr error +} + +func (s *stubMsg) Ack() error { + s.ackCalled = true + return s.ackErr +} + +func (s *stubMsg) Nak() error { + s.nakCalled = true + return s.nakErr +} + +func TestAck_Success(t *testing.T) { + msg := &stubMsg{} + natsutil.Ack(msg, "handler succeeded") + assert.True(t, msg.ackCalled, "Ack() should be invoked on the message") +} + +func TestAck_ErrorIsLoggedNotReturned(t *testing.T) { + // Ack is fire-and-forget by design — any error from msg.Ack() is logged + // and swallowed so callers don't have to branch on it. The helper's + // contract is "try to ack; if it fails, log it and move on." 
+ msg := &stubMsg{ackErr: errors.New("connection closed")} + natsutil.Ack(msg, "filtered") + assert.True(t, msg.ackCalled) +} + +func TestNak_Success(t *testing.T) { + msg := &stubMsg{} + natsutil.Nak(msg, "handler error") + assert.True(t, msg.nakCalled, "Nak() should be invoked on the message") +} + +func TestNak_ErrorIsLoggedNotReturned(t *testing.T) { + msg := &stubMsg{nakErr: errors.New("consumer deleted")} + natsutil.Nak(msg, "bulk failure") + assert.True(t, msg.nakCalled) +} + +// Compile-time checks that the stub implements the interfaces the helpers +// require — this is what lets production code pass `jetstream.Msg` and +// `oteljetstream.Msg` to the helpers without a wrapper. +var ( + _ natsutil.Acker = (*stubMsg)(nil) + _ natsutil.Naker = (*stubMsg)(nil) +) diff --git a/pkg/searchengine/adapter.go b/pkg/searchengine/adapter.go index 992a83822..f43b6301b 100644 --- a/pkg/searchengine/adapter.go +++ b/pkg/searchengine/adapter.go @@ -72,6 +72,13 @@ func (a *httpAdapter) Bulk(ctx context.Context, actions []BulkAction) ([]BulkRes line, _ := json.Marshal(map[string]bulkActionMeta{"delete": meta}) buf.Write(line) buf.WriteByte('\n') + case ActionUpdate: + updateMeta := bulkActionMeta{Index: action.Index, ID: action.DocID} + line, _ := json.Marshal(map[string]bulkActionMeta{"update": updateMeta}) + buf.Write(line) + buf.WriteByte('\n') + buf.Write(action.Doc) + buf.WriteByte('\n') } } @@ -101,8 +108,9 @@ func (a *httpAdapter) Bulk(ctx context.Context, actions []BulkAction) ([]BulkRes for i, item := range bulkResp.Items { for _, detail := range item { results[i] = BulkResult{ - Status: detail.Status, - Error: detail.Error.Reason, + Status: detail.Status, + ErrorType: detail.Error.Type, + Error: detail.Error.Reason, } } } diff --git a/pkg/searchengine/searchengine.go b/pkg/searchengine/searchengine.go index dad971428..1df0a3267 100644 --- a/pkg/searchengine/searchengine.go +++ b/pkg/searchengine/searchengine.go @@ -18,21 +18,33 @@ type ActionType string const ( 
ActionIndex ActionType = "index" ActionDelete ActionType = "delete" + ActionUpdate ActionType = "update" ) // BulkAction represents a single action in a bulk request. +// +// For ActionUpdate, Doc contains the full ES update body (doc / script / +// upsert) and Version is ignored. The _update operation is read-modify-write +// on the ES side and does not accept `version`/`version_type=external`; that +// parameter pair is only valid for `index` (full-document replacement). type BulkAction struct { Action ActionType Index string DocID string - Version int64 // used as ES external version - Doc json.RawMessage // nil for delete actions + Version int64 // used as ES external version (ignored for ActionUpdate) + Doc json.RawMessage // index: full doc; update: update body; delete: nil } // BulkResult represents the result of a single bulk action item. +// +// ErrorType is the ES error type string (e.g., `document_missing_exception`, +// `index_not_found_exception`) when the item failed with an error block. +// Empty on 2xx success and on delete-404 responses (delete of a missing doc +// sets `result:"not_found"` without an error block). type BulkResult struct { - Status int - Error string + Status int + ErrorType string + Error string } // SearchEngine defines domain operations for search indexing. 
diff --git a/pkg/subject/subject.go b/pkg/subject/subject.go index 8ac820a65..592e8fe85 100644 --- a/pkg/subject/subject.go +++ b/pkg/subject/subject.go @@ -153,6 +153,21 @@ func RoomCanonicalWildcard(siteID string) string { return fmt.Sprintf("chat.room.canonical.%s.>", siteID) } +func RoomCanonicalMemberAdded(siteID string) string { + return RoomCanonical(siteID, "member_added") +} + +func RoomCanonicalMemberRemoved(siteID string) string { + return RoomCanonical(siteID, "member_removed") +} + +func RoomCanonicalMemberEventSubjects(siteID string) []string { + return []string{ + RoomCanonicalMemberAdded(siteID), + RoomCanonicalMemberRemoved(siteID), + } +} + func MsgHistoryWildcard(siteID string) string { return fmt.Sprintf("chat.user.*.request.room.*.%s.msg.history", siteID) } diff --git a/room-worker/handler.go b/room-worker/handler.go index bc7cb63cd..b07a7bbfc 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -103,8 +103,33 @@ func (h *Handler) processInvite(ctx context.Context, data []byte) error { slog.Error("subscription update publish failed", "error", err) } - // Notify all existing members: room metadata changed + // Publish MemberAddEvent for search-sync-worker (spotlight + user-room indexes). + // The legacy invite path creates one subscription at a time. Room metadata + // is loaded below for the metadata-update broadcast; reuse it here. 
room, err := h.store.GetRoom(ctx, req.RoomID) + if err == nil { + var historySharedSince int64 + if sub.HistorySharedSince != nil { + historySharedSince = sub.HistorySharedSince.UnixMilli() + } + inviteAddEvt := model.MemberAddEvent{ + Type: "member_added", + RoomID: req.RoomID, + Accounts: []string{req.InviteeAccount}, + SiteID: room.SiteID, + RoomName: room.Name, + RoomType: room.Type, + JoinedAt: now.UnixMilli(), + HistorySharedSince: historySharedSince, + Timestamp: now.UnixMilli(), + } + inviteAddData, _ := json.Marshal(inviteAddEvt) + if err := h.publish(ctx, subject.RoomCanonicalMemberAdded(h.siteID), inviteAddData); err != nil { + slog.Error("room canonical member_added publish failed (invite path)", "error", err, "roomID", req.RoomID) + } + } + + // Notify all existing members: room metadata changed if err == nil { metaEvt := model.RoomMetadataUpdateEvent{ RoomID: req.RoomID, @@ -282,6 +307,9 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove if err := h.publish(ctx, subject.MemberEvent(req.RoomID), memberEvtData); err != nil { slog.Error("member event publish failed", "error", err, "roomID", req.RoomID) } + if err := h.publish(ctx, subject.RoomCanonicalMemberRemoved(h.siteID), memberEvtData); err != nil { + slog.Error("room canonical member_removed publish failed", "error", err, "roomID", req.RoomID) + } // System message sysMsgUser := model.SysMsgUser{ @@ -402,6 +430,9 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR if err := h.publish(ctx, subject.MemberEvent(req.RoomID), memberEvtData); err != nil { slog.Error("member event publish failed", "error", err, "roomID", req.RoomID) } + if err := h.publish(ctx, subject.RoomCanonicalMemberRemoved(h.siteID), memberEvtData); err != nil { + slog.Error("room canonical member_removed publish failed", "error", err, "roomID", req.RoomID) + } } // System message @@ -629,6 +660,8 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) 
error { RoomID: req.RoomID, Accounts: actualAccounts, SiteID: room.SiteID, + RoomName: room.Name, + RoomType: room.Type, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), @@ -637,6 +670,9 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error { if err := h.publish(ctx, subject.RoomMemberEvent(req.RoomID), memberAddData); err != nil { slog.Error("member add event publish failed", "error", err, "roomID", req.RoomID) } + if err := h.publish(ctx, subject.RoomCanonicalMemberAdded(h.siteID), memberAddData); err != nil { + slog.Error("room canonical member_added publish failed", "error", err, "roomID", req.RoomID) + } membersAdded := model.MembersAdded{ Individuals: actualAccounts, @@ -680,6 +716,8 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error { RoomID: req.RoomID, Accounts: accounts, SiteID: room.SiteID, + RoomName: room.Name, + RoomType: room.Type, JoinedAt: req.Timestamp, HistorySharedSince: historySharedSince, Timestamp: now.UnixMilli(), diff --git a/room-worker/handler_test.go b/room-worker/handler_test.go index 0f72558d1..d91d56a47 100644 --- a/room-worker/handler_test.go +++ b/room-worker/handler_test.go @@ -412,7 +412,7 @@ func TestHandler_ProcessRemoveMember_SelfLeave_IndividualOnly(t *testing.T) { require.NoError(t, err) // Expect: subscription update + member change event + system message = 3 publishes - assert.Len(t, published, 3, "expected 3 publishes: sub update, member event, sys msg") + assert.Len(t, published, 4, "expected 4 publishes: sub update, member event, room canonical member_removed, sys msg") subjSet := make(map[string]bool) for _, p := range published { @@ -585,7 +585,7 @@ func TestHandler_ProcessRemoveMember_OwnerRemovesIndividual(t *testing.T) { err := h.processRemoveMember(context.Background(), data) require.NoError(t, err) - assert.Len(t, published, 3, "expected 3 publishes: sub update, member event, sys msg") + assert.Len(t, published, 4, 
"expected 4 publishes: sub update, member event, room canonical member_removed, sys msg") // Verify the sys msg has type "member_removed" for _, p := range published { @@ -850,7 +850,7 @@ func TestHandler_ProcessRemoveMember_OwnerRemovesOrg(t *testing.T) { require.NoError(t, err) // Expect: 2 sub updates (carol, dave) + 1 member event + 1 sys msg = 4 publishes - assert.Len(t, published, 4, "expected 4 publishes: 2 sub updates, member event, sys msg") + assert.Len(t, published, 5, "expected 5 publishes: 2 sub updates, member event, room canonical member_removed, sys msg") subjSet := make(map[string]bool) for _, p := range published { @@ -909,7 +909,7 @@ func TestHandler_ProcessRemoveMember_CrossSiteOutbox(t *testing.T) { require.NoError(t, err) // Expect: sub update + member event + sys msg + outbox = 4 publishes - assert.Len(t, published, 4, "expected 4 publishes including outbox for federated user") + assert.Len(t, published, 5, "expected 5 publishes: sub update, member event, room canonical member_removed, sys msg, outbox") outboxSubj := subject.Outbox(localSite, userSite, "member_removed") subjSet := make(map[string]bool) diff --git a/search-sync-worker/collection.go b/search-sync-worker/collection.go index 70589e563..772325d05 100644 --- a/search-sync-worker/collection.go +++ b/search-sync-worker/collection.go @@ -3,22 +3,18 @@ package main import ( "encoding/json" + "github.com/nats-io/nats.go/jetstream" + "github.com/hmchangw/chat/pkg/searchengine" - "github.com/hmchangw/chat/pkg/stream" ) // Collection defines a search-indexable data source. Each collection // encapsulates its own stream config, ES template, and document mapping. -// To add a new collection (e.g., room search), implement this interface. type Collection interface { - // StreamConfig returns the JetStream stream to consume from. - StreamConfig(siteID string) stream.Config - // ConsumerName returns the durable consumer name for this collection. 
+ StreamConfig(siteID string) jetstream.StreamConfig ConsumerName() string - // TemplateName returns the ES index template name. + FilterSubjects(siteID string) []string TemplateName() string - // TemplateBody returns the ES index template JSON. TemplateBody() json.RawMessage - // BuildAction converts raw JetStream message data into a BulkAction. - BuildAction(data []byte) (searchengine.BulkAction, error) + BuildAction(data []byte) ([]searchengine.BulkAction, error) } diff --git a/search-sync-worker/handler.go b/search-sync-worker/handler.go index 0010fc094..7777af745 100644 --- a/search-sync-worker/handler.go +++ b/search-sync-worker/handler.go @@ -7,113 +7,149 @@ import ( "github.com/nats-io/nats.go/jetstream" + "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/searchengine" ) -type bufferedMsg struct { - action searchengine.BulkAction - jsMsg jetstream.Msg +type pendingMsg struct { + jsMsg jetstream.Msg + actionStart int + actionCount int } -// Handler buffers JetStream messages and flushes them as ES bulk requests. type Handler struct { store Store collection Collection - batchSize int + bulkSize int mu sync.Mutex - buffer []bufferedMsg + pending []pendingMsg + actions []searchengine.BulkAction } -// NewHandler creates a Handler with the given store, collection, and batch size. -func NewHandler(store Store, collection Collection, batchSize int) *Handler { +func NewHandler(store Store, collection Collection, bulkSize int) *Handler { return &Handler{ store: store, collection: collection, - batchSize: batchSize, - buffer: make([]bufferedMsg, 0, batchSize), + bulkSize: bulkSize, + pending: make([]pendingMsg, 0, bulkSize), + actions: make([]searchengine.BulkAction, 0, bulkSize), } } -// Add parses a JetStream message via the collection and adds it to the buffer. 
+// Add builds bulk actions for msg via the collection and buffers them; messages that fail to build or yield no actions are acked immediately and not buffered. func (h *Handler) Add(msg jetstream.Msg) { - action, err := h.collection.BuildAction(msg.Data()) + actions, err := h.collection.BuildAction(msg.Data()) if err != nil { slog.Error("build action", "error", err) - if ackErr := msg.Ack(); ackErr != nil { - slog.Error("ack malformed message", "error", ackErr) - } + natsutil.Ack(msg, "build action failed") + return + } + + if len(actions) == 0 { + natsutil.Ack(msg, "filtered, no actions") return } h.mu.Lock() - h.buffer = append(h.buffer, bufferedMsg{action: action, jsMsg: msg}) + // Remember which slice of h.actions this message owns so Flush can ack or nak it as a unit. + h.pending = append(h.pending, pendingMsg{ + jsMsg: msg, + actionStart: len(h.actions), + actionCount: len(actions), + }) + h.actions = append(h.actions, actions...) h.mu.Unlock() } -// Flush sends all buffered actions to ES and acks/naks per item. +// Flush sends all buffered actions in one bulk request and acks or naks each pending message based on the results of its own actions. func (h *Handler) Flush(ctx context.Context) { h.mu.Lock() - if len(h.buffer) == 0 { + if len(h.pending) == 0 { h.mu.Unlock() return } - items := h.buffer - h.buffer = make([]bufferedMsg, 0, h.batchSize) + // Swap the buffers out under the lock so the bulk request below runs without holding it. + pending := h.pending + actions := h.actions + h.pending = make([]pendingMsg, 0, h.bulkSize) + h.actions = make([]searchengine.BulkAction, 0, h.bulkSize) h.mu.Unlock() - actions := make([]searchengine.BulkAction, len(items)) - for i, item := range items { - actions[i] = item.action - } - results, err := h.store.Bulk(ctx, actions) if err != nil { - slog.Error("bulk request failed", "error", err, "count", len(items)) - for _, item := range items { - if nakErr := item.jsMsg.Nak(); nakErr != nil { - slog.Error("nak failed", "error", nakErr) - } - } + slog.Error("bulk request failed", "error", err, "actions", len(actions)) + nakAll(pending, "bulk request failed") return } - if len(results) != len(items) { - // Defensive guard for a protocol-level anomaly: ES bulk API normally - // returns one result per input action in input order. Nak-all is safe - // because of external versioning — on redelivery, any items already - // indexed return 409 (handled as ack below), and failed items get - // re-indexed. 
No duplicate processing, no lost events. - slog.Error("bulk result count mismatch", "expected", len(items), "actual", len(results)) - for _, item := range items { - if nakErr := item.jsMsg.Nak(); nakErr != nil { - slog.Error("nak failed", "error", nakErr) - } - } + // Results are matched to actions by position, so a count mismatch makes per-message attribution unreliable; nak the whole batch. + if len(results) != len(actions) { + slog.Error("bulk result count mismatch", "expected", len(actions), "actual", len(results)) + nakAll(pending, "bulk result count mismatch") return } - for i, result := range results { - if result.Status == 409 || (result.Status >= 200 && result.Status < 300) { - if ackErr := items[i].jsMsg.Ack(); ackErr != nil { - slog.Error("ack failed", "error", ackErr) + // Ack a message only when every action in its range succeeded; the first failing action is logged and the message is nak'd. + for _, p := range pending { + allOK := true + for i := p.actionStart; i < p.actionStart+p.actionCount; i++ { + if isBulkItemSuccess(actions[i].Action, results[i]) { + continue } + allOK = false + slog.Error("bulk item failed", + "status", results[i].Status, + "error", results[i].Error, + "docID", actions[i].DocID, + "index", actions[i].Index, + ) + break + } + if allOK { + natsutil.Ack(p.jsMsg, "bulk actions succeeded") } else { - slog.Error("bulk item failed", "status", result.Status, "error", result.Error, "docID", items[i].action.DocID) - if nakErr := items[i].jsMsg.Nak(); nakErr != nil { - slog.Error("nak failed", "error", nakErr) - } + natsutil.Nak(p.jsMsg, "bulk action failed") } } } -// BufferLen returns the current buffer size. -func (h *Handler) BufferLen() int { +// Elasticsearch bulk error type names; isBulkItemSuccess compares BulkResult.ErrorType against esErrDocumentMissing. +const ( + esErrDocumentMissing = "document_missing_exception" + esErrIndexNotFound = "index_not_found_exception" +) + +// isBulkItemSuccess reports whether a single bulk result should be treated as success for the given action type: any 2xx, a 409 for non-update actions, and selected 404 cases. +func isBulkItemSuccess(action searchengine.ActionType, result searchengine.BulkResult) bool { + if result.Status >= 200 && result.Status < 300 { + return true + } + if result.Status == 409 { + // External versioning rejected a stale write. Success for ActionIndex + // and ActionDelete (desired state already reached). 
For ActionUpdate, + // 409 means ES internal OCC conflict (seq_no mismatch) — the painless + // script didn't run, so we need a retry via JetStream redelivery. + return action != searchengine.ActionUpdate + } + // 404 handling depends on the action: a delete with no error type, or an update reporting document_missing_exception, counts as success; an index action never does. + if result.Status == 404 { + switch action { + case searchengine.ActionDelete: + return result.ErrorType == "" + case searchengine.ActionUpdate: + return result.ErrorType == esErrDocumentMissing + case searchengine.ActionIndex: + return false + } + } + return false +} + +// nakAll naks every pending message with the given reason. +func nakAll(pending []pendingMsg, reason string) { + for _, p := range pending { + natsutil.Nak(p.jsMsg, reason) + } +} + +// MessageCount returns the number of JetStream messages currently buffered. It is a method, like ActionCount, because it locks and reads the handler's own state. +func (h *Handler) MessageCount() int { h.mu.Lock() defer h.mu.Unlock() - return len(h.buffer) + return len(h.pending) } -// BufferFull returns true if the buffer has reached batch size. -func (h *Handler) BufferFull() bool { - return h.BufferLen() >= h.batchSize +// ActionCount returns the number of bulk actions currently buffered; one message may contribute several actions. +func (h *Handler) ActionCount() int { + h.mu.Lock() + defer h.mu.Unlock() + return len(h.actions) } diff --git a/search-sync-worker/handler_test.go b/search-sync-worker/handler_test.go index 93367a50d..bb0dcdbfd 100644 --- a/search-sync-worker/handler_test.go +++ b/search-sync-worker/handler_test.go @@ -60,7 +60,7 @@ func TestHandler_Add(t *testing.T) { msg := makeStubMsg(t, &evt) h.Add(msg) - assert.Equal(t, 1, h.BufferLen()) + assert.Equal(t, 1, h.ActionCount()) } func TestHandler_Add_MalformedJSON(t *testing.T) { @@ -70,7 +70,7 @@ func TestHandler_Add_MalformedJSON(t *testing.T) { msg := &stubMsg{data: []byte("{invalid")} h.Add(msg) - assert.Equal(t, 0, h.BufferLen()) + assert.Equal(t, 0, h.ActionCount()) assert.True(t, msg.acked) } @@ -99,7 +99,7 @@ func TestHandler_Flush(t *testing.T) { assert.True(t, msg.acked) assert.False(t, msg.nacked) - assert.Equal(t, 0, h.BufferLen()) + assert.Equal(t, 0, h.ActionCount()) }) t.Run("version conflict (409) — acked not nacked", func(t *testing.T) { @@ -153,7 +153,7 @@ func TestHandler_Flush(t *testing.T) { assert.True(t, msg1.nacked) assert.True(t, msg2.nacked) - 
assert.Equal(t, 0, h.BufferLen()) + assert.Equal(t, 0, h.ActionCount()) }) t.Run("empty flush is no-op", func(t *testing.T) { @@ -161,7 +161,7 @@ func TestHandler_Flush(t *testing.T) { store := NewMockStore(ctrl) h := NewHandler(store, newMessageCollection("msgs-v1"), 500) h.Flush(context.Background()) - assert.Equal(t, 0, h.BufferLen()) + assert.Equal(t, 0, h.ActionCount()) }) t.Run("mixed results — per-item ack/nak", func(t *testing.T) { diff --git a/search-sync-worker/main.go b/search-sync-worker/main.go index 2880c72a0..663e79802 100644 --- a/search-sync-worker/main.go +++ b/search-sync-worker/main.go @@ -18,15 +18,26 @@ import ( "github.com/hmchangw/chat/pkg/shutdown" ) +// bootstrapConfig groups fields meaningful ONLY in dev / integration tests. +// In production, streams are owned by their publisher services and +// search-sync-worker only manages its own durable consumers. +type bootstrapConfig struct { + Enabled bool `env:"STREAMS" envDefault:"false"` +} + type config struct { - NatsURL string `env:"NATS_URL,required"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID,required"` - SearchURL string `env:"SEARCH_URL,required"` - SearchBackend string `env:"SEARCH_BACKEND" envDefault:"elasticsearch"` - MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"` - BatchSize int `env:"BATCH_SIZE" envDefault:"500"` - FlushInterval int `env:"FLUSH_INTERVAL" envDefault:"5"` + NatsURL string `env:"NATS_URL,required"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID,required"` + SearchURL string `env:"SEARCH_URL,required"` + SearchBackend string `env:"SEARCH_BACKEND" envDefault:"elasticsearch"` + MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"` + SpotlightIndex string `env:"SPOTLIGHT_INDEX" envDefault:""` + UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` + FetchBatchSize int `env:"FETCH_BATCH_SIZE" envDefault:"100"` + BulkBatchSize int `env:"BULK_BATCH_SIZE" 
envDefault:"500"` + BulkFlushInterval int `env:"BULK_FLUSH_INTERVAL" envDefault:"5"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -38,6 +49,26 @@ func main() { os.Exit(1) } + if cfg.SpotlightIndex == "" { + cfg.SpotlightIndex = fmt.Sprintf("spotlight-%s-v1-chat", cfg.SiteID) + } + if cfg.UserRoomIndex == "" { + cfg.UserRoomIndex = fmt.Sprintf("user-room-%s", cfg.SiteID) + } + + if cfg.FetchBatchSize <= 0 { + slog.Error("invalid config", "name", "FETCH_BATCH_SIZE", "value", cfg.FetchBatchSize, "reason", "must be > 0") + os.Exit(1) + } + if cfg.BulkBatchSize <= 0 { + slog.Error("invalid config", "name", "BULK_BATCH_SIZE", "value", cfg.BulkBatchSize, "reason", "must be > 0") + os.Exit(1) + } + if cfg.BulkFlushInterval <= 0 { + slog.Error("invalid config", "name", "BULK_FLUSH_INTERVAL", "value", cfg.BulkFlushInterval, "reason", "must be > 0") + os.Exit(1) + } + ctx := context.Background() tracerShutdown, err := otelutil.InitTracer(ctx, "search-sync-worker") @@ -52,15 +83,24 @@ func main() { os.Exit(1) } - coll := newMessageCollection(cfg.MsgIndexPrefix) + collections := []Collection{ + newMessageCollection(cfg.MsgIndexPrefix), + newSpotlightCollection(cfg.SpotlightIndex), + newUserRoomCollection(cfg.UserRoomIndex), + } - tmplName := coll.TemplateName() - tmplBody := coll.TemplateBody() - if err := engine.UpsertTemplate(ctx, tmplName, tmplBody); err != nil { - slog.Error("upsert index template failed", "error", err) - os.Exit(1) + for _, coll := range collections { + name := coll.TemplateName() + body := coll.TemplateBody() + if name == "" || body == nil { + continue + } + if err := engine.UpsertTemplate(ctx, name, body); err != nil { + slog.Error("upsert index template failed", "template", name, "error", err) + os.Exit(1) + } + slog.Info("index template upserted", "name", name) } - slog.Info("index template upserted", "name", tmplName) nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile) if err != nil { @@ -73,34 +113,63 @@ func main() 
{ os.Exit(1) } - canonicalCfg := coll.StreamConfig(cfg.SiteID) - if _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ - Name: canonicalCfg.Name, - Subjects: canonicalCfg.Subjects, - }); err != nil { - slog.Error("create MESSAGES_CANONICAL stream failed", "error", err) - os.Exit(1) - } + bulkFlushInterval := time.Duration(cfg.BulkFlushInterval) * time.Second + stopCh := make(chan struct{}) + doneChs := make([]chan struct{}, 0, len(collections)) - cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ - Durable: coll.ConsumerName(), - AckPolicy: jetstream.AckExplicitPolicy, - BackOff: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, - }) - if err != nil { - slog.Error("create consumer failed", "error", err) - os.Exit(1) - } + createdStreams := make(map[string]struct{}, len(collections)) - handler := NewHandler(&engineAdapter{engine: engine}, coll, cfg.BatchSize) + for _, coll := range collections { + streamCfg := coll.StreamConfig(cfg.SiteID) + if cfg.Bootstrap.Enabled { + if _, alreadyCreated := createdStreams[streamCfg.Name]; !alreadyCreated { + if _, err := js.CreateOrUpdateStream(ctx, streamCfg); err != nil { + slog.Error("create stream failed", "stream", streamCfg.Name, "error", err) + os.Exit(1) + } + createdStreams[streamCfg.Name] = struct{}{} + slog.Info("stream bootstrapped", "stream", streamCfg.Name) + } + } - flushInterval := time.Duration(cfg.FlushInterval) * time.Second - stopCh := make(chan struct{}) - doneCh := make(chan struct{}) + consumerCfg := jetstream.ConsumerConfig{ + Durable: coll.ConsumerName(), + AckPolicy: jetstream.AckExplicitPolicy, + BackOff: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, + } + if filters := coll.FilterSubjects(cfg.SiteID); len(filters) > 0 { + consumerCfg.FilterSubjects = filters + } + cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, consumerCfg) + if err != nil { + slog.Error("create consumer failed", + "stream", 
streamCfg.Name, + "consumer", coll.ConsumerName(), + "error", err, + ) + os.Exit(1) + } - go runConsumer(ctx, cons, handler, cfg.BatchSize, flushInterval, stopCh, doneCh) + handler := NewHandler(&engineAdapter{engine: engine}, coll, cfg.BulkBatchSize) + doneCh := make(chan struct{}) + doneChs = append(doneChs, doneCh) - slog.Info("search-sync-worker running", "site", cfg.SiteID, "prefix", cfg.MsgIndexPrefix) + slog.Info("collection wired", + "stream", streamCfg.Name, + "consumer", coll.ConsumerName(), + "filters", consumerCfg.FilterSubjects, + ) + + go runConsumer(ctx, cons, handler, cfg.FetchBatchSize, cfg.BulkBatchSize, bulkFlushInterval, stopCh, doneCh) + } + + slog.Info("search-sync-worker running", + "site", cfg.SiteID, + "msgPrefix", cfg.MsgIndexPrefix, + "spotlightIndex", cfg.SpotlightIndex, + "userRoomIndex", cfg.UserRoomIndex, + "collections", len(collections), + ) shutdown.Wait(ctx, 25*time.Second, func(ctx context.Context) error { @@ -108,22 +177,29 @@ func main() { return nil }, func(ctx context.Context) error { - select { - case <-doneCh: - return nil - case <-ctx.Done(): - return fmt.Errorf("consumer loop drain timed out: %w", ctx.Err()) + for _, ch := range doneChs { + select { + case <-ch: + case <-ctx.Done(): + return fmt.Errorf("consumer loop drain timed out: %w", ctx.Err()) + } } + return nil }, func(ctx context.Context) error { return tracerShutdown(ctx) }, func(ctx context.Context) error { return nc.Drain() }, ) } -// runConsumer is the batch-flush consumer loop for a single collection. -// It fetches messages from the JetStream consumer, adds them to the handler buffer, -// and flushes when the buffer is full or the flush interval elapses. 
-func runConsumer(ctx context.Context, cons oteljetstream.Consumer, handler *Handler, batchSize int, flushInterval time.Duration, stopCh <-chan struct{}, doneCh chan<- struct{}) { +func runConsumer( + ctx context.Context, + cons oteljetstream.Consumer, + handler *Handler, + fetchBatchSize, bulkBatchSize int, + bulkFlushInterval time.Duration, + stopCh <-chan struct{}, + doneCh chan<- struct{}, +) { defer close(doneCh) lastFlush := time.Now() @@ -135,12 +211,18 @@ func runConsumer(ctx context.Context, cons oteljetstream.Consumer, handler *Hand default: } - fetchSize := batchSize - handler.BufferLen() - if fetchSize <= 0 { - fetchSize = 1 + remaining := bulkBatchSize - handler.ActionCount() + if remaining <= 0 { + handler.Flush(ctx) + lastFlush = time.Now() + continue + } + fetchCount := fetchBatchSize + if fetchCount > remaining { + fetchCount = remaining } - batch, err := cons.Fetch(fetchSize, jetstream.FetchMaxWait(time.Second)) + batch, err := cons.Fetch(fetchCount, jetstream.FetchMaxWait(time.Second)) if err != nil { select { case <-stopCh: @@ -148,7 +230,7 @@ func runConsumer(ctx context.Context, cons oteljetstream.Consumer, handler *Hand return default: } - if handler.BufferLen() > 0 && time.Since(lastFlush) >= flushInterval { + if handler.ActionCount() > 0 && time.Since(lastFlush) >= bulkFlushInterval { handler.Flush(ctx) lastFlush = time.Now() } @@ -157,19 +239,22 @@ func runConsumer(ctx context.Context, cons oteljetstream.Consumer, handler *Hand for msg := range batch.Messages() { handler.Add(msg.Msg) + if handler.ActionCount() >= bulkBatchSize { + handler.Flush(ctx) + lastFlush = time.Now() + } } - if handler.BufferFull() { + if handler.ActionCount() >= bulkBatchSize { handler.Flush(ctx) lastFlush = time.Now() - } else if handler.BufferLen() > 0 && time.Since(lastFlush) >= flushInterval { + } else if handler.ActionCount() > 0 && time.Since(lastFlush) >= bulkFlushInterval { handler.Flush(ctx) lastFlush = time.Now() } } } -// engineAdapter adapts 
searchengine.SearchEngine to the Handler's Store interface. type engineAdapter struct { engine searchengine.SearchEngine } diff --git a/search-sync-worker/messages.go b/search-sync-worker/messages.go index a05085182..13f86ca53 100644 --- a/search-sync-worker/messages.go +++ b/search-sync-worker/messages.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/nats-io/nats.go/jetstream" + "github.com/hmchangw/chat/pkg/model" "github.com/hmchangw/chat/pkg/searchengine" "github.com/hmchangw/chat/pkg/stream" @@ -21,12 +23,17 @@ func newMessageCollection(indexPrefix string) *messageCollection { return &messageCollection{indexPrefix: indexPrefix} } -func (c *messageCollection) StreamConfig(siteID string) stream.Config { - return stream.MessagesCanonical(siteID) +func (c *messageCollection) StreamConfig(siteID string) jetstream.StreamConfig { + cfg := stream.MessagesCanonical(siteID) + return jetstream.StreamConfig{Name: cfg.Name, Subjects: cfg.Subjects} } func (c *messageCollection) ConsumerName() string { - return "search-sync-worker" + return "message-sync" +} + +func (c *messageCollection) FilterSubjects(_ string) []string { + return nil } func (c *messageCollection) TemplateName() string { @@ -37,21 +44,21 @@ func (c *messageCollection) TemplateBody() json.RawMessage { return messageTemplateBody(c.indexPrefix) } -func (c *messageCollection) BuildAction(data []byte) (searchengine.BulkAction, error) { +func (c *messageCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { var evt model.MessageEvent if err := json.Unmarshal(data, &evt); err != nil { - return searchengine.BulkAction{}, fmt.Errorf("unmarshal message event: %w", err) + return nil, fmt.Errorf("unmarshal message event: %w", err) } if evt.Message.ID == "" { - return searchengine.BulkAction{}, fmt.Errorf("build message action: missing message id") + return nil, fmt.Errorf("build message action: missing message id") } if evt.Message.CreatedAt.IsZero() { - return searchengine.BulkAction{}, 
fmt.Errorf("build message action: missing createdAt") + return nil, fmt.Errorf("build message action: missing createdAt") } if evt.Timestamp <= 0 { - return searchengine.BulkAction{}, fmt.Errorf("build message action: missing timestamp") + return nil, fmt.Errorf("build message action: missing timestamp") } - return buildMessageAction(&evt, c.indexPrefix), nil + return []searchengine.BulkAction{buildMessageAction(&evt, c.indexPrefix)}, nil } // --- Message-specific internals --- diff --git a/search-sync-worker/messages_test.go b/search-sync-worker/messages_test.go index 84619973e..b683b2f0e 100644 --- a/search-sync-worker/messages_test.go +++ b/search-sync-worker/messages_test.go @@ -57,7 +57,7 @@ func TestMessageCollection_StreamConfig(t *testing.T) { func TestMessageCollection_ConsumerName(t *testing.T) { coll := newMessageCollection("msgs-v1") - assert.Equal(t, "search-sync-worker", coll.ConsumerName()) + assert.Equal(t, "message-sync", coll.ConsumerName()) } func TestIndexName(t *testing.T) { @@ -219,11 +219,12 @@ func TestMessageCollection_BuildAction(t *testing.T) { } data, _ := json.Marshal(evt) - action, err := coll.BuildAction(data) + actions, err := coll.BuildAction(data) require.NoError(t, err) - assert.Equal(t, searchengine.ActionIndex, action.Action) - assert.Equal(t, "msgs-v1-2026-01", action.Index) - assert.Equal(t, "m1", action.DocID) + require.Len(t, actions, 1) + assert.Equal(t, searchengine.ActionIndex, actions[0].Action) + assert.Equal(t, "msgs-v1-2026-01", actions[0].Index) + assert.Equal(t, "m1", actions[0].DocID) t.Run("malformed JSON returns error", func(t *testing.T) { _, err := coll.BuildAction([]byte("{invalid")) diff --git a/search-sync-worker/room_member.go b/search-sync-worker/room_member.go new file mode 100644 index 000000000..9c355dcdd --- /dev/null +++ b/search-sync-worker/room_member.go @@ -0,0 +1,97 @@ +package main + +import ( + "encoding/json" + "fmt" + + "github.com/nats-io/nats.go/jetstream" + + 
"github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) + +// roomMemberCollection is the shared base for collections that index +// subscription lifecycle events (member_added, member_removed) from the +// ROOMS stream. Room-worker publishes enriched MemberAddEvent / +// MemberRemoveEvent to `chat.room.canonical.{site}.member_added/removed` +// which lands in the ROOMS_{siteID} stream. For cross-site events, +// inbox-worker re-publishes to the local ROOMS stream after creating +// subscriptions. +type roomMemberCollection struct{} + +func (b *roomMemberCollection) StreamConfig(siteID string) jetstream.StreamConfig { + c := stream.Rooms(siteID) + return jetstream.StreamConfig{Name: c.Name, Subjects: c.Subjects} +} + +func (b *roomMemberCollection) FilterSubjects(siteID string) []string { + return subject.RoomCanonicalMemberEventSubjects(siteID) +} + +// memberEvent is a tagged union returned by parseMemberEvent. Exactly one +// of Add or Remove is non-nil. +type memberEvent struct { + Add *model.MemberAddEvent + Remove *model.MemberRemoveEvent +} + +// parseMemberEvent peeks at the `type` field in the raw JSON, then +// unmarshals into the appropriate event struct. Returns an error for +// unknown types so mispublished messages don't silently reach the wrong +// indexing path. 
+func parseMemberEvent(data []byte) (*memberEvent, error) { + // Decode only the discriminator first; the full payload is decoded per type below. + var peek struct { + Type string `json:"type"` + } + if err := json.Unmarshal(data, &peek); err != nil { + return nil, fmt.Errorf("unmarshal member event type: %w", err) + } + + switch peek.Type { + case "member_added": + var evt model.MemberAddEvent + if err := json.Unmarshal(data, &evt); err != nil { + return nil, fmt.Errorf("unmarshal member_added event: %w", err) + } + if evt.Timestamp <= 0 { + return nil, fmt.Errorf("parse member_added event: missing timestamp") + } + if evt.RoomID == "" { + return nil, fmt.Errorf("parse member_added event: missing roomID") + } + if len(evt.Accounts) == 0 { + return nil, fmt.Errorf("parse member_added event: empty accounts") + } + for i, account := range evt.Accounts { + if account == "" { + return nil, fmt.Errorf("parse member_added event: empty account at index %d", i) + } + } + return &memberEvent{Add: &evt}, nil + + // member_left is decoded into the same MemberRemoveEvent struct as member_removed. + case "member_removed", "member_left": + var evt model.MemberRemoveEvent + if err := json.Unmarshal(data, &evt); err != nil { + return nil, fmt.Errorf("unmarshal member_removed event: %w", err) + } + if evt.Timestamp <= 0 { + return nil, fmt.Errorf("parse member_removed event: missing timestamp") + } + if evt.RoomID == "" { + return nil, fmt.Errorf("parse member_removed event: missing roomID") + } + if len(evt.Accounts) == 0 { + return nil, fmt.Errorf("parse member_removed event: empty accounts") + } + for i, account := range evt.Accounts { + if account == "" { + return nil, fmt.Errorf("parse member_removed event: empty account at index %d", i) + } + } + return &memberEvent{Remove: &evt}, nil + + default: + return nil, fmt.Errorf("parse member event: unsupported type %q", peek.Type) + } +} diff --git a/search-sync-worker/spotlight.go b/search-sync-worker/spotlight.go new file mode 100644 index 000000000..d64c0e8c1 --- /dev/null +++ b/search-sync-worker/spotlight.go @@ -0,0 +1,129 @@ +package main + +import ( + "encoding/json" + "fmt" + + 
"github.com/hmchangw/chat/pkg/searchengine" +) + +// spotlightCollection indexes one document per (account, room) pair into indexName; stream and filter wiring come from the embedded roomMemberCollection. +type spotlightCollection struct { + roomMemberCollection + indexName string +} + +// newSpotlightCollection returns a spotlightCollection that writes to indexName. +func newSpotlightCollection(indexName string) *spotlightCollection { + return &spotlightCollection{indexName: indexName} +} + +// ConsumerName returns the durable consumer name "spotlight-sync". +func (c *spotlightCollection) ConsumerName() string { + return "spotlight-sync" +} + +// TemplateName returns the ES index template name "spotlight_template". +func (c *spotlightCollection) TemplateName() string { + return "spotlight_template" +} + +// TemplateBody returns the index template JSON built for this collection's index name. +func (c *spotlightCollection) TemplateBody() json.RawMessage { + return spotlightTemplateBody(c.indexName) +} + +// BuildAction turns a member event into one bulk action per account: an index action for an add event and a delete action for a remove event, both versioned with the event timestamp. Add events with HistorySharedSince > 0 yield no actions. +func (c *spotlightCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { + evt, err := parseMemberEvent(data) + if err != nil { + return nil, err + } + + if evt.Add != nil { + add := evt.Add + // Skip the whole event (no actions, no error) when HistorySharedSince is set. + if add.HistorySharedSince > 0 { + return nil, nil + } + actions := make([]searchengine.BulkAction, 0, len(add.Accounts)) + for _, account := range add.Accounts { + doc := SpotlightSearchIndex{ + UserAccount: account, + RoomID: add.RoomID, + RoomName: add.RoomName, + RoomType: string(add.RoomType), + SiteID: add.SiteID, + JoinedAt: add.JoinedAt, + } + body, err := json.Marshal(doc) + if err != nil { + return nil, fmt.Errorf("marshal spotlight doc: %w", err) + } + actions = append(actions, searchengine.BulkAction{ + Action: searchengine.ActionIndex, + Index: c.indexName, + DocID: spotlightDocID(account, add.RoomID), + Version: add.Timestamp, + Doc: body, + }) + } + return actions, nil + } + + if evt.Remove != nil { + rm := evt.Remove + actions := make([]searchengine.BulkAction, 0, len(rm.Accounts)) + for _, account := range rm.Accounts { + actions = append(actions, searchengine.BulkAction{ + Action: searchengine.ActionDelete, + Index: c.indexName, + DocID: spotlightDocID(account, rm.RoomID), + Version: rm.Timestamp, + }) + } + return actions, nil + } + + return nil, fmt.Errorf("spotlight: no add or remove event parsed") +} + +// spotlightDocID returns the composite document ID "<account>_<roomID>". +func spotlightDocID(account, roomID string) string { + return account + "_" + roomID +} + +// SpotlightSearchIndex is the document indexed per (account, room) pair; its es tags drive the template mapping via esPropertiesFromStruct. +type 
SpotlightSearchIndex struct { + UserAccount string `json:"userAccount" es:"keyword"` + RoomID string `json:"roomId" es:"keyword"` + RoomName string `json:"roomName" es:"search_as_you_type,custom_analyzer"` + RoomType string `json:"roomType" es:"keyword"` + SiteID string `json:"siteId" es:"keyword"` + JoinedAt int64 `json:"joinedAt" es:"date"` +} + +// spotlightTemplateBody builds the index template JSON matching indexName: 3 shards, 1 replica, a lowercase whitespace-tokenized custom_analyzer, and non-dynamic mappings derived from SpotlightSearchIndex's es tags. +func spotlightTemplateBody(indexName string) json.RawMessage { + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": 3, + "number_of_replicas": 1, + }, + "analysis": map[string]any{ + "analyzer": map[string]any{ + "custom_analyzer": map[string]any{ + "type": "custom", + "tokenizer": "custom_tokenizer", + "filter": []string{"lowercase"}, + }, + }, + "tokenizer": map[string]any{ + "custom_tokenizer": map[string]any{ + "type": "whitespace", + }, + }, + }, + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": esPropertiesFromStruct[SpotlightSearchIndex](), + }, + }, + } + // The marshal error is ignored here; tmpl is assembled only from literals and the tag-derived properties map. + data, _ := json.Marshal(tmpl) + return data +} diff --git a/search-sync-worker/template.go b/search-sync-worker/template.go new file mode 100644 index 000000000..5165511e7 --- /dev/null +++ b/search-sync-worker/template.go @@ -0,0 +1,43 @@ +package main + +import ( + "reflect" + "strings" +) + +// esPropertiesFromStruct reflects over struct T's fields to build an +// Elasticsearch mapping properties map from `es` struct tags. Fields are +// keyed by the `json` tag name. +// +// The `es` tag grammar is "type[,analyzer]" — e.g. `es:"keyword"` or +// `es:"text,custom_analyzer"`. 
Fields are skipped when: +// - the `es` tag is missing or `-` +// - the `json` tag is missing, empty, or `-` (fail closed: we never emit +// a mapping entry under the empty string, which would silently corrupt +// the template for any future struct that adds an `es`-tagged field +// without a matching `json` tag) +func esPropertiesFromStruct[T any]() map[string]any { + // NOTE: T must be a struct type; reflect.Type.NumField panics for any other kind. + var zero T + t := reflect.TypeOf(zero) + props := make(map[string]any, t.NumField()) + for i := range t.NumField() { + field := t.Field(i) + esTag := field.Tag.Get("es") + if esTag == "" || esTag == "-" { + continue + } + jsonTag := field.Tag.Get("json") + name, _, _ := strings.Cut(jsonTag, ",") + if name == "" || name == "-" { + continue + } + + esType, analyzer, _ := strings.Cut(esTag, ",") + prop := map[string]any{"type": esType} + if analyzer != "" { + prop["analyzer"] = analyzer + } + props[name] = prop + } + return props +} diff --git a/search-sync-worker/user_room.go b/search-sync-worker/user_room.go new file mode 100644 index 000000000..7560a63d9 --- /dev/null +++ b/search-sync-worker/user_room.go @@ -0,0 +1,186 @@ +package main + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/hmchangw/chat/pkg/searchengine" +) + +// userRoomCollection maintains one document per account (document ID = account) holding that account's room list in indexName; stream and filter wiring come from the embedded roomMemberCollection. +type userRoomCollection struct { + roomMemberCollection + indexName string +} + +// newUserRoomCollection returns a userRoomCollection that writes to indexName. +func newUserRoomCollection(indexName string) *userRoomCollection { + return &userRoomCollection{indexName: indexName} +} + +// ConsumerName returns the durable consumer name "user-room-sync". +func (c *userRoomCollection) ConsumerName() string { + return "user-room-sync" +} + +// TemplateName returns the ES index template name "user_room_template". +func (c *userRoomCollection) TemplateName() string { + return "user_room_template" +} + +// TemplateBody returns the index template JSON built for this collection's index name. +func (c *userRoomCollection) TemplateBody() json.RawMessage { + return userRoomTemplateBody(c.indexName) +} + +const ( + // addRoomScript adds params.rid to rooms and records params.ts in roomTimestamps, but only when params.ts is newer than the timestamp stored for that room; otherwise the update becomes a no-op (ctx.op = 'none'). + addRoomScript = `if (ctx._source.roomTimestamps == null) { ctx._source.roomTimestamps = [:]; } ` + + `if (ctx._source.rooms == null) { ctx._source.rooms = []; } ` + + `long stored = ctx._source.roomTimestamps.containsKey(params.rid) ` + + `? 
((Number)ctx._source.roomTimestamps.get(params.rid)).longValue() : 0L; ` + + `if (params.ts > stored) { ` + + `if (!ctx._source.rooms.contains(params.rid)) { ctx._source.rooms.add(params.rid); } ` + + `ctx._source.roomTimestamps.put(params.rid, params.ts); ` + + `ctx._source.updatedAt = params.now; ` + + `} else { ctx.op = 'none'; }` + + // removeRoomScript drops params.rid from rooms and records params.ts in roomTimestamps when params.ts is newer than the stored timestamp; otherwise the update becomes a no-op. Unlike addRoomScript it does not touch updatedAt. + removeRoomScript = `if (ctx._source.roomTimestamps == null) { ctx._source.roomTimestamps = [:]; } ` + + `long stored = ctx._source.roomTimestamps.containsKey(params.rid) ` + + `? ((Number)ctx._source.roomTimestamps.get(params.rid)).longValue() : 0L; ` + + `if (params.ts > stored) { ` + + `if (ctx._source.rooms != null) { ` + + `int idx = ctx._source.rooms.indexOf(params.rid); ` + + `if (idx >= 0) { ctx._source.rooms.remove(idx); } } ` + + `ctx._source.roomTimestamps.put(params.rid, params.ts); ` + + `} else { ctx.op = 'none'; }` +) + +// BuildAction turns a member event into one scripted update action per account, using the account as the document ID and the event timestamp as the script's ts parameter. Add events with HistorySharedSince > 0 yield no actions. +func (c *userRoomCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { + evt, err := parseMemberEvent(data) + if err != nil { + return nil, err + } + + if evt.Add != nil { + add := evt.Add + // Skip the whole event (no actions, no error) when HistorySharedSince is set. + if add.HistorySharedSince > 0 { + return nil, nil + } + ts := add.Timestamp + actions := make([]searchengine.BulkAction, 0, len(add.Accounts)) + for _, account := range add.Accounts { + body, err := buildAddRoomUpdateBody(account, add.RoomID, ts) + if err != nil { + return nil, err + } + actions = append(actions, searchengine.BulkAction{ + Action: searchengine.ActionUpdate, + Index: c.indexName, + DocID: account, + Doc: body, + }) + } + return actions, nil 
+ } + + return nil, fmt.Errorf("user-room: no add or remove event parsed") +} + +// userRoomUpsertDoc is the document body sent as the upsert part of an add-room update. +type userRoomUpsertDoc struct { + UserAccount string `json:"userAccount"` + Rooms []string `json:"rooms"` + RoomTimestamps map[string]int64 `json:"roomTimestamps"` + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` +} + +// buildAddRoomUpdateBody builds the update request body for adding roomID to an account's document: addRoomScript with its params plus an upsert document. The createdAt/updatedAt values are derived from ts interpreted as Unix milliseconds, not from the wall clock. +func buildAddRoomUpdateBody(account, roomID string, ts int64) (json.RawMessage, error) { + now := time.UnixMilli(ts).UTC().Format(time.RFC3339Nano) + body := map[string]any{ + "script": map[string]any{ + "source": addRoomScript, + "lang": "painless", + "params": map[string]any{ + "rid": roomID, + "ts": ts, + "now": now, + }, + }, + "upsert": userRoomUpsertDoc{ + UserAccount: account, + Rooms: []string{roomID}, + RoomTimestamps: map[string]int64{roomID: ts}, + CreatedAt: now, + UpdatedAt: now, + }, + } + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal add room update: %w", err) + } + return data, nil +} + +// buildRemoveRoomUpdateBody builds the update request body that runs removeRoomScript for roomID; it carries no upsert document. +// NOTE(review): with no upsert, a removal for an account that has no document cannot record its timestamp — confirm that is acceptable for out-of-order add/remove pairs. +func buildRemoveRoomUpdateBody(roomID string, ts int64) (json.RawMessage, error) { + body := map[string]any{ + "script": map[string]any{ + "source": removeRoomScript, + "lang": "painless", + "params": map[string]any{ + "rid": roomID, + "ts": ts, + }, + }, + } + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal remove room update: %w", err) + } + return data, nil +} + +// userRoomTemplateBody builds the index template JSON matching indexName: 1 shard, 1 replica, and a non-dynamic mapping for the user-room document fields. +func userRoomTemplateBody(indexName string) json.RawMessage { + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": 1, + "number_of_replicas": 1, + }, + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": map[string]any{ + "userAccount": map[string]any{"type": "keyword"}, + "rooms": map[string]any{ + "type": "text", + "fields": map[string]any{ + "keyword": map[string]any{"type": "keyword", "ignore_above": 256}, + }, + }, + "roomTimestamps": map[string]any{"type": 
"flattened"}, + "createdAt": map[string]any{"type": "date"}, + "updatedAt": map[string]any{"type": "date"}, + }, + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +}