diff --git a/docs/superpowers/plans/2026-05-11-spotlight-org-sync.md b/docs/superpowers/plans/2026-05-11-spotlight-org-sync.md new file mode 100644 index 000000000..af185d6cf --- /dev/null +++ b/docs/superpowers/plans/2026-05-11-spotlight-org-sync.md @@ -0,0 +1,1835 @@ +# Spotlight Org Sync Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a `spotlight-org` collection to `search-sync-worker` that consumes `hr.sync.{siteID}.employees.upsert` envelopes from `hr-syncer` and maintains the `spotlightorg-{siteID}` ES index keyed by `sectId` via doc-merge upserts. + +**Architecture:** New `spotlightOrgCollection` implementing the existing `Collection` interface, alongside a shared `HRSyncEvent` envelope in `pkg/model`, a new `HR_SYNC_{siteID}` stream definition, new subject builders, shared template helpers (`indexTopology`, `customAnalyzerSettings`), and a worker-wide `DEV_MODE` toggle reused from the project-wide env var already used by `auth-service` and `chat-frontend`. + +**Tech Stack:** Go 1.25, NATS JetStream (`nats.go/jetstream`), Elasticsearch 8.11, `compress/gzip`, `caarlos0/env` for config, `stretchr/testify` for assertions, `testcontainers-go` for integration tests. + +**Spec:** `docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md` + +--- + +## File Structure + +**New files** +- `pkg/model/employee.go` — minimal `Employee` struct with `SectID` + the nine org fields. Owns the consumer-side contract; Mat can extend with non-org fields. +- `pkg/model/hrsync.go` — `HRSyncEvent` envelope. +- `search-sync-worker/spotlight_org.go` — new collection, projection struct, BuildAction, template body. +- `search-sync-worker/spotlight_org_test.go` — unit tests. + +**Modified files** +- `pkg/stream/stream.go` — add `HRSync(siteID)`. +- `pkg/stream/stream_test.go` — assert `HRSync`. +- `pkg/subject/subject.go` — add `HRSyncEmployeesUpsert`, `HRSyncUsersUpsert`. +- `pkg/subject/subject_test.go` — assert new subjects. +- `pkg/model/model_test.go` — roundtrip tests for `Employee` + `HRSyncEvent`. +- `search-sync-worker/template.go` — add `indexTopology` + `customAnalyzerSettings` helpers. +- `search-sync-worker/messages.go` — thread `devMode` into `messageCollection` + `messageTemplateBody`. +- `search-sync-worker/messages_test.go` — `devMode=true` subtest. +- `search-sync-worker/spotlight.go` — thread `devMode`, switch to `customAnalyzerSettings`, add `token_chars`, drop stale comment. +- `search-sync-worker/spotlight_test.go` — update constructor calls, add `devMode=true` subtest. +- `search-sync-worker/user_room.go` — thread `devMode`. +- `search-sync-worker/user_room_test.go` — update constructor calls, add `devMode=true` subtest. +- `search-sync-worker/main.go` — new config fields, default index name, skip HR_SYNC bootstrap, wire new collection. +- `search-sync-worker/integration_test.go` — add `TestSearchSync_SpotlightOrg_Integration`. +- `search-sync-worker/consumer_config_test.go` — extend if it enumerates collections. +- `search-sync-worker/deploy/docker-compose.yml` — `DEV_MODE=${DEV_MODE:-true}`, optional `SPOTLIGHT_ORG_INDEX`. + +--- + +## Task 1 (REMOVED): ~~Add `model.Employee`~~ + +**Status: superseded.** The original Task 1 added a `pkg/model.Employee` +struct, but this would conflict on merge with the internal repo's +already-existing `pkg/model/employee.go` (which defines a fuller +`Employee` plus an `Org` type for that repo's other consumers). + +Replacement: the consumer-side projection moves into +`search-sync-worker/spotlight_org.go` as `SpotlightOrgIndex` (defined +in Task 11). One struct serves three roles — unmarshal target for the +wire payload, document body on ES write, and source of truth for the +ES mapping. No public `pkg/model.Employee` is introduced in this PR. + +Commits `d9199cf` and `bd92d63` have been reverted (in a single combined +revert commit). Skip Task 1 when executing. + +--- + +## Task 2: Add `model.HRSyncEvent` envelope + +Mirrors `OutboxEvent` in shape: small fixed metadata + opaque payload bytes the consumer types per subject. The `Gzip` flag lets dev tooling skip compression without changing the consumer. + +**Files:** +- Create: `pkg/model/hrsync.go` +- Modify: `pkg/model/model_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `pkg/model/model_test.go`: + +```go +func TestHRSyncEventJSON(t *testing.T) { + src := model.HRSyncEvent{ + Timestamp: 1735689600000, + BatchID: "0192a4f7-8c2d-7c9a-abcd-e0123456789f", + Gzip: true, + Payload: json.RawMessage(`[{"sectId":"S001"}]`), + } + var dst model.HRSyncEvent + roundTrip(t, &src, &dst) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=pkg/model 2>&1 | tail -10` +Expected: FAIL with `undefined: model.HRSyncEvent`. + +- [ ] **Step 3: Create `pkg/model/hrsync.go`** + +```go +package model + +import "encoding/json" + +// HRSyncEvent is the envelope `hr-syncer` publishes on every +// `hr.sync.*` subject. Mirrors OutboxEvent: small fixed metadata plus +// an opaque payload the consumer types per-subject. +// +// Payload typing: +// - `hr.sync.{siteID}.employees.upsert` → []Employee +// - `hr.sync.{siteID}.users.upsert` → []User (future) +// +// Gzip lets dev tooling publish uncompressed without changing the +// consumer. Timestamp is set at the publish site in milliseconds since +// epoch; consumers reject `Timestamp <= 0`. BatchID is a UUIDv7 used +// for end-to-end tracing of one cron run. +type HRSyncEvent struct { + Timestamp int64 `json:"timestamp"` + BatchID string `json:"batchId"` + Gzip bool `json:"gzip"` + Payload json.RawMessage `json:"payload"` +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=pkg/model 2>&1 | tail -10` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add pkg/model/hrsync.go pkg/model/model_test.go +git commit -m "feat(model): add HRSyncEvent envelope for hr-syncer publishes" +``` + +--- + +## Task 3: Add `stream.HRSync` + +Site-scoped stream definition. `search-sync-worker` consumes from this stream but does NOT own its schema — `hr-syncer` does. Following the same pattern as `INBOX_{siteID}` (owned by `inbox-worker`). + +**Files:** +- Modify: `pkg/stream/stream.go` +- Modify: `pkg/stream/stream_test.go` + +- [ ] **Step 1: Write the failing test** + +Append the row in the `TestStreamConfigs` table in `pkg/stream/stream_test.go`: + +```go +{"HRSync", stream.HRSync(siteID), "HR_SYNC_site-a", "hr.sync.site-a.>"}, +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=pkg/stream 2>&1 | tail -10` +Expected: FAIL with `undefined: stream.HRSync`. + +- [ ] **Step 3: Add `HRSync` to `pkg/stream/stream.go`** + +Append after the `Inbox` function: + +```go +// HRSync returns the canonical config for the `HR_SYNC_{siteID}` stream +// that carries HR account sync events published daily by `hr-syncer` +// (e.g., `hr.sync.{siteID}.employees.upsert`, +// `hr.sync.{siteID}.users.upsert`). Schema is owned by `hr-syncer`; +// consumers like `search-sync-worker` must skip this stream in their +// bootstrap loop the same way they skip INBOX (owned by inbox-worker). +func HRSync(siteID string) Config { + return Config{ + Name: fmt.Sprintf("HR_SYNC_%s", siteID), + Subjects: []string{fmt.Sprintf("hr.sync.%s.>", siteID)}, + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=pkg/stream 2>&1 | tail -10` +Expected: PASS for the new `HRSync` subtest. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add pkg/stream/stream.go pkg/stream/stream_test.go +git commit -m "feat(stream): add HR_SYNC stream definition" +``` + +--- + +## Task 4: Add HR sync subject builders + +Two builders so the future `users.upsert` consumer can reuse the same pattern. Filter subjects are passed to `jetstream.ConsumerConfig.FilterSubjects`. + +**Files:** +- Modify: `pkg/subject/subject.go` +- Modify: `pkg/subject/subject_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `pkg/subject/subject_test.go`: + +```go +func TestHRSyncEmployeesUpsert(t *testing.T) { + got := subject.HRSyncEmployeesUpsert("site-a") + assert.Equal(t, "hr.sync.site-a.employees.upsert", got) +} + +func TestHRSyncUsersUpsert(t *testing.T) { + got := subject.HRSyncUsersUpsert("site-a") + assert.Equal(t, "hr.sync.site-a.users.upsert", got) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=pkg/subject 2>&1 | tail -10` +Expected: FAIL with `undefined: subject.HRSyncEmployeesUpsert`. + +- [ ] **Step 3: Add builders to `pkg/subject/subject.go`** + +Append after the `MsgCanonicalDeleted` function: + +```go +// HRSyncEmployeesUpsert returns the subject for batch HR-account +// upsert events. Consumed by `search-sync-worker`'s spotlight-org +// collection. +func HRSyncEmployeesUpsert(siteID string) string { + return fmt.Sprintf("hr.sync.%s.employees.upsert", siteID) +} + +// HRSyncUsersUpsert returns the subject for batch HR-user upsert +// events. Consumed by a separate service that maintains the `users` +// MongoDB collection (out of scope for this worker). +func HRSyncUsersUpsert(siteID string) string { + return fmt.Sprintf("hr.sync.%s.users.upsert", siteID) +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=pkg/subject 2>&1 | tail -10` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add pkg/subject/subject.go pkg/subject/subject_test.go +git commit -m "feat(subject): add HR sync subject builders" +``` + +--- + +## Task 5: Add `indexTopology` template helper + +Shared helper so every template's dev-mode toggle is centralized. Prod values stay configurable per collection; dev mode collapses uniformly to 1/0. + +**Files:** +- Modify: `search-sync-worker/template.go` +- Create: `search-sync-worker/template_test.go` (new — `template.go` doesn't have its own test file today; assertions for the existing reflect helper are split across `*_test.go` files) + +- [ ] **Step 1: Write the failing test** + +Create `search-sync-worker/template_test.go`: + +```go +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexTopology_Prod(t *testing.T) { + shards, replicas := indexTopology(4, 2, false) + assert.Equal(t, 4, shards) + assert.Equal(t, 2, replicas) +} + +func TestIndexTopology_Dev(t *testing.T) { + // Dev collapses every input to 1/0 regardless of prod values. + shards, replicas := indexTopology(4, 2, true) + assert.Equal(t, 1, shards) + assert.Equal(t, 0, replicas) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -10` +Expected: FAIL with `undefined: indexTopology`. + +- [ ] **Step 3: Add `indexTopology` to `search-sync-worker/template.go`** + +Append to the end of `template.go`: + +```go +// indexTopology returns the (shards, replicas) pair an ES index +// template should declare. Prod values vary by collection — pass them +// in. In dev mode every template collapses to 1/0 so a single +// DEV_MODE toggle gives every index a fast local footprint without +// per-template env vars. +func indexTopology(prodShards, prodReplicas int, devMode bool) (int, int) { + if devMode { + return 1, 0 + } + return prodShards, prodReplicas +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -10` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/template.go search-sync-worker/template_test.go +git commit -m "feat(search-sync-worker): add indexTopology helper for DEV_MODE" +``` + +--- + +## Task 6: Add `customAnalyzerSettings` template helper + +Shared analyzer block both spotlight templates use. Includes the `token_chars` setting on the whitespace tokenizer (verified to be accepted on ES 8.11). + +**Files:** +- Modify: `search-sync-worker/template.go` +- Modify: `search-sync-worker/template_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `search-sync-worker/template_test.go`: + +```go +func TestCustomAnalyzerSettings_Shape(t *testing.T) { + got := customAnalyzerSettings() + + analyzer := got["analyzer"].(map[string]any) + custom := analyzer["custom_analyzer"].(map[string]any) + assert.Equal(t, "custom", custom["type"]) + assert.Equal(t, "custom_tokenizer", custom["tokenizer"]) + assert.Equal(t, []string{"lowercase"}, custom["filter"]) + + tokenizer := got["tokenizer"].(map[string]any) + tok := tokenizer["custom_tokenizer"].(map[string]any) + assert.Equal(t, "whitespace", tok["type"]) + assert.Equal(t, []string{"letter", "digit", "punctuation", "symbol"}, tok["token_chars"]) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -10` +Expected: FAIL with `undefined: customAnalyzerSettings`. + +- [ ] **Step 3: Add `customAnalyzerSettings` to `search-sync-worker/template.go`** + +Append after `indexTopology`: + +```go +// customAnalyzerSettings returns the `analysis` block shared by the +// spotlight (room-typeahead) and spotlight-org (section-typeahead) +// templates. A whitespace tokenizer with a permissive `token_chars` +// set (letter, digit, punctuation, symbol) feeds a lowercase-folding +// `custom_analyzer`. Returning a fresh map per call prevents aliasing +// if a caller mutates the result. +func customAnalyzerSettings() map[string]any { + return map[string]any{ + "analyzer": map[string]any{ + "custom_analyzer": map[string]any{ + "type": "custom", + "tokenizer": "custom_tokenizer", + "filter": []string{"lowercase"}, + }, + }, + "tokenizer": map[string]any{ + "custom_tokenizer": map[string]any{ + "type": "whitespace", + "token_chars": []string{"letter", "digit", "punctuation", "symbol"}, + }, + }, + } +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -10` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/template.go search-sync-worker/template_test.go +git commit -m "feat(search-sync-worker): add customAnalyzerSettings helper" +``` + +--- + +## Task 7: Refactor `spotlightTemplateBody` to use shared analyzer + add `token_chars` + +The existing spotlight template inlines its analyzer block and has a stale comment claiming `token_chars` is rejected by ES. Per the user's verification on ES 8.11, drop the comment and route through `customAnalyzerSettings()`. + +**Files:** +- Modify: `search-sync-worker/spotlight.go:130-168` +- Modify: `search-sync-worker/spotlight_test.go` (no behavior change expected — existing assertions should still pass) + +- [ ] **Step 1: Write the failing test** + +Append to `search-sync-worker/spotlight_test.go`: + +```go +// TestSpotlightTemplateBody_HasTokenChars locks in that the spotlight +// template adopts the shared analyzer (which carries token_chars). +// Before this change the spotlight tokenizer had no token_chars set. +func TestSpotlightTemplateBody_HasTokenChars(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat") + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + + tmpl := parsed["template"].(map[string]any) + settings := tmpl["settings"].(map[string]any) + analysis := settings["analysis"].(map[string]any) + tokenizer := analysis["tokenizer"].(map[string]any) + custom := tokenizer["custom_tokenizer"].(map[string]any) + tokenChars, ok := custom["token_chars"].([]any) + require.True(t, ok, "custom_tokenizer must declare token_chars after the refactor") + assert.ElementsMatch(t, []any{"letter", "digit", "punctuation", "symbol"}, tokenChars) +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `make test SERVICE=search-sync-worker -run TestSpotlightTemplateBody_HasTokenChars 2>&1 | tail -10` +Expected: FAIL — current template doesn't declare `token_chars`. + +- [ ] **Step 3: Refactor `spotlightTemplateBody`** + +Replace the body of `spotlightTemplateBody` in `search-sync-worker/spotlight.go` (lines 130-168): + +```go +// spotlightTemplateBody builds the ES index template for the spotlight +// (room-typeahead) collection. Analyzer config is shared with the +// spotlight-org template via customAnalyzerSettings(). The +// `index_patterns` field is set to the exact configured index name so +// a custom SPOTLIGHT_INDEX value still receives the correct mapping. +func spotlightTemplateBody(indexName string) json.RawMessage { + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": 3, + "number_of_replicas": 1, + }, + "analysis": customAnalyzerSettings(), + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": esPropertiesFromStruct[SpotlightSearchIndex](), + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} +``` + +(Note: `devMode` is deferred to Task 10 — this task is purely the analyzer extraction so the diff stays reviewable.) + +- [ ] **Step 4: Run all spotlight tests to verify nothing broke** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS — including the new `TestSpotlightTemplateBody_HasTokenChars` and all existing spotlight tests. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/spotlight.go search-sync-worker/spotlight_test.go +git commit -m "refactor(search-sync-worker): spotlight template uses shared analyzer" +``` + +--- + +## Task 8: Thread `devMode` into `messageCollection` + +Adds `devMode bool` to the constructor and template-body signature. Existing prod behavior (4 shards, 2 replicas) preserved when `devMode=false`. + +**Files:** +- Modify: `search-sync-worker/messages.go` +- Modify: `search-sync-worker/messages_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `search-sync-worker/messages_test.go`: + +```go +func TestMessageTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := messageTemplateBody("messages-site-a-v1", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(4), idx["number_of_shards"]) + assert.Equal(t, float64(2), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := messageTemplateBody("messages-site-a-v1", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +} +``` + +Also update every existing call site in `messages_test.go` from `messageTemplateBody(prefix)` → `messageTemplateBody(prefix, false)`, and from `newMessageCollection(prefix)` → `newMessageCollection(prefix, false)`. + +- [ ] **Step 2: Run test to verify build error / failures** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: build error — `messageTemplateBody` is currently 1-arg. + +- [ ] **Step 3: Update `search-sync-worker/messages.go`** + +Modify the struct, constructor, `TemplateBody`, and `messageTemplateBody`: + +```go +type messageCollection struct { + indexPrefix string + devMode bool +} + +func newMessageCollection(indexPrefix string, devMode bool) *messageCollection { + return &messageCollection{indexPrefix: indexPrefix, devMode: devMode} +} +``` + +```go +func (c *messageCollection) TemplateBody() json.RawMessage { + return messageTemplateBody(c.indexPrefix, c.devMode) +} +``` + +```go +func messageTemplateBody(prefix string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(4, 2, devMode) + tmpl := map[string]any{ + "index_patterns": []string{fmt.Sprintf("%s-*", prefix)}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": shards, + "number_of_replicas": replicas, + "refresh_interval": "30s", + }, + "analysis": map[string]any{ + "analyzer": map[string]any{ + "custom_analyzer": map[string]any{ + "type": "custom", + "tokenizer": "underscore_preserving", + "filter": []string{"underscore_subword", "cjk_bigram", "lowercase"}, + "char_filter": []string{"html_strip"}, + }, + }, + "tokenizer": map[string]any{ + "underscore_preserving": map[string]any{ + "type": "pattern", + "pattern": `[\s,;!?()\[\]{}"'<>]+`, + }, + }, + "filter": map[string]any{ + "underscore_subword": map[string]any{ + "type": "word_delimiter_graph", + "split_on_case_change": false, + "split_on_numerics": false, + "preserve_original": true, + }, + }, + }, + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": messageTemplateProperties(), + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} +``` + +Also update `main.go` temporarily to keep the build green: `newMessageCollection(cfg.MsgIndexPrefix, false)`. The proper wiring (passing `cfg.DevMode`) is Task 14. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS — including the new dev-mode subtest. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/messages.go search-sync-worker/messages_test.go search-sync-worker/main.go +git commit -m "feat(search-sync-worker): thread DEV_MODE into message template" +``` + +--- + +## Task 9: Thread `devMode` into `spotlightCollection` + +Same mechanical refactor as Task 8. + +**Files:** +- Modify: `search-sync-worker/spotlight.go` +- Modify: `search-sync-worker/spotlight_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `search-sync-worker/spotlight_test.go`: + +```go +func TestSpotlightTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(3), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +} +``` + +Update every call to `newSpotlightCollection(idx)` → `newSpotlightCollection(idx, false)`, and `spotlightTemplateBody(idx)` → `spotlightTemplateBody(idx, false)` in this file and in the Task 7 test you just added. + +- [ ] **Step 2: Run test to verify build error** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: build error. + +- [ ] **Step 3: Update `search-sync-worker/spotlight.go`** + +```go +type spotlightCollection struct { + inboxMemberCollection + indexName string + devMode bool +} + +func newSpotlightCollection(indexName string, devMode bool) *spotlightCollection { + return &spotlightCollection{indexName: indexName, devMode: devMode} +} +``` + +```go +func (c *spotlightCollection) TemplateBody() json.RawMessage { + return spotlightTemplateBody(c.indexName, c.devMode) +} +``` + +```go +func spotlightTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(3, 1, devMode) + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": shards, + "number_of_replicas": replicas, + }, + "analysis": customAnalyzerSettings(), + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": esPropertiesFromStruct[SpotlightSearchIndex](), + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} +``` + +Update `main.go`: `newSpotlightCollection(cfg.SpotlightIndex, false)`. + +- [ ] **Step 4: Run tests** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/spotlight.go search-sync-worker/spotlight_test.go search-sync-worker/main.go +git commit -m "feat(search-sync-worker): thread DEV_MODE into spotlight template" +``` + +--- + +## Task 10: Thread `devMode` into `userRoomCollection` + +Same mechanical refactor. + +**Files:** +- Modify: `search-sync-worker/user_room.go` +- Modify: `search-sync-worker/user_room_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `search-sync-worker/user_room_test.go`: + +```go +func TestUserRoomTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := userRoomTemplateBody("user-room-site-a", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := userRoomTemplateBody("user-room-site-a", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +} +``` + +Update every `newUserRoomCollection(idx)` → `newUserRoomCollection(idx, false)` and `userRoomTemplateBody(idx)` → `userRoomTemplateBody(idx, false)`. + +- [ ] **Step 2: Run test** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: build error. + +- [ ] **Step 3: Update `search-sync-worker/user_room.go`** + +```go +type userRoomCollection struct { + inboxMemberCollection + indexName string + devMode bool +} + +func newUserRoomCollection(indexName string, devMode bool) *userRoomCollection { + return &userRoomCollection{indexName: indexName, devMode: devMode} +} +``` + +```go +func (c *userRoomCollection) TemplateBody() json.RawMessage { + return userRoomTemplateBody(c.indexName, c.devMode) +} +``` + +```go +func userRoomTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(1, 1, devMode) + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": shards, + "number_of_replicas": replicas, + }, + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": map[string]any{ + "userAccount": map[string]any{"type": "keyword"}, + "rooms": map[string]any{ + "type": "text", + "fields": map[string]any{ + "keyword": map[string]any{"type": "keyword", "ignore_above": 256}, + }, + }, + "restrictedRooms": map[string]any{"type": "flattened"}, + "roomTimestamps": map[string]any{"type": "flattened"}, + "createdAt": map[string]any{"type": "date"}, + "updatedAt": map[string]any{"type": "date"}, + }, + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} +``` + +Update `main.go`: `newUserRoomCollection(cfg.UserRoomIndex, false)`. + +- [ ] **Step 4: Run tests** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/user_room.go search-sync-worker/user_room_test.go search-sync-worker/main.go +git commit -m "feat(search-sync-worker): thread DEV_MODE into user-room template" +``` + +--- + +## Task 11: Scaffold `spotlightOrgCollection` (metadata, no BuildAction yet) + +Create the type, constructor, and the four metadata methods (`StreamConfig`, `ConsumerName`, `FilterSubjects`, `TemplateName`, `TemplateBody`). `BuildAction` is stubbed to return an error so the build succeeds; real logic lands in Tasks 12-13. + +**Files:** +- Create: `search-sync-worker/spotlight_org.go` +- Create: `search-sync-worker/spotlight_org_test.go` + +- [ ] **Step 1: Write the failing metadata test** + +Create `search-sync-worker/spotlight_org_test.go`: + +```go +package main + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSpotlightOrgCollection_Metadata(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + + assert.Equal(t, "spotlight-org-sync", coll.ConsumerName()) + assert.Equal(t, "spotlight_org_template", coll.TemplateName()) + + cfg := coll.StreamConfig("site-a") + assert.Equal(t, "HR_SYNC_site-a", cfg.Name) + assert.Equal(t, []string{"hr.sync.site-a.>"}, cfg.Subjects) + assert.Empty(t, cfg.Sources) + + filters := coll.FilterSubjects("site-a") + assert.Equal(t, []string{"hr.sync.site-a.employees.upsert"}, filters) +} + +func TestSpotlightOrgTemplateBody_Prod(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + body := coll.TemplateBody() + require.NotNil(t, body) + + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + + patterns := parsed["index_patterns"].([]any) + assert.Equal(t, "spotlightorg-site-a", patterns[0]) + + tmpl := parsed["template"].(map[string]any) + idx := tmpl["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(3), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + + mappings := tmpl["mappings"].(map[string]any) + assert.Equal(t, false, mappings["dynamic"]) + props := mappings["properties"].(map[string]any) + for _, f := range []string{ + "sectId", "sectTCName", "sectName", "sectDescription", + "deptId", "deptTCName", "deptName", "deptDescription", "divisionId", + } { + field, ok := props[f].(map[string]any) + require.True(t, ok, "missing property: %s", f) + assert.Equal(t, "search_as_you_type", field["type"], "field %s wrong type", f) + assert.Equal(t, "custom_analyzer", field["analyzer"], "field %s wrong analyzer", f) + } +} + +func TestSpotlightOrgTemplateBody_Dev(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", true) + body := coll.TemplateBody() + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) +} +``` + +- [ ] **Step 2: Run test to verify build error** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -10` +Expected: build error — `undefined: newSpotlightOrgCollection`. + +- [ ] **Step 3: Create `search-sync-worker/spotlight_org.go`** + +```go +package main + +import ( + "encoding/json" + "errors" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/hmchangw/chat/pkg/searchengine" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) + +// spotlightOrgCollection implements Collection for the spotlight-org +// search index. One document per `sectId` carries the nine org fields +// projected from each HR account row in the batched payload. The doc +// ID is the sectId itself — many employees collapse to one document +// via dedup in BuildAction. +// +// The wire-side row type and the ES doc projection are the same struct +// (SpotlightOrgIndex below), keeping the consumer loosely coupled to +// hr-syncer's own internal Employee/Org types without taking a public +// dependency on a pkg/model.Employee that would conflict on merge with +// the internal repo's existing one. +// +// The HR_SYNC_{siteID} stream is owned by `hr-syncer`; this collection +// is a pure consumer. main.go skips HR_SYNC in its bootstrap loop the +// same way it skips INBOX. +type spotlightOrgCollection struct { + indexName string + devMode bool +} + +func newSpotlightOrgCollection(indexName string, devMode bool) *spotlightOrgCollection { + return &spotlightOrgCollection{indexName: indexName, devMode: devMode} +} + +func (c *spotlightOrgCollection) StreamConfig(siteID string) jetstream.StreamConfig { + cfg := stream.HRSync(siteID) + return jetstream.StreamConfig{Name: cfg.Name, Subjects: cfg.Subjects} +} + +func (c *spotlightOrgCollection) ConsumerName() string { + return "spotlight-org-sync" +} + +func (c *spotlightOrgCollection) FilterSubjects(siteID string) []string { + return []string{subject.HRSyncEmployeesUpsert(siteID)} +} + +func (c *spotlightOrgCollection) TemplateName() string { + return "spotlight_org_template" +} + +func (c *spotlightOrgCollection) TemplateBody() json.RawMessage { + return spotlightOrgTemplateBody(c.indexName, c.devMode) +} + +// BuildAction is implemented in stages — see Tasks 12-13 of the plan. +// The stub returns an error so a misconfigured wiring fails loudly +// rather than silently dropping HR sync events. +func (c *spotlightOrgCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { + return nil, errors.New("spotlightOrgCollection.BuildAction: not yet implemented") +} + +// SpotlightOrgIndex serves three roles in the consumer: unmarshal +// target for the wire-side row, document body on ES write, and source +// of truth for the ES mapping via esPropertiesFromStruct. Every field +// is `omitempty` `string` so absent values serialize away and +// doc-merge upsert preserves the stored value rather than overwriting +// with empty. Fields not in this struct are silently ignored by the +// json decoder — hr-syncer is free to publish additional fields. +type SpotlightOrgIndex struct { + SectID string `json:"sectId,omitempty" es:"search_as_you_type,custom_analyzer"` + SectTCName string `json:"sectTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectName string `json:"sectName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectDescription string `json:"sectDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptID string `json:"deptId,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptTCName string `json:"deptTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptName string `json:"deptName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptDescription string `json:"deptDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DivisionID string `json:"divisionId,omitempty" es:"search_as_you_type,custom_analyzer"` +} + +func spotlightOrgTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(3, 1, devMode) + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": shards, + "number_of_replicas": replicas, + }, + "analysis": customAnalyzerSettings(), + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": esPropertiesFromStruct[SpotlightOrgIndex](), + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} +``` + +- [ ] **Step 4: Run metadata tests to verify pass** + +Run: `make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS for metadata + template tests. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/spotlight_org.go search-sync-worker/spotlight_org_test.go +git commit -m "feat(search-sync-worker): scaffold spotlight-org collection metadata" +``` + +--- + +## Task 12: Implement `BuildAction` happy path + dedup + +Parse the envelope (uncompressed only for now — gzip lands in Task 13), unmarshal `[]SpotlightOrgIndex` (defined in Task 11; the wire-side row and the ES doc projection are the same struct), dedup by `SectID`, emit one `_update` per unique sectId with `doc_as_upsert:true`. + +**Files:** +- Modify: `search-sync-worker/spotlight_org.go` +- Modify: `search-sync-worker/spotlight_org_test.go` + +- [ ] **Step 1: Write the failing tests** + +Append to `search-sync-worker/spotlight_org_test.go`: + +```go +import ( + "github.com/hmchangw/chat/pkg/model" +) + +// makeHRSyncEvent builds a plaintext (gzip=false) HRSyncEvent containing +// the given employees. Used by every BuildAction test that doesn't +// exercise the compression path. +func makeHRSyncEvent(t *testing.T, ts int64, employees []SpotlightOrgIndex) []byte { + t.Helper() + payload, err := json.Marshal(employees) + require.NoError(t, err) + evt := model.HRSyncEvent{ + Timestamp: ts, + BatchID: "b-1", + Gzip: false, + Payload: payload, + } + data, err := json.Marshal(evt) + require.NoError(t, err) + return data +} + +func TestSpotlightOrg_BuildAction_HappyPath(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1735689600000, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Eng", DeptID: "D1", DeptName: "Tech"}, + {SectID: "S2", SectName: "Sales", DeptID: "D2", DeptName: "Biz"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 2) + + docIDs := map[string]bool{} + for _, a := range actions { + assert.Equal(t, searchengine.ActionUpdate, a.Action) + assert.Equal(t, "spotlightorg-site-a", a.Index) + assert.Equal(t, int64(0), a.Version, "ActionUpdate must not use external versioning") + docIDs[a.DocID] = true + + var body map[string]any + require.NoError(t, json.Unmarshal(a.Doc, &body)) + assert.Equal(t, true, body["doc_as_upsert"]) + assert.Contains(t, body, "doc") + } + assert.True(t, docIDs["S1"]) + assert.True(t, docIDs["S2"]) +} + +// TestSpotlightOrg_BuildAction_DedupBySectID covers many-employees-one-section: +// 100 employees sharing the same sectId must collapse to a single ES action. +// The kept row is the LAST occurrence so the most recent in-batch update wins. +func TestSpotlightOrg_BuildAction_DedupBySectID(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + {SectID: "S1", SectName: "Engineering Renamed"}, + {SectID: "S2", SectName: "Sales"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 2) + + var s1Body map[string]any + for _, a := range actions { + if a.DocID == "S1" { + require.NoError(t, json.Unmarshal(a.Doc, &s1Body)) + } + } + require.NotNil(t, s1Body) + doc := s1Body["doc"].(map[string]any) + assert.Equal(t, "Engineering Renamed", doc["sectName"], "last-wins on dedup") +} + +func TestSpotlightOrg_BuildAction_EmptySectIDsSkipped(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "", SectName: "no-section"}, + {SectID: "S1", SectName: "Eng"}, + {SectID: "", DeptID: "D9"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + assert.Equal(t, "S1", actions[0].DocID) +} + +func TestSpotlightOrg_BuildAction_AllEmptySectIDs(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectName: "no-section-1"}, + {SectName: "no-section-2"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + assert.Nil(t, actions, "all empty sectIds → zero actions, handler acks JS msg without ES write") +} + +func TestSpotlightOrg_BuildAction_EmptyEmployees(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{}) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + assert.Nil(t, actions) +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `make test SERVICE=search-sync-worker -run TestSpotlightOrg_BuildAction 2>&1 | tail -20` +Expected: FAIL — stub returns an error. + +- [ ] **Step 3: Replace the `BuildAction` stub in `search-sync-worker/spotlight_org.go`** + +```go +import ( + "encoding/json" + "fmt" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/searchengine" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) +``` + +```go +// BuildAction parses an HR sync envelope, dedupes employees by SectID +// (keeping the last occurrence per sectId), and emits one ES `_update` +// per unique sectId with `doc_as_upsert:true`. Doc-merge means absent +// fields on the projection preserve the stored value rather than +// overwriting with empty strings — partial-field events from +// `hr-syncer` work without a painless script. +// +// Returns (nil, nil) for empty employee arrays or batches where every +// row has an empty SectID; the handler then acks the JS message with +// no ES write. +func (c *spotlightOrgCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { + var envelope model.HRSyncEvent + if err := json.Unmarshal(data, &envelope); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync envelope: %w", err) + } + if envelope.Timestamp <= 0 { + return nil, fmt.Errorf("build spotlight-org action: missing timestamp") + } + + var employees []SpotlightOrgIndex + if err := json.Unmarshal(envelope.Payload, &employees); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync employees: %w", err) + } + if len(employees) == 0 { + return nil, nil + } + + // Dedup by SectID keeping the LAST occurrence — within one batch + // the most recent update wins. Empty SectID rows are skipped + // silently (employees not yet assigned to a section). + deduped := make(map[string]SpotlightOrgIndex, len(employees)) + order := make([]string, 0, len(employees)) + for _, emp := range employees { + if emp.SectID == "" { + continue + } + if _, seen := deduped[emp.SectID]; !seen { + order = append(order, emp.SectID) + } + deduped[emp.SectID] = emp + } + if len(deduped) == 0 { + return nil, nil + } + + actions := make([]searchengine.BulkAction, 0, len(deduped)) + for _, sectID := range order { + row := deduped[sectID] + body, err := buildSpotlightOrgUpdateBody(row) + if err != nil { + return nil, err + } + actions = append(actions, searchengine.BulkAction{ + Action: searchengine.ActionUpdate, + Index: c.indexName, + DocID: sectID, + Doc: body, + // No Version: ActionUpdate must not use external versioning. + // handler.go::isBulkItemSuccess depends on this. + }) + } + return actions, nil +} + +// buildSpotlightOrgUpdateBody wraps the row in an ES `_update` body +// with `doc_as_upsert:true`. The row is already the projection — the +// json `omitempty` discipline guarantees absent fields don't appear +// in the body and therefore don't overwrite stored values on the +// merge. Errors here are theoretical (the inputs are plain strings), +// but we return the wrapped error to keep the call site explicit. +func buildSpotlightOrgUpdateBody(row SpotlightOrgIndex) (json.RawMessage, error) { + body := map[string]any{ + "doc": row, + "doc_as_upsert": true, + } + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal spotlight-org update body: %w", err) + } + return data, nil +} +``` + +Also delete the now-unused `errors` import. + +- [ ] **Step 4: Run tests** + +Run: `make test SERVICE=search-sync-worker -run TestSpotlightOrg 2>&1 | tail -20` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/spotlight_org.go search-sync-worker/spotlight_org_test.go +git commit -m "feat(search-sync-worker): spotlight-org BuildAction happy path + dedup" +``` + +--- + +## Task 13: Implement gzip decompression + partial-field test + error cases + +Add the `compress/gzip` branch and lock in the remaining behavior. + +**Files:** +- Modify: `search-sync-worker/spotlight_org.go` +- Modify: `search-sync-worker/spotlight_org_test.go` + +- [ ] **Step 1: Write the failing tests** + +Append to `search-sync-worker/spotlight_org_test.go`: + +```go +import ( + "bytes" + "compress/gzip" +) + +// makeHRSyncEventGzip mirrors makeHRSyncEvent but gzip-compresses the +// employee payload. Used to exercise the consumer's decompression path. +func makeHRSyncEventGzip(t *testing.T, ts int64, employees []SpotlightOrgIndex) []byte { + t.Helper() + raw, err := json.Marshal(employees) + require.NoError(t, err) + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, err = gz.Write(raw) + require.NoError(t, err) + require.NoError(t, gz.Close()) + evt := model.HRSyncEvent{ + Timestamp: ts, + BatchID: "b-1", + Gzip: true, + Payload: buf.Bytes(), + } + data, err := json.Marshal(evt) + require.NoError(t, err) + return data +} + +func TestSpotlightOrg_BuildAction_Gzip(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEventGzip(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + assert.Equal(t, "S1", actions[0].DocID) + + var body map[string]any + require.NoError(t, json.Unmarshal(actions[0].Doc, &body)) + doc := body["doc"].(map[string]any) + assert.Equal(t, "Engineering", doc["sectName"]) +} + +// TestSpotlightOrg_BuildAction_PartialFields locks in the partial-update +// contract: an Employee carrying only SectID + SectName must produce +// an ES `doc` body containing ONLY those two keys. Other org fields +// must be absent so doc-merge preserves their stored values. +func TestSpotlightOrg_BuildAction_PartialFields(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + + var body map[string]any + require.NoError(t, json.Unmarshal(actions[0].Doc, &body)) + doc := body["doc"].(map[string]any) + + assert.Equal(t, "S1", doc["sectId"]) + assert.Equal(t, "Engineering", doc["sectName"]) + for _, absent := range []string{ + "sectTCName", "sectDescription", + "deptId", "deptTCName", "deptName", "deptDescription", "divisionId", + } { + _, present := doc[absent] + assert.False(t, present, "doc must not carry %s when Employee did not set it", absent) + } +} + +func TestSpotlightOrg_BuildAction_Errors(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + + t.Run("malformed envelope", func(t *testing.T) { + _, err := coll.BuildAction([]byte("{invalid")) + assert.Error(t, err) + }) + + t.Run("missing timestamp", func(t *testing.T) { + data := makeHRSyncEvent(t, 0, []SpotlightOrgIndex{{SectID: "S1"}}) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) + + t.Run("malformed employees payload", func(t *testing.T) { + evt := model.HRSyncEvent{ + Timestamp: 1, + Payload: json.RawMessage(`not json`), + } + data, _ := json.Marshal(evt) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) + + t.Run("corrupt gzip header", func(t *testing.T) { + evt := model.HRSyncEvent{ + Timestamp: 1, + Gzip: true, + Payload: []byte("not gzip"), + } + data, _ := json.Marshal(evt) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) +} +``` + +- [ ] **Step 2: Run tests to verify failures** + +Run: `make test SERVICE=search-sync-worker -run TestSpotlightOrg_BuildAction_Gzip 2>&1 | tail -10` +Expected: FAIL — the implementation doesn't yet handle `Gzip=true`. + +- [ ] **Step 3: Add gzip decompression to `BuildAction`** + +In `search-sync-worker/spotlight_org.go`, update the imports: + +```go +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/searchengine" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) +``` + +Replace the body of `BuildAction` between the envelope unmarshal and the employees unmarshal: + +```go + payload := []byte(envelope.Payload) + if envelope.Gzip { + decompressed, err := gunzipBytes(payload) + if err != nil { + return nil, fmt.Errorf("decompress hr-sync payload: %w", err) + } + payload = decompressed + } + + var employees []SpotlightOrgIndex + if err := json.Unmarshal(payload, &employees); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync employees: %w", err) + } +``` + +Append the helper: + +```go +// gunzipBytes returns the gzip-decompressed contents of b. A corrupt +// header or truncated stream returns an error; the caller turns that +// into a NAK so JetStream retries — a transient publisher hiccup +// should not silently drop the batch. +func gunzipBytes(b []byte) ([]byte, error) { + gr, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + defer gr.Close() + return io.ReadAll(gr) +} +``` + +- [ ] **Step 4: Run all spotlight-org tests** + +Run: `make test SERVICE=search-sync-worker -run TestSpotlightOrg 2>&1 | tail -20` +Expected: PASS. + +- [ ] **Step 5: Lint and commit** + +```bash +make lint +git add search-sync-worker/spotlight_org.go search-sync-worker/spotlight_org_test.go +git commit -m "feat(search-sync-worker): spotlight-org gzip + partial-field handling" +``` + +--- + +## Task 14: Wire `DEV_MODE` config + `SPOTLIGHT_ORG_INDEX` + HR_SYNC skip + collection registration + +Final assembly in `main.go` — adds config fields, defaults the index name, skips HR_SYNC bootstrap, and wires the new collection alongside the existing three (now also passing the real `cfg.DevMode`). + +**Files:** +- Modify: `search-sync-worker/main.go` +- Modify: `search-sync-worker/consumer_config_test.go` (if it enumerates collections by name) + +- [ ] **Step 1: Check whether `consumer_config_test.go` needs updating** + +Run: `grep -n "spotlight\|user-room\|message-sync" /home/user/chat/search-sync-worker/consumer_config_test.go` + +If the file enumerates collection consumer names, add `"spotlight-org-sync"` to whatever list it asserts. If it uses a generic fake, no change needed. (This file is a one-off check, not a TDD step — adjust based on what you see.) + +- [ ] **Step 2: Update `search-sync-worker/main.go::config`** + +Add the two new env vars to the `config` struct alongside the existing ones: + +```go +SpotlightOrgIndex string `env:"SPOTLIGHT_ORG_INDEX" envDefault:""` +DevMode bool `env:"DEV_MODE" envDefault:"false"` +``` + +- [ ] **Step 3: Default the index name in `main`** + +After the existing `SpotlightIndex` default (around line 91), add: + +```go +if cfg.SpotlightOrgIndex == "" { + cfg.SpotlightOrgIndex = fmt.Sprintf("spotlightorg-%s", cfg.SiteID) +} +``` + +- [ ] **Step 4: Extend the bootstrap skip list** + +Modify the section around line 176-184 from: + +```go +// INBOX is owned by inbox-worker — see the skip in the loop below. +inboxName := stream.Inbox(cfg.SiteID).Name +``` + +to: + +```go +// INBOX is owned by inbox-worker; HR_SYNC is owned by hr-syncer. +// search-sync-worker is a pure consumer of both and must not create +// their schemas. +inboxName := stream.Inbox(cfg.SiteID).Name +hrSyncName := stream.HRSync(cfg.SiteID).Name +``` + +And modify the condition (around line 184) from: + +```go +if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName { +``` + +to: + +```go +if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName && streamCfg.Name != hrSyncName { +``` + +- [ ] **Step 5: Wire the new collection and pass `cfg.DevMode` to all four** + +Replace the `collections := []Collection{...}` slice (around line 137-141): + +```go +collections := []Collection{ + newMessageCollection(cfg.MsgIndexPrefix, cfg.DevMode), + newSpotlightCollection(cfg.SpotlightIndex, cfg.DevMode), + newSpotlightOrgCollection(cfg.SpotlightOrgIndex, cfg.DevMode), + newUserRoomCollection(cfg.UserRoomIndex, cfg.DevMode), +} +``` + +Update the trailing `slog.Info` (around line 219-225) to include the new index: + +```go +slog.Info("search-sync-worker running", + "site", cfg.SiteID, + "msgPrefix", cfg.MsgIndexPrefix, + "spotlightIndex", cfg.SpotlightIndex, + "spotlightOrgIndex", cfg.SpotlightOrgIndex, + "userRoomIndex", cfg.UserRoomIndex, + "devMode", cfg.DevMode, + "collections", len(collections), +) +``` + +- [ ] **Step 6: Build, lint, run all unit tests** + +Run: `make build SERVICE=search-sync-worker && make lint && make test SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: build succeeds, lint clean, all tests pass. + +- [ ] **Step 7: Commit** + +```bash +git add search-sync-worker/main.go search-sync-worker/consumer_config_test.go +git commit -m "feat(search-sync-worker): wire spotlight-org collection and DEV_MODE" +``` + +--- + +## Task 15: Integration test + +End-to-end test in real testcontainers — publishes a gzipped `HRSyncEvent` to a fresh `HR_SYNC_{siteID}` stream and asserts the resulting ES docs. + +**Files:** +- Modify: `search-sync-worker/integration_test.go` + +- [ ] **Step 1: Read the existing integration test setup** + +Run: `grep -n "func setup\|TestSearchSync\|func Test" /home/user/chat/search-sync-worker/integration_test.go | head -20` + +Identify the existing setup helpers (likely `setupNATS`, `setupES`, or a single composite). Match their style for the new test. + +- [ ] **Step 2: Add the test to `search-sync-worker/integration_test.go`** + +Append to the file (keep the existing `//go:build integration` tag intact at the top): + +```go +// TestSearchSync_SpotlightOrg_Integration drives the new collection end +// to end: publish a gzipped HRSyncEvent to a fresh HR_SYNC stream, let +// the worker run briefly, and verify the resulting documents. +// +// Doc-merge is the key behavior under test: a second event carrying +// only SectID + SectName must NOT clear the other stored fields. +func TestSearchSync_SpotlightOrg_Integration(t *testing.T) { + const siteID = "site-int" + const indexName = "spotlightorg-site-int" + + ctx := context.Background() + + // Reuse the existing setup helpers. Adjust names to match what's + // already in this file. + nc, js := setupNATS(t, ctx) + engine, esURL := setupSearchEngine(t, ctx) + + // Create HR_SYNC stream (the test owns it because hr-syncer isn't + // running; in prod hr-syncer creates it). + hrCfg := stream.HRSync(siteID) + _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: hrCfg.Name, + Subjects: hrCfg.Subjects, + }) + require.NoError(t, err) + + // Upsert the template once before any docs land so the mapping + // applies to the very first index. + coll := newSpotlightOrgCollection(indexName, true) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), coll.TemplateBody())) + + // Build the durable consumer and the handler exactly as main.go + // would, then drive runConsumer in a goroutine bounded by stopCh. + consumerCfg := buildConsumerConfig(stream.DefaultConsumerSettings(), coll, siteID) + cons, err := js.CreateOrUpdateConsumer(ctx, hrCfg.Name, consumerCfg) + require.NoError(t, err) + handler := NewHandler(&engineAdapter{engine: engine}, coll, 500) + stopCh := make(chan struct{}) + doneCh := make(chan struct{}) + go runConsumer(ctx, cons, handler, 100, 500, time.Second, stopCh, doneCh) + t.Cleanup(func() { + close(stopCh) + <-doneCh + }) + + // Publish event #1 with full org info for two sections. + subj := subject.HRSyncEmployeesUpsert(siteID) + publishHRSyncEvent(t, nc, subj, time.Now().UnixMilli(), []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering", DeptID: "D1", DeptName: "Tech"}, + {SectID: "S2", SectName: "Sales", DeptID: "D2", DeptName: "Biz"}, + }) + + // Poll until both docs land. ES is eventually consistent; refresh + // or a short retry loop is needed. + require.Eventually(t, func() bool { + s1 := getESDoc(t, esURL, indexName, "S1") + s2 := getESDoc(t, esURL, indexName, "S2") + return s1 != nil && s2 != nil && + s1["sectName"] == "Engineering" && s2["sectName"] == "Sales" + }, 30*time.Second, 500*time.Millisecond, "first batch did not land") + + // Publish event #2 — partial update for S1 with only SectName changed. + // The doc-merge contract says deptId/deptName must survive. + publishHRSyncEvent(t, nc, subj, time.Now().UnixMilli(), []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering Renamed"}, + }) + + require.Eventually(t, func() bool { + s1 := getESDoc(t, esURL, indexName, "S1") + return s1 != nil && + s1["sectName"] == "Engineering Renamed" && + s1["deptId"] == "D1" && + s1["deptName"] == "Tech" + }, 30*time.Second, 500*time.Millisecond, "doc-merge did not preserve untouched fields") +} + +// publishHRSyncEvent builds a gzipped HRSyncEvent and publishes it to +// the given subject. The test owns the encoding so it can also exercise +// the worker's decompression path. +func publishHRSyncEvent(t *testing.T, nc *nats.Conn, subj string, ts int64, employees []SpotlightOrgIndex) { + t.Helper() + raw, err := json.Marshal(employees) + require.NoError(t, err) + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, err = gz.Write(raw) + require.NoError(t, err) + require.NoError(t, gz.Close()) + envelope := model.HRSyncEvent{ + Timestamp: ts, + BatchID: fmt.Sprintf("test-%d", ts), + Gzip: true, + Payload: buf.Bytes(), + } + data, err := json.Marshal(envelope) + require.NoError(t, err) + require.NoError(t, nc.Publish(subj, data)) +} + +// getESDoc fetches a document by ID via ES HTTP _source endpoint and +// returns the parsed _source. Returns nil when the doc isn't there +// yet (404) so the caller's Eventually polling loop works cleanly. +func getESDoc(t *testing.T, esURL, index, docID string) map[string]any { + t.Helper() + resp, err := http.Get(fmt.Sprintf("%s/%s/_source/%s", esURL, index, docID)) + if err != nil { + return nil + } + defer resp.Body.Close() + if resp.StatusCode == 404 { + return nil + } + if resp.StatusCode != 200 { + return nil + } + var doc map[string]any + if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { + return nil + } + return doc +} +``` + +Add any missing imports to the file's import block: `bytes`, `compress/gzip`, `net/http`, `time`, `github.com/hmchangw/chat/pkg/model`, `github.com/hmchangw/chat/pkg/stream`, `github.com/hmchangw/chat/pkg/subject`. + +**Note**: the helper names `setupNATS`, `setupSearchEngine`, and the type returned by the latter may differ in your file. Match the existing names; the structure of the test is what matters. If `setupSearchEngine` returns only an engine and not a URL, add a second helper that returns the container's mapped URL. + +- [ ] **Step 3: Run the integration test** + +Run: `make test-integration SERVICE=search-sync-worker 2>&1 | tail -30` +Expected: PASS (initial run pulls Docker images — may take 1-2 minutes). + +- [ ] **Step 4: Lint and commit** + +```bash +make lint +git add search-sync-worker/integration_test.go +git commit -m "test(search-sync-worker): integration test for spotlight-org sync" +``` + +--- + +## Task 16: Docker compose + +Local dev now wants `DEV_MODE=true` (collapses every template to 1/0) and an optional `SPOTLIGHT_ORG_INDEX` override. + +**Files:** +- Modify: `search-sync-worker/deploy/docker-compose.yml` + +- [ ] **Step 1: Update the env block** + +Replace the `environment` list in `search-sync-worker/deploy/docker-compose.yml` to add `DEV_MODE`: + +```yaml + environment: + - NATS_URL=nats://nats:4222 + - NATS_CREDS_FILE=/etc/nats/backend.creds + - SITE_ID=site-local + - SEARCH_URL=http://elasticsearch:9200 + - SEARCH_BACKEND=elasticsearch + - MSG_INDEX_PREFIX=messages-site-local-v1 + - BOOTSTRAP_STREAMS=true + - DEV_MODE=${DEV_MODE:-true} + # SPOTLIGHT_ORG_INDEX defaults to spotlightorg-${SITE_ID} when unset. + # Override here if you need a different name for parallel test setups. +``` + +- [ ] **Step 2: Verify YAML parses** + +Run: `docker compose -f search-sync-worker/deploy/docker-compose.yml config 2>&1 | tail -20` +Expected: parses cleanly, shows `DEV_MODE: "true"` in the rendered output. + +- [ ] **Step 3: Commit** + +```bash +git add search-sync-worker/deploy/docker-compose.yml +git commit -m "chore(search-sync-worker): enable DEV_MODE in local docker-compose" +``` + +--- + +## Task 17: Final verification + push + +- [ ] **Step 1: Full unit test pass with race detector** + +Run: `make test 2>&1 | tail -20` +Expected: all packages PASS with `-race`. + +- [ ] **Step 2: Full integration test pass** + +Run: `make test-integration SERVICE=search-sync-worker 2>&1 | tail -20` +Expected: PASS. + +- [ ] **Step 3: Lint clean** + +Run: `make lint 2>&1 | tail -10` +Expected: no issues. + +- [ ] **Step 4: Verify coverage on the new file** + +Run: `cd search-sync-worker && go test -coverprofile=/tmp/cov.out -run TestSpotlightOrg && go tool cover -func=/tmp/cov.out | grep spotlight_org` +Expected: ≥80% per CLAUDE.md, target ≥90%. + +- [ ] **Step 5: Push the branch** + +```bash +git push -u origin claude/spotlight-org-sync-design-fS2qh +``` + +(Per the branch policy: do NOT create a PR unless the user asks.) + +--- + +## Self-review notes + +Spec coverage check: +- §3.1 Stream definition → Task 3 +- §3.2 Envelope → Task 2 +- §3.3 Inner payload + Employee model → Task 1 +- §4.1 Collection scaffold → Task 11 +- §4.2 BuildAction flow → Tasks 12 + 13 +- §4.3 Doc-merge upsert rationale → covered in Task 12 implementation +- §4.4 Fan-out — relies on existing handler.go logic, no new code; the dedup test asserts the per-batch action count +- §5.1 SpotlightOrgIndex struct → Task 11 +- §5.2 Shared analyzer (+ updating existing spotlight) → Tasks 6, 7 +- §5.3 Topology + dev mode → Task 5, applied in Tasks 8-11 +- §6 Config & wiring → Task 14 +- §7 Files touched — every entry mapped to a task +- §8 Testing — unit Tasks 11-13, dev-mode subtests Tasks 8-10, integration Task 15 +- §9 Local dev — Task 16 diff --git a/docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md b/docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md new file mode 100644 index 000000000..5c3a04a4c --- /dev/null +++ b/docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md @@ -0,0 +1,455 @@ +# Spotlight Org Sync Design + +**Status:** Draft +**Date:** 2026-05-11 +**Branch:** `claude/spotlight-org-sync-design-fS2qh` + +## 1. Problem + +Today the org-shape data (sections, departments, divisions) used by the +spotlight-org Elasticsearch index has to be derived from MongoDB change +streams on the `users` and `hr_acct_org` collections. A new `hr-syncer` +service — a daily 8am cronjob that reads HR account data from a MinIO +file — will instead publish JetStream events directly, replacing the +change-stream pipeline. + +This design adds a new collection inside `search-sync-worker` that +consumes those events and writes the spotlight-org index. The events +are batched arrays of employee changes, possibly compressed, possibly +carrying only the subset of fields that changed for an account. The +target index is keyed by `sectId` — many employees collapse to one ES +document. + +## 2. Goals & Non-goals + +**Goals** + +- Add a `spotlight-org` collection to `search-sync-worker` that + consumes `hr.sync.{siteID}.employees.upsert`. +- Maintain an ES index `spotlightorg-{siteID}` where `_id` is the + `sectId` and the document carries the nine org fields listed below. +- Handle partial-field updates correctly: an event that carries only + some org fields must NOT clobber other stored fields. +- Stay consistent with existing `search-sync-worker` patterns + (`Collection` interface, bulk-flush handler, JetStream pull consumer, + externally-owned streams). +- Reuse one project-wide `DEV_MODE` env var to flip every ES template + in this worker to a single-shard / no-replica topology. + +**Non-goals** + +- Publisher side (`hr-syncer`). Mat owns that; this design only nails + down the consumer contract and the on-wire envelope. +- Consuming `hr.sync.{siteID}.users.upsert`. That subject feeds a + different consumer in a different service. The envelope type is + shared so a future consumer can reuse it. +- Cross-batch strict LWW (painless `params.ts > stored` guards). Daily + cron + JetStream retry semantics make doc-merge idempotency sufficient. +- Deleting sections. Subject is upsert-only; delete handling is + deferred to a future `hr.sync.*.delete` subject if it becomes needed. + +## 3. On-the-wire contract + +### 3.1 Stream + +`HR_SYNC_{siteID}` with subjects `hr.sync.{siteID}.>`. Owned by +`hr-syncer` (publisher) — `search-sync-worker` is a pure consumer and +must NOT bootstrap it, the same way it must not bootstrap `INBOX`. + +The site-ID token is in the subject (`hr.sync.{siteID}.employees.upsert`) +so multi-site NATS clusters keep HR traffic per-site. Every other stream +in `pkg/stream/stream.go` follows this convention; we follow it here. + +### 3.2 Envelope + +A new type in `pkg/model/hrsync.go`: + +```go +// HRSyncEvent is the envelope hr-syncer publishes on every hr.sync.* +// subject. Mirrors OutboxEvent: small fixed metadata + opaque payload +// bytes the consumer types per-subject. +type HRSyncEvent struct { + Timestamp int64 `json:"timestamp"` // ms since epoch, set at publish site + BatchID string `json:"batchId"` // UUIDv7 for end-to-end trace of one cron run + Gzip bool `json:"gzip"` // true → Payload is gzip(JSON) + Payload json.RawMessage `json:"payload"` // typed by subject at consume time +} +``` + +Why an envelope rather than raw `[]Employee`: + +- Same envelope serves `users.upsert` and any future `hr.sync.*` subject + — payload type varies per subject, picked at consume time. +- Carries the event-level timestamp used downstream as a tie-breaker if + we ever add stricter LWW. +- Carries a `Gzip` flag so dev tooling can publish uncompressed without + changing the consumer. + +### 3.3 Inner payload + +The `Payload` on `hr.sync.{siteID}.employees.upsert` is a JSON array +of HR account rows. The publisher (`hr-syncer`, owned by Mat) defines +its own internal `Employee` / `Org` types with the full HR field set; +those types live in the publisher's package and are not imported here. + +The consumer does **not** declare a public `pkg/model.Employee`. Doing +so would conflict on merge with the internal repo's existing +`pkg/model/employee.go` (which already defines a fuller `Employee` and +`Org` for that repo's other consumers). Instead, `search-sync-worker` +defines a local projection — `SpotlightOrgIndex` in +`search-sync-worker/spotlight_org.go` — that carries only the nine org +fields it reads, with matching json tags so the wire format is +identical: + +``` +sectId, sectTCName, sectName, sectDescription, +deptId, deptTCName, deptName, deptDescription, +divisionId +``` + +All `string` with `omitempty`. Same struct serves three roles in the +consumer: wire-side row type to unmarshal into, ES doc projection on +write, and source of truth for the ES mapping via +`esPropertiesFromStruct[SpotlightOrgIndex]()`. One source of truth, no +copying between shapes. + +Empty strings serialize to absent JSON — the consumer treats absent +keys as "this field did not change" for doc-merge upsert. + +## 4. Worker-side design + +### 4.1 New `spotlightOrgCollection` + +File: `search-sync-worker/spotlight_org.go`. Implements the existing +`Collection` interface alongside `messageCollection`, +`spotlightCollection`, and `userRoomCollection`. + +```go +func (c *spotlightOrgCollection) StreamConfig(siteID string) jetstream.StreamConfig { + cfg := stream.HRSync(siteID) + return jetstream.StreamConfig{Name: cfg.Name, Subjects: cfg.Subjects} +} + +func (c *spotlightOrgCollection) ConsumerName() string { + return "spotlight-org-sync" +} + +func (c *spotlightOrgCollection) FilterSubjects(siteID string) []string { + return []string{subject.HRSyncEmployeesUpsert(siteID)} +} +``` + +### 4.2 `BuildAction` flow + +1. Unmarshal `data` into `model.HRSyncEvent`. Reject `Timestamp <= 0` + with an error (NAK + redeliver). +2. If `envelope.Gzip`, decompress `envelope.Payload` via + `compress/gzip`. Corrupt gzip → error. +3. Unmarshal the (possibly decompressed) bytes into + `[]SpotlightOrgIndex` — the local projection type defined alongside + the collection. Fields not in the projection are ignored by the + decoder. +4. **Dedup by `SectID`.** Walk the slice and keep the last occurrence + per `SectID` in a `map[string]SpotlightOrgIndex`. Rows with empty + `SectID` are silently skipped (employees not yet assigned to a + section). If the result is empty, return `(nil, nil)` and the + handler will ack the JS message with no ES write. +5. For each unique `sectId`, build the ES `_update` body using the + deduped row directly as `doc`: + + ```go + body := map[string]any{ + "doc": row, // SpotlightOrgIndex + "doc_as_upsert": true, + } + ``` + +6. Emit one `searchengine.BulkAction` per unique `sectId`: + + ```go + searchengine.BulkAction{ + Action: searchengine.ActionUpdate, + Index: c.indexName, + DocID: sectID, // overrides ES _id with sectId — the explicit requirement + Doc: bodyJSON, + // No Version: ActionUpdate must NOT use external versioning. + // handler.go::isBulkItemSuccess depends on this. + } + ``` + +### 4.3 Why doc-merge upsert (no painless script) + +ES `_update` with `doc_as_upsert:true` overwrites only the keys present +in `doc`; absent keys preserve their stored value. Combined with +`omitempty` on the projection struct, an event carrying only +`{SectID, SectName}` produces a body containing only `sectId` and +`sectName`, leaving the other seven stored fields untouched. + +Idempotency on JetStream redelivery is automatic: re-applying a +doc-merge with the same fields is a no-op at the application level, +even though ES may bump `_version`. The handler's existing 404/409 +logic in `isBulkItemSuccess` covers `ActionUpdate` correctly. + +Trade-off accepted: doc-merge cannot *clear* a field (overwrite "Eng" +back to ""). If HR sync ever needs to wipe a field, add an explicit +`clearFields []string` to the envelope or switch the projection to +`*string`. YAGNI until then. + +### 4.4 Fan-out characteristics + +A 10K-employee batch with ~500 unique sections produces 500 ES actions +— a fan-out collection where `ActionCount() > MessageCount()`. The +existing mid-batch flush in `main.go::runConsumer` handles this: +`fetchCount` is clamped to remaining bulk capacity, and a single big +event that pushes the buffer over `BULK_BATCH_SIZE` triggers an +immediate flush. + +## 5. Index template + +### 5.1 Document struct + mapping + +```go +type SpotlightOrgIndex struct { + SectID string `json:"sectId,omitempty" es:"search_as_you_type,custom_analyzer"` + SectTCName string `json:"sectTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectName string `json:"sectName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectDescription string `json:"sectDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptID string `json:"deptId,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptTCName string `json:"deptTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptName string `json:"deptName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptDescription string `json:"deptDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DivisionID string `json:"divisionId,omitempty" es:"search_as_you_type,custom_analyzer"` +} +``` + +The same struct drives THREE roles in the consumer: unmarshal target +for the wire payload, document body on ES write (via `json.Marshal`), +and mapping source for the index template (via +`esPropertiesFromStruct[SpotlightOrgIndex]()`). One source of truth, +no copying between shapes. + +### 5.2 Analysis (shared with `spotlight`) + +The custom analyzer and tokenizer are shared with the existing +`spotlight` (room-typeahead) template, factored into a helper in +`template.go`: + +```go +func customAnalyzerSettings() map[string]any { + return map[string]any{ + "analyzer": map[string]any{ + "custom_analyzer": map[string]any{ + "type": "custom", + "tokenizer": "custom_tokenizer", + "filter": []string{"lowercase"}, + }, + }, + "tokenizer": map[string]any{ + "custom_tokenizer": map[string]any{ + "type": "whitespace", + "token_chars": []string{"letter", "digit", "punctuation", "symbol"}, + }, + }, + } +} +``` + +The existing `spotlight.go` is updated to call `customAnalyzerSettings()` +and gains the `token_chars` setting (verified to be accepted on +target ES 8.11). The now-stale comment in `spotlightTemplateBody` +about `token_chars` being rejected is removed. + +### 5.3 Topology & dev mode + +A shared helper toggles shard/replica counts: + +```go +// indexTopology returns (shards, replicas) for an ES index template. +// In dev mode every template collapses to 1/0 regardless of prod +// values, so a single DEV_MODE toggle gives every index a fast local +// footprint without per-template env vars. +func indexTopology(prodShards, prodReplicas int, devMode bool) (int, int) { + if devMode { + return 1, 0 + } + return prodShards, prodReplicas +} +``` + +Applied to all four templates: + +| Template | Prod | Dev | +|---|---|---| +| `messageTemplateBody` | 4 shards / 2 replicas | 1 / 0 | +| `spotlightTemplateBody` | 3 / 1 | 1 / 0 | +| `spotlightOrgTemplateBody` | 3 / 1 | 1 / 0 | +| `userRoomTemplateBody` | 1 / 1 | 1 / 0 | + +The `refresh_interval:30s` on the messages template stays put — it's a +write-perf knob, not a topology one. + +## 6. Config & wiring + +### 6.1 New env vars on `search-sync-worker/main.go::config` + +```go +SpotlightOrgIndex string `env:"SPOTLIGHT_ORG_INDEX" envDefault:""` +DevMode bool `env:"DEV_MODE" envDefault:"false"` +``` + +`DEV_MODE` reuses the project-wide env var already used by +`auth-service` and `chat-frontend` (the same variable, not a new one +scoped to this worker). + +`SPOTLIGHT_ORG_INDEX` defaults to `spotlightorg-{siteID}` when empty, +mirroring how `SPOTLIGHT_INDEX` defaults. + +### 6.2 Stream bootstrap skip + +`main.go` currently skips `INBOX_{siteID}` when `BOOTSTRAP_STREAMS=true` +because `inbox-worker` owns it. Extend the skip list to also exclude +`HR_SYNC_{siteID}` (owned by `hr-syncer`): + +```go +hrSyncName := stream.HRSync(cfg.SiteID).Name +inboxName := stream.Inbox(cfg.SiteID).Name +// ... +if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName && streamCfg.Name != hrSyncName { + js.CreateOrUpdateStream(ctx, streamCfg) +} +``` + +### 6.3 Collection wiring + +```go +collections := []Collection{ + newMessageCollection(cfg.MsgIndexPrefix, cfg.DevMode), + newSpotlightCollection(cfg.SpotlightIndex, cfg.DevMode), + newSpotlightOrgCollection(cfg.SpotlightOrgIndex, cfg.DevMode), + newUserRoomCollection(cfg.UserRoomIndex, cfg.DevMode), +} +``` + +Every collection constructor accepts a `devMode bool`, stores it on the +struct, and threads it into `TemplateBody()` via the corresponding +`*TemplateBody(indexName, devMode bool)` function signature. + +### 6.4 Consumer config + +`buildConsumerConfig` is unchanged. `spotlightOrgCollection.FilterSubjects` +narrows the consumer to `hr.sync.{siteID}.employees.upsert`. The +existing 1s/5s/30s `BackOff` progression applies — a daily-cron source +benefits from progressive retries on transient ES failures, same as +the other collections. + +## 7. Files touched / created + +| File | Action | Purpose | +|---|---|---| +| `pkg/stream/stream.go` | Edit | Add `HRSync(siteID) Config` | +| `pkg/stream/stream_test.go` | Edit | Test for `HRSync` | +| `pkg/subject/subject.go` | Edit | Add `HRSyncEmployeesUpsert`, `HRSyncUsersUpsert` | +| `pkg/subject/subject_test.go` | Edit | Tests for new subject builders | +| `pkg/model/hrsync.go` | Create | `HRSyncEvent` envelope type | +| `pkg/model/model_test.go` | Edit | Roundtrip test for `HRSyncEvent` | +| `search-sync-worker/spotlight_org.go` | Create | New collection | +| `search-sync-worker/spotlight_org_test.go` | Create | Unit tests for new collection | +| `search-sync-worker/spotlight.go` | Edit | Thread `devMode`, use shared analyzer helper, add `token_chars` | +| `search-sync-worker/spotlight_test.go` | Edit | `devMode=true` subtest | +| `search-sync-worker/messages.go` | Edit | Thread `devMode` into template | +| `search-sync-worker/messages_test.go` | Edit | `devMode=true` subtest | +| `search-sync-worker/user_room.go` | Edit | Thread `devMode` into template | +| `search-sync-worker/user_room_test.go` | Edit | `devMode=true` subtest | +| `search-sync-worker/template.go` | Edit | Add `indexTopology` + `customAnalyzerSettings` helpers | +| `search-sync-worker/main.go` | Edit | New config fields, default index name, skip HR_SYNC bootstrap, wire collection | +| `search-sync-worker/integration_test.go` | Edit | Add `TestSearchSync_SpotlightOrg_Integration` | +| `search-sync-worker/deploy/docker-compose.yml` | Edit | `DEV_MODE=${DEV_MODE:-true}`, optional `SPOTLIGHT_ORG_INDEX` | +| `search-sync-worker/consumer_config_test.go` | Possibly edit | Cover new collection if it asserts by-name | + +## 8. Testing + +### 8.1 Unit tests (`spotlight_org_test.go`) + +Table-driven, mocked store via existing `mock_store_test.go`: + +| Test | Scenario | +|---|---| +| `BuildAction_HappyPath` | Valid envelope, gzip=false, 3 employees / 2 unique sectIds → 2 actions; last-wins on dup sectId verified by marshaled doc. | +| `BuildAction_Gzip` | Same with `Gzip=true` and a gzipped payload; assert decompress + parse path. | +| `BuildAction_PartialFields` | Employee with only `SectID + SectName` → resulting update body's `doc` contains exactly those keys. No empty-string keys for the other seven. | +| `BuildAction_EmptySectID` | Mix of employees, some empty `SectID` → empty ones skipped, non-empty emitted. No error. | +| `BuildAction_AllEmptySectIDs` | All empty `SectID` → returns `(nil, nil)`. Handler acks JS message with no ES write. | +| `BuildAction_DocAsUpsertSet` | Bulk body contains `"doc_as_upsert":true`. | +| `BuildAction_DocIDIsSectID` | `BulkAction.DocID == employee.SectID`. | +| `BuildAction_NoVersionOnUpdate` | `BulkAction.Version == 0` (handler 409 logic depends on this). | +| `BuildAction_InvalidEnvelope` | Malformed JSON, zero timestamp, corrupt gzip → error, NAK + redeliver. | +| `BuildAction_EmptyEmployees` | `payload=[]` → returns `(nil, nil)`. | +| `SpotlightOrgTemplateBody_Prod` | `devMode=false` → shards=3, replicas=1, all nine fields present as `search_as_you_type,custom_analyzer`. | +| `SpotlightOrgTemplateBody_Dev` | `devMode=true` → shards=1, replicas=0. | + +### 8.2 Updates to existing unit tests + +- `messages_test.go`, `spotlight_test.go`, `user_room_test.go`: add a + `devMode=true` subtest to each `*TemplateBody` test asserting 1/0 + topology. Existing assertions remain valid for the default + `devMode=false` path. +- `mock_store_test.go`: untouched (the `Store` interface didn't + change). + +### 8.3 Integration test + +In `integration_test.go` (build tag `//go:build integration`), reuse +the existing `nats` + `searchengine` testcontainers: + +`TestSearchSync_SpotlightOrg_Integration`: +1. Create the `HR_SYNC_{siteID}` stream via `js.CreateOrUpdateStream` + (the test owns it; `hr-syncer` isn't running). +2. Publish a real gzipped `HRSyncEvent` with three employees across + two `sectId`s. +3. Run the worker briefly with `DEV_MODE=true` and + `BOOTSTRAP_STREAMS=true`. +4. Assert: + - ES index `spotlightorg-{siteID}` exists with the expected mapping + (read via `_index_template/spotlight_org_template`). + - Two docs exist, keyed by `sectId`. + - A second envelope carrying only `{SectID, SectName}` for one of + the sectIds preserves the other stored fields (doc-merge worked). + +### 8.4 Coverage + +Per `CLAUDE.md` Section 4: ≥80% required, ≥90% target for new core +code. The matrix above covers happy path, idempotency, fan-out, +partial fields, dedup, malformed input, and template shape. + +## 9. Local dev + +`search-sync-worker/deploy/docker-compose.yml`: + +```yaml +environment: + DEV_MODE: ${DEV_MODE:-true} + BOOTSTRAP_STREAMS: "true" + # SPOTLIGHT_ORG_INDEX defaults to spotlightorg-{siteID} when unset. +``` + +The `HR_SYNC_{siteID}` stream itself is owned by `hr-syncer`'s +compose. To exercise the new collection end-to-end locally without +running `hr-syncer`, the integration test path (step 8.3) demonstrates +how to publish a synthetic event. + +## 10. Open questions / future work + +- **Painless LWW guard.** Current design relies on doc-merge + idempotency + single-publisher cron. If multiple HR sync sources are + ever introduced, add a per-field timestamp guard mirroring + `user-room`'s `roomTimestamps` flattened map. +- **Field-clearing.** Doc-merge can't clear a stored field. If HR data + ever requires explicit clears, add a `clearFields []string` to + `HRSyncEvent` or switch the projection to `*string`. +- **Section deletes.** Subject is upsert-only today. A + `hr.sync.{siteID}.employees.delete` subject would be a separate + filter on the same consumer with a `delete` branch in `BuildAction`. +- **Users collection.** `hr.sync.{siteID}.users.upsert` is out of + scope here. The shared `HRSyncEvent` envelope is intentional so a + future consumer (different service, different MongoDB collection) + can reuse it. diff --git a/pkg/model/hrsync.go b/pkg/model/hrsync.go new file mode 100644 index 000000000..f39422a23 --- /dev/null +++ b/pkg/model/hrsync.go @@ -0,0 +1,18 @@ +package model + +import "encoding/json" + +// HRSyncEvent is the envelope hr-syncer publishes on every hr.sync.* +// subject. Each consumer defines its own local projection struct +// for the payload elements. +// +// When Gzip=true the payload rides as a JSON string of base64(gzip(JSON +// array)) — json.RawMessage must itself be valid JSON, so raw binary +// bytes can't be embedded directly. Decoders unmarshal into []byte +// (which base64-decodes) and then gunzip. +type HRSyncEvent struct { + Timestamp int64 `json:"timestamp" bson:"timestamp"` + BatchID string `json:"batchId" bson:"batchId"` + Gzip bool `json:"gzip" bson:"gzip"` + Payload json.RawMessage `json:"payload" bson:"payload"` +} diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index f06493feb..79970cd31 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -2019,6 +2019,17 @@ func TestSyncCreateDMRequestJSON(t *testing.T) { assert.Equal(t, src, dst) } +func TestHRSyncEventJSON(t *testing.T) { + src := model.HRSyncEvent{ + Timestamp: 1735689600000, + BatchID: "0192a4f7-8c2d-7c9a-abcd-e0123456789f", + Gzip: true, + Payload: json.RawMessage(`[{"sectId":"S001"}]`), + } + var dst model.HRSyncEvent + roundTrip(t, &src, &dst) +} + func TestSyncCreateDMReplyJSON(t *testing.T) { now := time.Date(2026, 5, 7, 12, 0, 0, 0, time.UTC) src := model.SyncCreateDMReply{ diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go index cddd7ed09..f2bf978f8 100644 --- a/pkg/stream/stream.go +++ b/pkg/stream/stream.go @@ -77,3 +77,12 @@ func Inbox(siteID string) Config { }, } } + +// HRSync is the HR_SYNC_{siteID} stream, populated by hr-syncer's daily +// publishes on hr.sync.{siteID}.>. Schema is owned by hr-syncer. +func HRSync(siteID string) Config { + return Config{ + Name: fmt.Sprintf("HR_SYNC_%s", siteID), + Subjects: []string{fmt.Sprintf("hr.sync.%s.>", siteID)}, + } +} diff --git a/pkg/stream/stream_test.go b/pkg/stream/stream_test.go index b52d71e1d..d35b4083d 100644 --- a/pkg/stream/stream_test.go +++ b/pkg/stream/stream_test.go @@ -25,6 +25,7 @@ func TestStreamConfigs(t *testing.T) { {"MessagesCanonical", stream.MessagesCanonical(siteID), "MESSAGES_CANONICAL_site-a", "chat.msg.canonical.site-a.>"}, {"Rooms", stream.Rooms(siteID), "ROOMS_site-a", "chat.room.canonical.site-a.>"}, {"Outbox", stream.Outbox(siteID), "OUTBOX_site-a", "outbox.site-a.>"}, + {"HRSync", stream.HRSync(siteID), "HR_SYNC_site-a", "hr.sync.site-a.>"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/subject/subject.go b/pkg/subject/subject.go index 302b12239..e210a5e67 100644 --- a/pkg/subject/subject.go +++ b/pkg/subject/subject.go @@ -161,6 +161,14 @@ func MsgCanonicalDeleted(siteID string) string { return fmt.Sprintf("chat.msg.canonical.%s.deleted", siteID) } +func HRSyncEmployeesUpsert(siteID string) string { + return fmt.Sprintf("hr.sync.%s.employees.upsert", siteID) +} + +func HRSyncUsersUpsert(siteID string) string { + return fmt.Sprintf("hr.sync.%s.users.upsert", siteID) +} + func RoomEvent(roomID string) string { return fmt.Sprintf("chat.room.%s.event", roomID) } diff --git a/pkg/subject/subject_test.go b/pkg/subject/subject_test.go index ce054713d..172d32488 100644 --- a/pkg/subject/subject_test.go +++ b/pkg/subject/subject_test.go @@ -3,6 +3,8 @@ package subject_test import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/hmchangw/chat/pkg/subject" ) @@ -350,3 +352,13 @@ func TestRoomCreateDMSync(t *testing.T) { t.Errorf("RoomCreateDMSync: got %q, want %q", got, want) } } + +func TestHRSyncEmployeesUpsert(t *testing.T) { + got := subject.HRSyncEmployeesUpsert("site-a") + assert.Equal(t, "hr.sync.site-a.employees.upsert", got) +} + +func TestHRSyncUsersUpsert(t *testing.T) { + got := subject.HRSyncUsersUpsert("site-a") + assert.Equal(t, "hr.sync.site-a.users.upsert", got) +} diff --git a/search-sync-worker/deploy/docker-compose.yml b/search-sync-worker/deploy/docker-compose.yml index f2948e0bc..184704c46 100644 --- a/search-sync-worker/deploy/docker-compose.yml +++ b/search-sync-worker/deploy/docker-compose.yml @@ -13,6 +13,7 @@ services: - SEARCH_BACKEND=elasticsearch - MSG_INDEX_PREFIX=messages-site-local-v1 - BOOTSTRAP_STREAMS=true + - DEV_MODE=${DEV_MODE:-true} volumes: - ../../docker-local/backend.creds:/etc/nats/backend.creds:ro networks: diff --git a/search-sync-worker/handler_test.go b/search-sync-worker/handler_test.go index 2ca98b613..57ff87e89 100644 --- a/search-sync-worker/handler_test.go +++ b/search-sync-worker/handler_test.go @@ -47,7 +47,7 @@ func makeStubMsg(t *testing.T, evt *model.MessageEvent) *stubMsg { func TestHandler_Add(t *testing.T) { ctrl := gomock.NewController(t) store := NewMockStore(ctrl) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) evt := model.MessageEvent{ Event: model.EventCreated, @@ -66,7 +66,7 @@ func TestHandler_Add(t *testing.T) { func TestHandler_Add_MalformedJSON(t *testing.T) { ctrl := gomock.NewController(t) store := NewMockStore(ctrl) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msg := &stubMsg{data: []byte("{invalid")} h.Add(msg) @@ -92,7 +92,7 @@ func TestHandler_Flush(t *testing.T) { Bulk(gomock.Any(), gomock.Len(1)). Return([]searchengine.BulkResult{{Status: 201}}, nil) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msg := makeStubMsg(t, &baseEvt) h.Add(msg) h.Flush(context.Background()) @@ -109,7 +109,7 @@ func TestHandler_Flush(t *testing.T) { Bulk(gomock.Any(), gomock.Len(1)). Return([]searchengine.BulkResult{{Status: 409, Error: "version conflict"}}, nil) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msg := makeStubMsg(t, &baseEvt) h.Add(msg) h.Flush(context.Background()) @@ -125,7 +125,7 @@ func TestHandler_Flush(t *testing.T) { Bulk(gomock.Any(), gomock.Len(1)). Return([]searchengine.BulkResult{{Status: 500, Error: "internal"}}, nil) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msg := makeStubMsg(t, &baseEvt) h.Add(msg) h.Flush(context.Background()) @@ -141,7 +141,7 @@ func TestHandler_Flush(t *testing.T) { Bulk(gomock.Any(), gomock.Len(2)). Return(nil, fmt.Errorf("connection refused")) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msg1 := makeStubMsg(t, &baseEvt) evt2 := baseEvt evt2.Message.ID = "m2" @@ -159,7 +159,7 @@ func TestHandler_Flush(t *testing.T) { t.Run("empty flush is no-op", func(t *testing.T) { ctrl := gomock.NewController(t) store := NewMockStore(ctrl) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) h.Flush(context.Background()) assert.Equal(t, 0, h.MessageCount()) }) @@ -175,7 +175,7 @@ func TestHandler_Flush(t *testing.T) { {Status: 500, Error: "shard failure"}, }, nil) - h := NewHandler(store, newMessageCollection("msgs-v1"), 500) + h := NewHandler(store, newMessageCollection("msgs-v1", false), 500) msgs := make([]*stubMsg, 3) for i := range msgs { evt := baseEvt diff --git a/search-sync-worker/inbox_integration_test.go b/search-sync-worker/inbox_integration_test.go index 73772a515..d841fb0b5 100644 --- a/search-sync-worker/inbox_integration_test.go +++ b/search-sync-worker/inbox_integration_test.go @@ -157,8 +157,8 @@ func TestSpotlightSync_Integration(t *testing.T) { require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) - coll := newSpotlightCollection(indexName) - require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName)))) + coll := newSpotlightCollection(indexName, false) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName, false)))) preCreateIndex(t, esURL, indexName) waitForClusterGreen(t, esURL, 120*time.Second) @@ -249,8 +249,8 @@ func TestSpotlightSync_BulkInvite(t *testing.T) { require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) - coll := newSpotlightCollection(indexName) - require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName)))) + coll := newSpotlightCollection(indexName, false) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName, false)))) preCreateIndex(t, esURL, indexName) waitForClusterGreen(t, esURL, 120*time.Second) @@ -322,8 +322,8 @@ func TestUserRoomSync_Integration(t *testing.T) { require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) - coll := newUserRoomCollection(indexName) - require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName)))) + coll := newUserRoomCollection(indexName, false) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false)))) preCreateIndex(t, esURL, indexName) waitForClusterGreen(t, esURL, 120*time.Second) @@ -444,8 +444,8 @@ func TestUserRoomSync_BulkInvite(t *testing.T) { require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) - coll := newUserRoomCollection(indexName) - require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName)))) + coll := newUserRoomCollection(indexName, false) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false)))) preCreateIndex(t, esURL, indexName) waitForClusterGreen(t, esURL, 120*time.Second) @@ -559,8 +559,8 @@ func TestUserRoomSync_LWWGuard(t *testing.T) { require.NoError(t, err) waitForClusterGreen(t, esURL, 120*time.Second) - coll := newUserRoomCollection(indexName) - require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName)))) + coll := newUserRoomCollection(indexName, false) + require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false)))) preCreateIndex(t, esURL, indexName) waitForClusterGreen(t, esURL, 120*time.Second) diff --git a/search-sync-worker/integration_test.go b/search-sync-worker/integration_test.go index 6abc1677f..6443de343 100644 --- a/search-sync-worker/integration_test.go +++ b/search-sync-worker/integration_test.go @@ -316,8 +316,8 @@ func TestSearchSyncIntegration(t *testing.T) { // Wait for cluster to be green before creating indices. waitForClusterGreen(t, esURL, 120*time.Second) - coll := newMessageCollection(prefix) - err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix))) + coll := newMessageCollection(prefix, false) + err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix, false))) require.NoError(t, err, "upsert template") // Pre-create indices so shard allocation completes before bulk indexing. @@ -501,8 +501,8 @@ func TestCustomAnalyzer(t *testing.T) { waitForClusterGreen(t, esURL, 120*time.Second) - coll := newMessageCollection(prefix) - err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix))) + coll := newMessageCollection(prefix, false) + err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix, false))) require.NoError(t, err, "upsert template") preCreateIndex(t, esURL, prefix+"-2026-03") @@ -613,3 +613,98 @@ func TestCustomAnalyzer(t *testing.T) { "cross-compound should not match") }) } + +// TestSearchSyncSpotlightOrg_Integration drives the spotlight-org collection +// end to end: publish a gzipped HRSyncEvent to a fresh HR_SYNC stream, let +// the handler process the resulting message, and verify the ES documents. +// +// Doc-merge is the key behavior under test: a second event carrying only +// SectID + SectName must NOT clear the other stored fields. +func TestSearchSyncSpotlightOrg_Integration(t *testing.T) { + esURL := setupElasticsearch(t) + js, _ := setupNATSJetStream(t) + ctx := context.Background() + + const siteID = "site-org-int" + const indexName = "spotlightorg-site-org-int" + + engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL}) + require.NoError(t, err, "create search engine") + + waitForClusterGreen(t, esURL, 120*time.Second) + + coll := newSpotlightOrgCollection(indexName, true) + require.NoError(t, + engine.UpsertTemplate(ctx, coll.TemplateName(), coll.TemplateBody()), + "upsert spotlight-org template", + ) + preCreateIndex(t, esURL, indexName) + waitForClusterGreen(t, esURL, 120*time.Second) + + hrCfg := stream.HRSync(siteID) + _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: hrCfg.Name, + Subjects: hrCfg.Subjects, + }) + require.NoError(t, err, "create HR_SYNC stream") + + subj := subject.HRSyncEmployeesUpsert(siteID) + + publish := func(employees []SpotlightOrgIndex, ts int64) { + t.Helper() + _, err := js.Publish(ctx, subj, makeHRSyncEventGzip(t, ts, employees)) + require.NoError(t, err, "publish hr-sync event") + } + + cons, err := js.CreateOrUpdateConsumer(ctx, hrCfg.Name, jetstream.ConsumerConfig{ + Durable: "spotlight-org-sync-test", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubjects: coll.FilterSubjects(siteID), + }) + require.NoError(t, err, "create consumer") + handler := NewHandler(&engineAdapter{engine: engine}, coll, 100) + + publish([]SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering", DeptID: "D1", DeptName: "Tech"}, + {SectID: "S2", SectName: "Sales", DeptID: "D2", DeptName: "Biz"}, + }, time.Now().UnixMilli()) + + batch, err := cons.Fetch(1, jetstream.FetchMaxWait(10*time.Second)) + require.NoError(t, err) + for msg := range batch.Messages() { + handler.Add(msg) + } + handler.Flush(ctx) + refreshIndex(t, esURL, indexName) + + t.Run("two docs landed", func(t *testing.T) { + assert.Equal(t, 2, countDocs(t, esURL, indexName)) + s1 := getDoc(t, esURL, indexName, "S1") + require.NotNil(t, s1) + assert.Equal(t, "Engineering", s1["sectName"]) + assert.Equal(t, "Tech", s1["deptName"]) + s2 := getDoc(t, esURL, indexName, "S2") + require.NotNil(t, s2) + assert.Equal(t, "Sales", s2["sectName"]) + }) + + publish([]SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering Renamed"}, + }, time.Now().UnixMilli()+1) + + batch, err = cons.Fetch(1, jetstream.FetchMaxWait(10*time.Second)) + require.NoError(t, err) + for msg := range batch.Messages() { + handler.Add(msg) + } + handler.Flush(ctx) + refreshIndex(t, esURL, indexName) + + t.Run("doc-merge preserves untouched fields", func(t *testing.T) { + s1 := getDoc(t, esURL, indexName, "S1") + require.NotNil(t, s1) + assert.Equal(t, "Engineering Renamed", s1["sectName"], "renamed field updated") + assert.Equal(t, "D1", s1["deptId"], "deptId preserved") + assert.Equal(t, "Tech", s1["deptName"], "deptName preserved") + }) +} diff --git a/search-sync-worker/main.go b/search-sync-worker/main.go index 2d98a687b..6955479c1 100644 --- a/search-sync-worker/main.go +++ b/search-sync-worker/main.go @@ -50,7 +50,9 @@ type config struct { SearchTLSSkipVerify bool `env:"SEARCH_TLS_SKIP_VERIFY" envDefault:"false"` MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"` SpotlightIndex string `env:"SPOTLIGHT_INDEX" envDefault:""` + SpotlightOrgIndex string `env:"SPOTLIGHT_ORG_INDEX" envDefault:""` UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` + DevMode bool `env:"DEV_MODE" envDefault:"false"` // FetchBatchSize is the maximum number of JetStream messages to pull // per Fetch() round-trip. Smaller values give lower latency per message @@ -91,6 +93,9 @@ func main() { if cfg.SpotlightIndex == "" { cfg.SpotlightIndex = fmt.Sprintf("spotlight-%s-v1-chat", cfg.SiteID) } + if cfg.SpotlightOrgIndex == "" { + cfg.SpotlightOrgIndex = fmt.Sprintf("spotlightorg-%s", cfg.SiteID) + } if cfg.UserRoomIndex == "" { cfg.UserRoomIndex = fmt.Sprintf("user-room-%s", cfg.SiteID) } @@ -135,9 +140,10 @@ func main() { } collections := []Collection{ - newMessageCollection(cfg.MsgIndexPrefix), - newSpotlightCollection(cfg.SpotlightIndex), - newUserRoomCollection(cfg.UserRoomIndex), + newMessageCollection(cfg.MsgIndexPrefix, cfg.DevMode), + newSpotlightCollection(cfg.SpotlightIndex, cfg.DevMode), + newSpotlightOrgCollection(cfg.SpotlightOrgIndex, cfg.DevMode), + newUserRoomCollection(cfg.UserRoomIndex, cfg.DevMode), } for _, coll := range collections { @@ -173,15 +179,15 @@ func main() { // we don't redundantly call CreateOrUpdateStream per collection. createdStreams := make(map[string]struct{}, len(collections)) - // INBOX is owned by inbox-worker — see the skip in the loop below. + // INBOX is owned by inbox-worker; HR_SYNC is owned by hr-syncer. + // search-sync-worker is a pure consumer of both and must not create + // their schemas. inboxName := stream.Inbox(cfg.SiteID).Name + hrSyncName := stream.HRSync(cfg.SiteID).Name for _, coll := range collections { streamCfg := coll.StreamConfig(cfg.SiteID) - // Skip INBOX bootstrap — inbox-worker owns its schema, ops/IaC - // owns its federation. Consumer creation still runs for - // INBOX-based collections (spotlight, user-room). - if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName { + if cfg.Bootstrap.Enabled && streamCfg.Name != inboxName && streamCfg.Name != hrSyncName { if _, alreadyCreated := createdStreams[streamCfg.Name]; !alreadyCreated { if _, err := js.CreateOrUpdateStream(ctx, streamCfg); err != nil { slog.Error("create stream failed", "stream", streamCfg.Name, "error", err) @@ -220,7 +226,9 @@ func main() { "site", cfg.SiteID, "msgPrefix", cfg.MsgIndexPrefix, "spotlightIndex", cfg.SpotlightIndex, + "spotlightOrgIndex", cfg.SpotlightOrgIndex, "userRoomIndex", cfg.UserRoomIndex, + "devMode", cfg.DevMode, "collections", len(collections), ) diff --git a/search-sync-worker/messages.go b/search-sync-worker/messages.go index 600a7b938..e027faa2e 100644 --- a/search-sync-worker/messages.go +++ b/search-sync-worker/messages.go @@ -15,10 +15,11 @@ import ( // messageCollection implements Collection for message search sync. type messageCollection struct { indexPrefix string + devMode bool } -func newMessageCollection(indexPrefix string) *messageCollection { - return &messageCollection{indexPrefix: indexPrefix} +func newMessageCollection(indexPrefix string, devMode bool) *messageCollection { + return &messageCollection{indexPrefix: indexPrefix, devMode: devMode} } func (c *messageCollection) StreamConfig(siteID string) jetstream.StreamConfig { @@ -43,7 +44,7 @@ func (c *messageCollection) TemplateName() string { } func (c *messageCollection) TemplateBody() json.RawMessage { - return messageTemplateBody(c.indexPrefix) + return messageTemplateBody(c.indexPrefix, c.devMode) } func (c *messageCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { @@ -144,14 +145,15 @@ func messageTemplateProperties() map[string]any { return esPropertiesFromStruct[MessageSearchIndex]() } -func messageTemplateBody(prefix string) json.RawMessage { +func messageTemplateBody(prefix string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(4, 2, devMode) tmpl := map[string]any{ "index_patterns": []string{fmt.Sprintf("%s-*", prefix)}, "template": map[string]any{ "settings": map[string]any{ "index": map[string]any{ - "number_of_shards": 4, - "number_of_replicas": 2, + "number_of_shards": shards, + "number_of_replicas": replicas, "refresh_interval": "30s", }, "analysis": map[string]any{ diff --git a/search-sync-worker/messages_test.go b/search-sync-worker/messages_test.go index 9196132a3..232c03041 100644 --- a/search-sync-worker/messages_test.go +++ b/search-sync-worker/messages_test.go @@ -15,12 +15,12 @@ import ( ) func TestMessageCollection_TemplateName(t *testing.T) { - coll := newMessageCollection("messages-site1-v1") + coll := newMessageCollection("messages-site1-v1", false) assert.Equal(t, "messages-site1-v1_template", coll.TemplateName()) } func TestMessageCollection_TemplateBody(t *testing.T) { - coll := newMessageCollection("messages-site1-v1") + coll := newMessageCollection("messages-site1-v1", false) body := coll.TemplateBody() require.NotNil(t, body) @@ -52,13 +52,13 @@ func TestMessageCollection_TemplateBody(t *testing.T) { } func TestMessageCollection_StreamConfig(t *testing.T) { - coll := newMessageCollection("msgs-v1") + coll := newMessageCollection("msgs-v1", false) cfg := coll.StreamConfig("site-a") assert.Equal(t, "MESSAGES_CANONICAL_site-a", cfg.Name) } func TestMessageCollection_ConsumerName(t *testing.T) { - coll := newMessageCollection("msgs-v1") + coll := newMessageCollection("msgs-v1", false) assert.Equal(t, "message-sync", coll.ConsumerName()) } @@ -235,7 +235,7 @@ func TestNewMessageSearchIndex_TShowOmittedWhenFalse(t *testing.T) { } func TestMessageCollection_BuildAction(t *testing.T) { - coll := newMessageCollection("msgs-v1") + coll := newMessageCollection("msgs-v1", false) evt := model.MessageEvent{ Event: model.EventCreated, Message: model.Message{ @@ -258,3 +258,22 @@ func TestMessageCollection_BuildAction(t *testing.T) { assert.Error(t, err) }) } + +func TestMessageTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := messageTemplateBody("messages-site-a-v1", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(4), idx["number_of_shards"]) + assert.Equal(t, float64(2), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := messageTemplateBody("messages-site-a-v1", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +} diff --git a/search-sync-worker/spotlight.go b/search-sync-worker/spotlight.go index d5bcbd38e..1c975a00a 100644 --- a/search-sync-worker/spotlight.go +++ b/search-sync-worker/spotlight.go @@ -17,10 +17,11 @@ import ( type spotlightCollection struct { inboxMemberCollection indexName string + devMode bool } -func newSpotlightCollection(indexName string) *spotlightCollection { - return &spotlightCollection{indexName: indexName} +func newSpotlightCollection(indexName string, devMode bool) *spotlightCollection { + return &spotlightCollection{indexName: indexName, devMode: devMode} } func (c *spotlightCollection) ConsumerName() string { @@ -32,7 +33,7 @@ func (c *spotlightCollection) TemplateName() string { } func (c *spotlightCollection) TemplateBody() json.RawMessage { - return spotlightTemplateBody(c.indexName) + return spotlightTemplateBody(c.indexName, c.devMode) } // BuildAction fans a member_added / member_removed event out into one ES @@ -124,36 +125,21 @@ func newSpotlightSearchIndex(account string, evt *model.InboxMemberEvent) Spotli } // spotlightTemplateBody builds the ES index template for the spotlight -// collection. The `index_patterns` field is set to the exact configured -// index name so a custom SPOTLIGHT_INDEX value still receives the correct -// mapping (no broad wildcard that might catch unrelated indices). -func spotlightTemplateBody(indexName string) json.RawMessage { +// (room-typeahead) collection. Analyzer config is shared with the +// spotlight-org template via customAnalyzerSettings(). The +// `index_patterns` field is set to the exact configured index name so +// a custom SPOTLIGHT_INDEX value still receives the correct mapping. +func spotlightTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(3, 1, devMode) tmpl := map[string]any{ "index_patterns": []string{indexName}, "template": map[string]any{ "settings": map[string]any{ "index": map[string]any{ - "number_of_shards": 3, - "number_of_replicas": 1, - }, - "analysis": map[string]any{ - "analyzer": map[string]any{ - "custom_analyzer": map[string]any{ - "type": "custom", - "tokenizer": "custom_tokenizer", - "filter": []string{"lowercase"}, - }, - }, - "tokenizer": map[string]any{ - // Whitespace tokenizer only supports max_token_length - // (default 255). `token_chars` is valid on ngram / - // edge_ngram tokenizers, not whitespace — sending it - // here would reject the UpsertTemplate request. - "custom_tokenizer": map[string]any{ - "type": "whitespace", - }, - }, + "number_of_shards": shards, + "number_of_replicas": replicas, }, + "analysis": customAnalyzerSettings(), }, "mappings": map[string]any{ "dynamic": false, @@ -161,8 +147,6 @@ func spotlightTemplateBody(indexName string) json.RawMessage { }, }, } - // tmpl is built entirely from map/slice/string/int literals that are - // always JSON-marshalable, so the error cannot occur in practice. data, _ := json.Marshal(tmpl) return data } diff --git a/search-sync-worker/spotlight_org.go b/search-sync-worker/spotlight_org.go new file mode 100644 index 000000000..20b687a7f --- /dev/null +++ b/search-sync-worker/spotlight_org.go @@ -0,0 +1,190 @@ +package main + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/searchengine" + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) + +// spotlightOrgCollection maintains the spotlight-org ES index, one +// document per sectId, sourced from hr-syncer's batched HR account +// publishes. The doc ID is the sectId; many employees collapse to one +// document via dedup in BuildAction. The HR_SYNC stream is owned by +// hr-syncer — this collection is a pure consumer. +type spotlightOrgCollection struct { + indexName string + devMode bool +} + +func newSpotlightOrgCollection(indexName string, devMode bool) *spotlightOrgCollection { + return &spotlightOrgCollection{indexName: indexName, devMode: devMode} +} + +func (c *spotlightOrgCollection) StreamConfig(siteID string) jetstream.StreamConfig { + cfg := stream.HRSync(siteID) + return jetstream.StreamConfig{Name: cfg.Name, Subjects: cfg.Subjects} +} + +func (c *spotlightOrgCollection) ConsumerName() string { + return "spotlight-org-sync" +} + +func (c *spotlightOrgCollection) FilterSubjects(siteID string) []string { + return []string{subject.HRSyncEmployeesUpsert(siteID)} +} + +func (c *spotlightOrgCollection) TemplateName() string { + return "spotlight_org_template" +} + +func (c *spotlightOrgCollection) TemplateBody() json.RawMessage { + return spotlightOrgTemplateBody(c.indexName, c.devMode) +} + +// BuildAction parses an HR sync envelope, dedupes by SectID (last-wins), +// and emits one ES _update per unique sectId with doc_as_upsert:true. +// Doc-merge plus omitempty means partial-field publishes preserve the +// stored values for unset fields. Empty SectID rows and empty batches +// return (nil, nil) so the handler acks with no ES write. +func (c *spotlightOrgCollection) BuildAction(data []byte) ([]searchengine.BulkAction, error) { + var envelope model.HRSyncEvent + if err := json.Unmarshal(data, &envelope); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync envelope: %w", err) + } + if envelope.Timestamp <= 0 { + return nil, fmt.Errorf("build spotlight-org action: missing timestamp") + } + + var employees []SpotlightOrgIndex + if envelope.Gzip { + var compressed []byte + if err := json.Unmarshal(envelope.Payload, &compressed); err != nil { + return nil, fmt.Errorf("decode gzip payload base64: %w", err) + } + decompressed, err := gunzipBytes(compressed) + if err != nil { + return nil, fmt.Errorf("decompress hr-sync payload: %w", err) + } + if err := json.Unmarshal(decompressed, &employees); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync employees: %w", err) + } + } else if err := json.Unmarshal(envelope.Payload, &employees); err != nil { + return nil, fmt.Errorf("unmarshal hr-sync employees: %w", err) + } + if len(employees) == 0 { + return nil, nil + } + + // Empty SectID rows are skipped silently (employees not yet + // assigned to a section). Last write per sectId wins within a batch. + deduped := make(map[string]*SpotlightOrgIndex, len(employees)) + order := make([]string, 0, len(employees)) + for i := range employees { + emp := &employees[i] + if emp.SectID == "" { + continue + } + if _, seen := deduped[emp.SectID]; !seen { + order = append(order, emp.SectID) + } + deduped[emp.SectID] = emp + } + if len(deduped) == 0 { + return nil, nil + } + + actions := make([]searchengine.BulkAction, 0, len(deduped)) + for _, sectID := range order { + body, err := buildSpotlightOrgUpdateBody(deduped[sectID]) + if err != nil { + return nil, err + } + actions = append(actions, searchengine.BulkAction{ + Action: searchengine.ActionUpdate, + Index: c.indexName, + DocID: sectID, + Doc: body, + }) + } + return actions, nil +} + +// buildSpotlightOrgUpdateBody wraps row in {doc, doc_as_upsert:true} +// for ES. +func buildSpotlightOrgUpdateBody(row *SpotlightOrgIndex) (json.RawMessage, error) { + body := map[string]any{ + "doc": row, + "doc_as_upsert": true, + } + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal spotlight-org update body: %w", err) + } + return data, nil +} + +func gunzipBytes(b []byte) ([]byte, error) { + gr, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + // gzip.Reader.Close reports trailing checksum / truncation errors, + // which io.ReadAll can miss on a corrupted stream — surface both. + data, readErr := io.ReadAll(gr) + closeErr := gr.Close() + if readErr != nil { + return nil, readErr + } + if closeErr != nil { + return nil, closeErr + } + return data, nil +} + +// SpotlightOrgIndex is the wire row, ES document body, and ES mapping +// source for the spotlight-org index. omitempty drives the partial-update +// contract: absent fields don't appear in the ES doc and doc-merge +// preserves the stored value. Fields not declared here are ignored when +// unmarshaling, so hr-syncer may publish additional fields. +type SpotlightOrgIndex struct { + SectID string `json:"sectId,omitempty" es:"search_as_you_type,custom_analyzer"` + SectTCName string `json:"sectTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectName string `json:"sectName,omitempty" es:"search_as_you_type,custom_analyzer"` + SectDescription string `json:"sectDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptID string `json:"deptId,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptTCName string `json:"deptTCName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptName string `json:"deptName,omitempty" es:"search_as_you_type,custom_analyzer"` + DeptDescription string `json:"deptDescription,omitempty" es:"search_as_you_type,custom_analyzer"` + DivisionID string `json:"divisionId,omitempty" es:"search_as_you_type,custom_analyzer"` +} + +func spotlightOrgTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(3, 1, devMode) + tmpl := map[string]any{ + "index_patterns": []string{indexName}, + "template": map[string]any{ + "settings": map[string]any{ + "index": map[string]any{ + "number_of_shards": shards, + "number_of_replicas": replicas, + }, + "analysis": customAnalyzerSettings(), + }, + "mappings": map[string]any{ + "dynamic": false, + "properties": esPropertiesFromStruct[SpotlightOrgIndex](), + }, + }, + } + data, _ := json.Marshal(tmpl) + return data +} diff --git a/search-sync-worker/spotlight_org_test.go b/search-sync-worker/spotlight_org_test.go new file mode 100644 index 000000000..6413653d1 --- /dev/null +++ b/search-sync-worker/spotlight_org_test.go @@ -0,0 +1,326 @@ +package main + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "reflect" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/searchengine" +) + +func TestSpotlightOrgCollection_Metadata(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + + assert.Equal(t, "spotlight-org-sync", coll.ConsumerName()) + assert.Equal(t, "spotlight_org_template", coll.TemplateName()) + + cfg := coll.StreamConfig("site-a") + assert.Equal(t, "HR_SYNC_site-a", cfg.Name) + assert.Equal(t, []string{"hr.sync.site-a.>"}, cfg.Subjects) + assert.Empty(t, cfg.Sources) + + filters := coll.FilterSubjects("site-a") + assert.Equal(t, []string{"hr.sync.site-a.employees.upsert"}, filters) +} + +func TestSpotlightOrgTemplateBody_Prod(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + body := coll.TemplateBody() + require.NotNil(t, body) + + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + + patterns := parsed["index_patterns"].([]any) + assert.Equal(t, "spotlightorg-site-a", patterns[0]) + + tmpl := parsed["template"].(map[string]any) + idx := tmpl["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(3), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + + mappings := tmpl["mappings"].(map[string]any) + assert.Equal(t, false, mappings["dynamic"]) + props := mappings["properties"].(map[string]any) + for _, f := range []string{ + "sectId", "sectTCName", "sectName", "sectDescription", + "deptId", "deptTCName", "deptName", "deptDescription", "divisionId", + } { + field, ok := props[f].(map[string]any) + require.True(t, ok, "missing property: %s", f) + assert.Equal(t, "search_as_you_type", field["type"], "field %s wrong type", f) + assert.Equal(t, "custom_analyzer", field["analyzer"], "field %s wrong analyzer", f) + } +} + +func TestSpotlightOrgTemplateBody_Dev(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", true) + body := coll.TemplateBody() + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) +} + +// TestSpotlightOrgTemplateProperties_MatchesStruct guards against +// silent drift between SpotlightOrgIndex (the wire row + ES doc +// projection + mapping source) and the properties block written +// into the ES template. If a future edit adds an `es`-tagged field +// to the struct or forgets to add one, this test fails. Mirrors +// TestSpotlightTemplateProperties_MatchesStruct in spotlight_test.go. +func TestSpotlightOrgTemplateProperties_MatchesStruct(t *testing.T) { + props := esPropertiesFromStruct[SpotlightOrgIndex]() + + typ := reflect.TypeOf(SpotlightOrgIndex{}) + esFieldCount := 0 + for i := range typ.NumField() { + field := typ.Field(i) + esTag := field.Tag.Get("es") + if esTag == "" || esTag == "-" { + continue + } + esFieldCount++ + jsonTag := field.Tag.Get("json") + name, _, _ := strings.Cut(jsonTag, ",") + _, ok := props[name] + assert.True(t, ok, "template missing property for field %s (json %s)", field.Name, name) + } + assert.Equal(t, esFieldCount, len(props)) + assert.Equal(t, 9, esFieldCount, "SpotlightOrgIndex should expose exactly 9 ES-mapped fields") +} + +func makeHRSyncEvent(t *testing.T, ts int64, employees []SpotlightOrgIndex) []byte { + t.Helper() + payload, err := json.Marshal(employees) + require.NoError(t, err) + evt := model.HRSyncEvent{ + Timestamp: ts, + BatchID: "b-1", + Gzip: false, + Payload: payload, + } + data, err := json.Marshal(evt) + require.NoError(t, err) + return data +} + +func TestSpotlightOrg_BuildAction_HappyPath(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1735689600000, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Eng", DeptID: "D1", DeptName: "Tech"}, + {SectID: "S2", SectName: "Sales", DeptID: "D2", DeptName: "Biz"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 2) + + docIDs := map[string]bool{} + for _, a := range actions { + assert.Equal(t, searchengine.ActionUpdate, a.Action) + assert.Equal(t, "spotlightorg-site-a", a.Index) + assert.Equal(t, int64(0), a.Version, "ActionUpdate must not use external versioning") + docIDs[a.DocID] = true + + var body map[string]any + require.NoError(t, json.Unmarshal(a.Doc, &body)) + assert.Equal(t, true, body["doc_as_upsert"]) + assert.Contains(t, body, "doc") + } + assert.True(t, docIDs["S1"]) + assert.True(t, docIDs["S2"]) +} + +// TestSpotlightOrg_BuildAction_DedupBySectID covers many-employees-one-section: +// multiple employees sharing the same sectId must collapse to a single ES +// action. The kept row is the LAST occurrence so the most recent in-batch +// update wins. +func TestSpotlightOrg_BuildAction_DedupBySectID(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + {SectID: "S1", SectName: "Engineering Renamed"}, + {SectID: "S2", SectName: "Sales"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 2) + + var s1Body map[string]any + for _, a := range actions { + if a.DocID == "S1" { + require.NoError(t, json.Unmarshal(a.Doc, &s1Body)) + } + } + require.NotNil(t, s1Body) + doc := s1Body["doc"].(map[string]any) + assert.Equal(t, "Engineering Renamed", doc["sectName"], "last-wins on dedup") +} + +func TestSpotlightOrg_BuildAction_EmptySectIDsSkipped(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "", SectName: "no-section"}, + {SectID: "S1", SectName: "Eng"}, + {SectID: "", DeptID: "D9"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + assert.Equal(t, "S1", actions[0].DocID) +} + +func TestSpotlightOrg_BuildAction_AllEmptySectIDs(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectName: "no-section-1"}, + {SectName: "no-section-2"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + assert.Nil(t, actions, "all empty sectIds → zero actions, handler acks JS msg without ES write") +} + +func TestSpotlightOrg_BuildAction_EmptyEmployees(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{}) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + assert.Nil(t, actions) +} + +func makeHRSyncEventGzip(t *testing.T, ts int64, employees []SpotlightOrgIndex) []byte { + t.Helper() + raw, err := json.Marshal(employees) + require.NoError(t, err) + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + _, err = gz.Write(raw) + require.NoError(t, err) + require.NoError(t, gz.Close()) + // Gzip payloads are carried as base64-encoded JSON strings so they + // survive JSON marshaling of the envelope. json.Marshal([]byte) + // automatically base64-encodes, matching what BuildAction expects. + encodedPayload, err := json.Marshal(buf.Bytes()) + require.NoError(t, err) + evt := model.HRSyncEvent{ + Timestamp: ts, + BatchID: "b-1", + Gzip: true, + Payload: json.RawMessage(encodedPayload), + } + data, err := json.Marshal(evt) + require.NoError(t, err) + return data +} + +func TestSpotlightOrg_BuildAction_Gzip(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEventGzip(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + assert.Equal(t, "S1", actions[0].DocID) + + var body map[string]any + require.NoError(t, json.Unmarshal(actions[0].Doc, &body)) + doc := body["doc"].(map[string]any) + assert.Equal(t, "Engineering", doc["sectName"]) +} + +// TestSpotlightOrg_BuildAction_PartialFields locks in the partial-update +// contract: a row carrying only SectID + SectName must produce an ES `doc` +// body containing ONLY those two keys. Other org fields must be absent so +// doc-merge preserves their stored values. +func TestSpotlightOrg_BuildAction_PartialFields(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + data := makeHRSyncEvent(t, 1, []SpotlightOrgIndex{ + {SectID: "S1", SectName: "Engineering"}, + }) + + actions, err := coll.BuildAction(data) + require.NoError(t, err) + require.Len(t, actions, 1) + + var body map[string]any + require.NoError(t, json.Unmarshal(actions[0].Doc, &body)) + doc := body["doc"].(map[string]any) + + assert.Equal(t, "S1", doc["sectId"]) + assert.Equal(t, "Engineering", doc["sectName"]) + for _, absent := range []string{ + "sectTCName", "sectDescription", + "deptId", "deptTCName", "deptName", "deptDescription", "divisionId", + } { + _, present := doc[absent] + assert.False(t, present, "doc must not carry %s when input did not set it", absent) + } +} + +func TestSpotlightOrg_BuildAction_Errors(t *testing.T) { + coll := newSpotlightOrgCollection("spotlightorg-site-a", false) + + t.Run("malformed envelope", func(t *testing.T) { + _, err := coll.BuildAction([]byte("{invalid")) + assert.Error(t, err) + }) + + t.Run("missing timestamp", func(t *testing.T) { + data := makeHRSyncEvent(t, 0, []SpotlightOrgIndex{{SectID: "S1"}}) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) + + t.Run("malformed employees payload", func(t *testing.T) { + evt := model.HRSyncEvent{ + Timestamp: 1, + Payload: json.RawMessage(`not json`), + } + data, _ := json.Marshal(evt) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) + + t.Run("corrupt gzip header", func(t *testing.T) { + // Encode "not gzip" bytes as a base64 JSON string so the envelope + // marshals cleanly. BuildAction will decode it, then fail when + // gzip.NewReader sees no valid gzip magic bytes. + corruptGzip, _ := json.Marshal([]byte("not gzip")) + evt := model.HRSyncEvent{ + Timestamp: 1, + Gzip: true, + Payload: json.RawMessage(corruptGzip), + } + data, _ := json.Marshal(evt) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) + + t.Run("gzip payload not a base64 string", func(t *testing.T) { + // Payload must be a JSON string (base64) when Gzip=true. + // If it's a JSON number or object, the base64 decode step fails. + evt := model.HRSyncEvent{ + Timestamp: 1, + Gzip: true, + Payload: json.RawMessage(`123`), + } + data, _ := json.Marshal(evt) + _, err := coll.BuildAction(data) + assert.Error(t, err) + }) +} diff --git a/search-sync-worker/spotlight_test.go b/search-sync-worker/spotlight_test.go index 8c9722e1d..f7aa4b33c 100644 --- a/search-sync-worker/spotlight_test.go +++ b/search-sync-worker/spotlight_test.go @@ -46,7 +46,7 @@ func baseInboxMemberEvent() *model.InboxMemberEvent { } func TestSpotlightCollection_Metadata(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) assert.Equal(t, "spotlight-sync", coll.ConsumerName()) assert.Equal(t, "spotlight_template", coll.TemplateName()) @@ -69,7 +69,7 @@ func TestSpotlightCollection_Metadata(t *testing.T) { } func TestSpotlightCollection_TemplateBody(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) body := coll.TemplateBody() require.NotNil(t, body) @@ -115,7 +115,7 @@ func TestSpotlightTemplateProperties_MatchesStruct(t *testing.T) { } func TestSpotlightCollection_BuildAction_MemberAdded(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) payload := baseInboxMemberEvent() data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, 1000) @@ -142,7 +142,7 @@ func TestSpotlightCollection_BuildAction_MemberAdded(t *testing.T) { } func TestSpotlightCollection_BuildAction_MemberRemoved(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) payload := baseInboxMemberEvent() data := makeInboxMemberEvent(t, model.OutboxMemberRemoved, payload, 2000) @@ -161,7 +161,7 @@ func TestSpotlightCollection_BuildAction_MemberRemoved(t *testing.T) { func TestSpotlightCollection_BuildAction_RestrictedRoomIndexedLikeAnyOther(t *testing.T) { // See spotlightCollection.BuildAction docstring for the room-name // vs message-content access boundary. - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob"} hss := int64(1735689500000) @@ -196,7 +196,7 @@ func TestSpotlightCollection_BuildAction_RestrictedRoomIndexedLikeAnyOther(t *te } func TestSpotlightCollection_BuildAction_Errors(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) t.Run("malformed outbox event", func(t *testing.T) { _, err := coll.BuildAction([]byte("{invalid")) @@ -255,7 +255,7 @@ func TestSpotlightCollection_BuildAction_Errors(t *testing.T) { // event carrying N accounts produces N index actions, all sharing the same // external Version (event timestamp). func TestSpotlightCollection_BuildAction_BulkInvite(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob", "carol"} data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, 12345) @@ -279,10 +279,28 @@ func TestSpotlightCollection_BuildAction_BulkInvite(t *testing.T) { } } +// TestSpotlightTemplateBody_HasTokenChars locks in that the spotlight +// template adopts the shared analyzer (which carries token_chars). +// Before this change the spotlight tokenizer had no token_chars set. +func TestSpotlightTemplateBody_HasTokenChars(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + + tmpl := parsed["template"].(map[string]any) + settings := tmpl["settings"].(map[string]any) + analysis := settings["analysis"].(map[string]any) + tokenizer := analysis["tokenizer"].(map[string]any) + custom := tokenizer["custom_tokenizer"].(map[string]any) + tokenChars, ok := custom["token_chars"].([]any) + require.True(t, ok, "custom_tokenizer must declare token_chars after the refactor") + assert.ElementsMatch(t, []any{"letter", "digit", "punctuation", "symbol"}, tokenChars) +} + // TestSpotlightCollection_BuildAction_BulkRemove verifies fan-out on remove: // N accounts → N delete actions. func TestSpotlightCollection_BuildAction_BulkRemove(t *testing.T) { - coll := newSpotlightCollection("spotlight-site-a-v1-chat") + coll := newSpotlightCollection("spotlight-site-a-v1-chat", false) payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob"} data := makeInboxMemberEvent(t, model.OutboxMemberRemoved, payload, 67890) @@ -297,3 +315,22 @@ func TestSpotlightCollection_BuildAction_BulkRemove(t *testing.T) { assert.Nil(t, action.Doc) } } + +func TestSpotlightTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(3), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := spotlightTemplateBody("spotlight-site-a-v1-chat", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +} diff --git a/search-sync-worker/template.go b/search-sync-worker/template.go index 5165511e7..dd41774ec 100644 --- a/search-sync-worker/template.go +++ b/search-sync-worker/template.go @@ -41,3 +41,33 @@ func esPropertiesFromStruct[T any]() map[string]any { } return props } + +// customAnalyzerSettings is the analysis block shared by the spotlight +// and spotlight-org templates: a whitespace tokenizer with permissive +// token_chars feeding a lowercase-folding analyzer. +func customAnalyzerSettings() map[string]any { + return map[string]any{ + "analyzer": map[string]any{ + "custom_analyzer": map[string]any{ + "type": "custom", + "tokenizer": "custom_tokenizer", + "filter": []string{"lowercase"}, + }, + }, + "tokenizer": map[string]any{ + "custom_tokenizer": map[string]any{ + "type": "whitespace", + "token_chars": []string{"letter", "digit", "punctuation", "symbol"}, + }, + }, + } +} + +// indexTopology returns (shards, replicas) for an ES index template. +// In dev mode every template collapses to 1/0 regardless of prod values. +func indexTopology(prodShards, prodReplicas int, devMode bool) (int, int) { + if devMode { + return 1, 0 + } + return prodShards, prodReplicas +} diff --git a/search-sync-worker/template_test.go b/search-sync-worker/template_test.go new file mode 100644 index 000000000..277a7b8d5 --- /dev/null +++ b/search-sync-worker/template_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIndexTopology_Prod(t *testing.T) { + shards, replicas := indexTopology(4, 2, false) + assert.Equal(t, 4, shards) + assert.Equal(t, 2, replicas) +} + +func TestIndexTopology_Dev(t *testing.T) { + // Dev collapses every input to 1/0 regardless of prod values. + shards, replicas := indexTopology(4, 2, true) + assert.Equal(t, 1, shards) + assert.Equal(t, 0, replicas) +} + +func TestCustomAnalyzerSettings_Shape(t *testing.T) { + got := customAnalyzerSettings() + + analyzer := got["analyzer"].(map[string]any) + custom := analyzer["custom_analyzer"].(map[string]any) + assert.Equal(t, "custom", custom["type"]) + assert.Equal(t, "custom_tokenizer", custom["tokenizer"]) + assert.Equal(t, []string{"lowercase"}, custom["filter"]) + + tokenizer := got["tokenizer"].(map[string]any) + tok := tokenizer["custom_tokenizer"].(map[string]any) + assert.Equal(t, "whitespace", tok["type"]) + assert.Equal(t, []string{"letter", "digit", "punctuation", "symbol"}, tok["token_chars"]) +} diff --git a/search-sync-worker/user_room.go b/search-sync-worker/user_room.go index aa16da4e5..c691b5de3 100644 --- a/search-sync-worker/user_room.go +++ b/search-sync-worker/user_room.go @@ -26,10 +26,11 @@ import ( type userRoomCollection struct { inboxMemberCollection indexName string + devMode bool } -func newUserRoomCollection(indexName string) *userRoomCollection { - return &userRoomCollection{indexName: indexName} +func newUserRoomCollection(indexName string, devMode bool) *userRoomCollection { + return &userRoomCollection{indexName: indexName, devMode: devMode} } func (c *userRoomCollection) ConsumerName() string { @@ -41,7 +42,7 @@ func (c *userRoomCollection) TemplateName() string { } func (c *userRoomCollection) TemplateBody() json.RawMessage { - return userRoomTemplateBody(c.indexName) + return userRoomTemplateBody(c.indexName, c.devMode) } // addRoomScript / removeRoomScript implement application-level last-write-wins @@ -233,14 +234,15 @@ func buildRemoveRoomUpdateBody(roomID string, ts int64) (json.RawMessage, error) // index name so a custom USER_ROOM_INDEX value still receives the correct // mapping. The `roomTimestamps` field is mapped as `flattened` so new // roomIds don't balloon the mapping with per-key dynamic sub-fields. -func userRoomTemplateBody(indexName string) json.RawMessage { +func userRoomTemplateBody(indexName string, devMode bool) json.RawMessage { + shards, replicas := indexTopology(1, 1, devMode) tmpl := map[string]any{ "index_patterns": []string{indexName}, "template": map[string]any{ "settings": map[string]any{ "index": map[string]any{ - "number_of_shards": 1, - "number_of_replicas": 1, + "number_of_shards": shards, + "number_of_replicas": replicas, }, }, "mappings": map[string]any{ diff --git a/search-sync-worker/user_room_test.go b/search-sync-worker/user_room_test.go index a3c18cb68..e364ff0bf 100644 --- a/search-sync-worker/user_room_test.go +++ b/search-sync-worker/user_room_test.go @@ -12,7 +12,7 @@ import ( ) func TestUserRoomCollection_Metadata(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) assert.Equal(t, "user-room-sync", coll.ConsumerName()) assert.Equal(t, "user_room_template", coll.TemplateName()) @@ -36,7 +36,7 @@ func TestUserRoomCollection_Metadata(t *testing.T) { } func TestUserRoomCollection_TemplateBody(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) body := coll.TemplateBody() require.NotNil(t, body) @@ -69,7 +69,7 @@ func TestUserRoomCollection_TemplateBody(t *testing.T) { } func TestUserRoomCollection_BuildAction_MemberAdded(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() const ts int64 = 1735689600000 data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, ts) @@ -124,7 +124,7 @@ func TestUserRoomCollection_BuildAction_MemberAdded(t *testing.T) { } func TestUserRoomCollection_BuildAction_MemberAdded_Restricted(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() const ts int64 = 1735689700000 const hssVal int64 = 1735689500000 @@ -162,7 +162,7 @@ func TestUserRoomCollection_BuildAction_MemberAdded_Restricted(t *testing.T) { } func TestUserRoomCollection_BuildAction_MemberRemoved(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() const ts int64 = 1735689700000 data := makeInboxMemberEvent(t, model.OutboxMemberRemoved, payload, ts) @@ -202,7 +202,7 @@ func TestUserRoomCollection_BuildAction_MemberRemoved(t *testing.T) { // seeded with `restrictedRooms[rid] = hss` and an empty `rooms[]`. All // actions share the same HSS (event-level field). func TestUserRoomCollection_BuildAction_BulkMixed_AllRestricted(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob", "carol"} const hssVal int64 = 1735689500000 @@ -232,7 +232,7 @@ func TestUserRoomCollection_BuildAction_BulkMixed_AllRestricted(t *testing.T) { // remove body touches both rooms[] and restrictedRooms{} so a member_removed // event works regardless of which slot currently holds the rid. func TestUserRoomCollection_BuildAction_RemoveScriptEvictsBoth(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() data := makeInboxMemberEvent(t, model.OutboxMemberRemoved, payload, 200) @@ -253,7 +253,7 @@ func TestUserRoomCollection_BuildAction_RemoveScriptEvictsBoth(t *testing.T) { // with N accounts produces N distinct user-room update actions (each keyed // by a different account). func TestUserRoomCollection_BuildAction_BulkInvite(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) payload := baseInboxMemberEvent() payload.Accounts = []string{"alice", "bob", "carol"} data := makeInboxMemberEvent(t, model.OutboxMemberAdded, payload, 12345) @@ -275,7 +275,7 @@ func TestUserRoomCollection_BuildAction_BulkInvite(t *testing.T) { } func TestUserRoomCollection_BuildAction_Errors(t *testing.T) { - coll := newUserRoomCollection("user-room-site-a") + coll := newUserRoomCollection("user-room-site-a", false) t.Run("malformed outbox event", func(t *testing.T) { _, err := coll.BuildAction([]byte("{invalid")) @@ -324,3 +324,22 @@ func TestUserRoomCollection_BuildAction_Errors(t *testing.T) { assert.Error(t, err) }) } + +func TestUserRoomTemplateBody_DevMode(t *testing.T) { + t.Run("prod", func(t *testing.T) { + body := userRoomTemplateBody("user-room-site-a", false) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(1), idx["number_of_replicas"]) + }) + t.Run("dev", func(t *testing.T) { + body := userRoomTemplateBody("user-room-site-a", true) + var parsed map[string]any + require.NoError(t, json.Unmarshal(body, &parsed)) + idx := parsed["template"].(map[string]any)["settings"].(map[string]any)["index"].(map[string]any) + assert.Equal(t, float64(1), idx["number_of_shards"]) + assert.Equal(t, float64(0), idx["number_of_replicas"]) + }) +}