feat: spotlight-org sync from hr-syncer events #170
Brainstormed design for a new spotlight-org collection in
search-sync-worker that consumes hr.sync.{siteID}.employees.upsert
events from hr-syncer (replacing the MongoDB change-stream pipeline)
and maintains the spotlightorg-{siteID} ES index keyed by sectId
via doc-merge upserts.
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
Bite-sized TDD plan for the spotlight-org sync design committed in 6ca5f07. 17 tasks covering: Employee/HRSyncEvent models, HR_SYNC stream + subjects, shared template helpers (indexTopology + customAnalyzerSettings), DEV_MODE threading across all four collections, the new spotlight-org collection (metadata + BuildAction with dedup + gzip + partial-field merge), main.go wiring, integration test, and docker-compose. https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
Add the shared building blocks for hr-syncer's daily HR account
synchronization:
- model.HRSyncEvent: envelope on every hr.sync.* subject (timestamp,
batchId, gzip flag, opaque payload). Gzip=true means Payload is a
JSON string carrying base64(gzip(JSON array)); Gzip=false means
Payload is the JSON array embedded verbatim. Consumers type the
payload per-subject via their own local projection structs — no
pkg/model.Employee is introduced here so the wire stays compatible
with the internal repo's existing Employee/Org types.
- stream.HRSync(siteID): HR_SYNC_{siteID} stream definition. Schema
owned by hr-syncer; consumers like search-sync-worker skip it in
their bootstrap loop the same way they skip INBOX.
- subject.HRSyncEmployeesUpsert / subject.HRSyncUsersUpsert: subject
builders. The users subject is reserved for a separate consumer.
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
…pers
A single DEV_MODE env var now collapses every ES index template to 1 shard / 0 replicas for local dev; prod values per collection are unchanged.
- template.go: new indexTopology(prodShards, prodReplicas, devMode) helper centralizes the toggle, and customAnalyzerSettings() factors out the analysis block shared by spotlight and (in the next commit) spotlight-org. The custom_tokenizer now declares token_chars: [letter, digit, punctuation, symbol], verified to be accepted by ES 8.11 against the stale comment that claimed otherwise.
- messageCollection: threads devMode through; messageTemplateBody becomes (prefix, devMode); prod stays 4 shards / 2 replicas.
- spotlightCollection: same shape; prod stays 3 / 1; analyzer block now routes through customAnalyzerSettings().
- userRoomCollection: same shape; prod stays 1 / 1.
All call sites in tests (handler_test, inbox_integration_test, spotlight_test, etc.) updated to the new signatures.
main.go is left passing false placeholders here; it gets cfg.DevMode wired in the next commit alongside the spotlight-org registration.
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
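As an illustration of the toggle described in this commit, a helper with the stated parameter list could look like the sketch below. The return shape (two ints) and the templateSettings wrapper are assumptions; the real template.go may structure this differently.

```go
package main

// indexTopology returns the shard and replica counts for an index template.
// The parameter list follows the commit message; returning two ints is an
// assumption of this sketch.
func indexTopology(prodShards, prodReplicas int, devMode bool) (shards, replicas int) {
	if devMode {
		// Local dev: a single shard with no replicas.
		return 1, 0
	}
	return prodShards, prodReplicas
}

// templateSettings is a hypothetical wrapper showing where the values would
// sit in an Elasticsearch index template's settings block.
func templateSettings(prodShards, prodReplicas int, devMode bool) map[string]any {
	shards, replicas := indexTopology(prodShards, prodReplicas, devMode)
	return map[string]any{
		"number_of_shards":   shards,
		"number_of_replicas": replicas,
	}
}
```

With this shape each collection keeps its own prod values at the call site (for example 4 / 2 for messages) and only the dev override is centralized.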
Maintains a per-section ES index (spotlightorg-{siteID}) keyed by
sectId, sourced from hr-syncer's daily batch publishes on
hr.sync.{siteID}.employees.upsert. Replaces the previous MongoDB
change-stream pipeline.
Design highlights:
- SpotlightOrgIndex is one struct serving three roles: wire-side row
unmarshal target, ES doc body on write, and source of truth for the
ES mapping via esPropertiesFromStruct. Nine string fields with
omitempty and search_as_you_type / custom_analyzer ES mapping.
- BuildAction parses the HRSyncEvent envelope, optionally
base64-decodes + gunzips a compressed payload, unmarshals
[]SpotlightOrgIndex, dedupes by SectID keeping last-wins per
batch, and emits one ES _update per unique sectId with
doc_as_upsert:true. Doc-merge means partial-field events from
hr-syncer preserve untouched stored fields — no painless script
needed. Empty SectID rows and all-empty batches return (nil, nil)
so the handler acks with no ES write.
- ActionUpdate is used WITHOUT external versioning; handler.go's 409
logic for ActionUpdate depends on Version=0.
- Stream HR_SYNC_{siteID} is owned by hr-syncer; this worker is a
pure consumer and main.go's bootstrap loop skips it the same way
it skips INBOX.
- DEV_MODE=true collapses the new index to 1 shard / 0 replicas; prod
uses 3 / 1.
main.go: register newSpotlightOrgCollection, default
SPOTLIGHT_ORG_INDEX to spotlightorg-{siteID}, plumb cfg.DevMode into
all four collections, and add HR_SYNC to the bootstrap skip list.
Integration test exercises gzip publish, doc upsert, and doc-merge
field preservation via the existing testcontainers harness.
docker-compose.yml: DEV_MODE=${DEV_MODE:-true} for local dev.
The design and plan docs reflect a late-stage redesign: an earlier
draft introduced a pkg/model.Employee struct, but that would conflict
on merge with the internal repo's existing Employee/Org types. The
consumer-side projection now lives in this collection only.
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
Three small improvements surfaced by /simplify:
- Drop the dedup struct copy by switching the BuildAction map from
map[string]SpotlightOrgIndex to map[string]*SpotlightOrgIndex. On
high-cardinality batches (e.g., 500 employees in one section) this
removes 499 × 144 bytes of redundant struct copying per JetStream
message.
- The integration test's inline publish closure duplicated
makeHRSyncEventGzip's encoding logic; route it through the helper
instead. Drops the local bytes/gzip use, so the import goes too.
- Trim docstrings on the new types and helpers. Cut narrative comments
("Used by every BuildAction test...", "handler.go::isBulkItemSuccess
depends on this", "Task 14 will wire ..."), kept only non-obvious WHY
notes the next reader actually needs (Gzip+base64 wire format,
partial-update contract, hr-syncer ownership).
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
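The pointer-keyed dedup mentioned in the first bullet can be sketched like this. The orgRow type and dedupeBySectID name are hypothetical; the point is that the map stores pointers into the decoded slice, so a later duplicate overwrites a pointer rather than copying a struct.

```go
package main

// orgRow is a reduced, hypothetical stand-in for SpotlightOrgIndex.
type orgRow struct {
	SectID   string
	SectName string
}

// dedupeBySectID keeps the last row seen for each non-empty SectID.
// Rows with an empty SectID are skipped.
func dedupeBySectID(rows []orgRow) map[string]*orgRow {
	latest := make(map[string]*orgRow, len(rows))
	for i := range rows {
		r := &rows[i] // pointer into the slice, no struct copy
		if r.SectID == "" {
			continue
		}
		latest[r.SectID] = r // last-wins within the batch
	}
	return latest
}
```

A caller can treat an empty result as "nothing to write", which corresponds to the (nil, nil) return described for all-empty batches. Map iteration order is not defined in Go, so any ordering of the emitted actions would have to be imposed separately.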
🧹 Nitpick comments (2)
search-sync-worker/template_test.go (1)
22-35: 💤 Low value
Consider guarding type assertions with require.NotNil checks.
The type assertions on lines 25-34 will panic if customAnalyzerSettings() returns an unexpected structure. While test panics are acceptable failures, adding require.NotNil checks before each type assertion would make failures clearer and provide better diagnostics.
🛡️ Example: safer assertions

     func TestCustomAnalyzerSettings_Shape(t *testing.T) {
         got := customAnalyzerSettings()
         analyzer := got["analyzer"].(map[string]any)
    +    require.NotNil(t, analyzer, "analyzer section must be present")
         custom := analyzer["custom_analyzer"].(map[string]any)
    +    require.NotNil(t, custom, "custom_analyzer must be present")
         assert.Equal(t, "custom", custom["type"])
         ...

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@search-sync-worker/template_test.go` around lines 22 - 35, TestCustomAnalyzerSettings_Shape uses unchecked type assertions on the structure returned by customAnalyzerSettings(), which can panic and obscure failures; before each assertion, add require.NotNil checks for got, analyzer := got["analyzer"], custom := analyzer["custom_analyzer"], tokenizer := got["tokenizer"], and tok := tokenizer["custom_tokenizer"] (or equivalent intermediate values) to ensure those keys exist and are non-nil, then proceed with the type assertions and existing assert.Equal checks in the TestCustomAnalyzerSettings_Shape function.
search-sync-worker/spotlight_org.go (1)
135-142: 💤 Low value
Optional: surface gzip.Reader.Close() error instead of silently discarding via defer.
gr.Close() can return a checksum/truncation error after the body has already been read. With defer gr.Close() the error is dropped, so a corrupt-tail payload silently parses as a successful (possibly truncated) JSON. Low-impact in practice (io.ReadAll typically surfaces the same error first), but worth tightening since this is the trust boundary for an external publisher.
♻️ Suggested rewrite

     func gunzipBytes(b []byte) ([]byte, error) {
         gr, err := gzip.NewReader(bytes.NewReader(b))
         if err != nil {
             return nil, err
         }
    -    defer gr.Close()
    -    return io.ReadAll(gr)
    +    data, readErr := io.ReadAll(gr)
    +    closeErr := gr.Close()
    +    if readErr != nil {
    +        return nil, readErr
    +    }
    +    if closeErr != nil {
    +        return nil, closeErr
    +    }
    +    return data, nil
     }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@search-sync-worker/spotlight_org.go` around lines 135 - 142, The current gunzipBytes function defers gzip.NewReader.Close(), which discards any Close() error (e.g., checksum/truncation); change the function to explicitly call io.ReadAll(gr) into a variable, then call gr.Close() and if Close() returns a non-nil error return that error (or if both Read and Close produce errors prefer the read error but surface the close error when present); update the implementation around gzip.NewReader, io.ReadAll and gr.Close() so Close() errors are not silently ignored.
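On the template_test.go nitpick: in Go, a single-value type assertion such as `got["analyzer"].(map[string]any)` panics on a missing key or a wrong type before any check placed after it can run. One way to get a readable failure instead is the two-value form. The sketch below is an illustration only; subMap is a hypothetical helper, not part of the PR.

```go
package main

import "fmt"

// subMap (hypothetical helper) looks up key in m and asserts that the value
// is a nested map, returning an error instead of panicking.
func subMap(m map[string]any, key string) (map[string]any, error) {
	v, ok := m[key]
	if !ok {
		return nil, fmt.Errorf("key %q is missing", key)
	}
	// Two-value type assertion: ok is false on a type mismatch, no panic.
	sub, ok := v.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("key %q has type %T, want map[string]any", key, v)
	}
	return sub, nil
}
```

In a testify-based test the returned error could be passed to require.NoError before the next lookup, so an unexpected shape names the offending key rather than producing a panic trace.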
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5e7e4d29-64c0-4fdf-a305-4b2423ad6386
📒 Files selected for processing (23)
- docs/superpowers/plans/2026-05-11-spotlight-org-sync.md
- docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md
- pkg/model/hrsync.go
- pkg/model/model_test.go
- pkg/stream/stream.go
- pkg/stream/stream_test.go
- pkg/subject/subject.go
- pkg/subject/subject_test.go
- search-sync-worker/deploy/docker-compose.yml
- search-sync-worker/handler_test.go
- search-sync-worker/inbox_integration_test.go
- search-sync-worker/integration_test.go
- search-sync-worker/main.go
- search-sync-worker/messages.go
- search-sync-worker/messages_test.go
- search-sync-worker/spotlight.go
- search-sync-worker/spotlight_org.go
- search-sync-worker/spotlight_org_test.go
- search-sync-worker/spotlight_test.go
- search-sync-worker/template.go
- search-sync-worker/template_test.go
- search-sync-worker/user_room.go
- search-sync-worker/user_room_test.go
gzip.Reader.Close reports trailing checksum / truncation errors that io.ReadAll can miss on a corrupted stream. Replace the deferred close with an explicit one so a truncated publisher payload doesn't parse as a successful (short) JSON. Per CodeRabbit review on PR #170. https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
Summary
- New spotlight-org collection in search-sync-worker that reads daily batched events from hr-syncer on hr.sync.{siteID}.employees.upsert and maintains the spotlightorg-{siteID} Elasticsearch index keyed by sectId.
- Shared wire infrastructure: HRSyncEvent envelope (pkg/model), HR_SYNC_{siteID} stream (pkg/stream), and subject builders (pkg/subject). Consumer-side row + ES doc + ES mapping are one struct (SpotlightOrgIndex); no public pkg/model.Employee is introduced, so it doesn't conflict with the internal repo's existing one.
- DEV_MODE toggle that collapses every search-sync-worker ES index template to 1 shard / 0 replicas in local dev; prod values per collection unchanged. Extracts a shared customAnalyzerSettings() helper used by both spotlight templates.

Design
- Envelope: HRSyncEvent{ Timestamp, BatchID, Gzip, Payload }. When Gzip=true the payload rides as a JSON string of base64(gzip(JSON array)); json.RawMessage must itself be valid JSON, so raw binary can't sit in the envelope. Decoders unmarshal into []byte (which base64-decodes), then gunzip.
- Partial-field events from hr-syncer (e.g., one employee row with only sectId and a renamed sectName) produce an ES _update body containing only the changed keys. Doc-merge preserves all other stored fields without a painless script.
- Dedup by SectID: within a batch many employees share the same section. BuildAction collapses to one ES _update per unique sectId (last-wins). Empty-SectID rows are skipped silently; all-empty batches return (nil, nil) so the handler acks with no ES write.
- The HR_SYNC_{siteID} schema is owned by hr-syncer. search-sync-worker is a pure consumer and skips this stream in its bootstrap loop, matching how it already skips INBOX (owned by inbox-worker).

Commits
- feat(model,stream,subject): HR sync wire infrastructure: HRSyncEvent, stream.HRSync, subject builders.
- refactor(search-sync-worker): DEV_MODE toggle and shared template helpers: indexTopology, customAnalyzerSettings, threaded devMode through messages/spotlight/user-room collections and their tests. The existing spotlight template now uses the shared analyzer and gains token_chars (verified accepted on ES 8.11).
- feat(search-sync-worker): spotlight-org collection for HR org sync: new spotlightOrgCollection, BuildAction (envelope parse → optional gzip → unmarshal → dedup → emit ES updates), main.go wiring (SPOTLIGHT_ORG_INDEX + DEV_MODE config, HR_SYNC bootstrap skip), integration test against testcontainers, and docker-compose.yml DEV_MODE env.
- simplify: trim comments, dedup gzip helper, pointer-keyed dedup map: /simplify polish. Pointer-keyed dedup map removes per-duplicate struct copies, integration test routes through the existing makeHRSyncEventGzip helper, docstrings trimmed to the non-obvious WHYs.

Test plan
- make test SERVICE=pkg/model: passes
- make test SERVICE=pkg/stream: passes
- make test SERVICE=pkg/subject: passes
- make test SERVICE=search-sync-worker: passes (17 spotlight-org tests + all preexisting collection tests)
- make lint: clean
- go vet -tags=integration ./search-sync-worker/...: clean (integration build compiles)
- BuildAction coverage 92.1% (≥ 90% target per CLAUDE.md)
- make test-integration SERVICE=search-sync-worker against the testcontainers harness: pending (TestSearchSyncSpotlightOrg_Integration exercises real gzip publish → ES doc upsert → second event preserves untouched fields via doc-merge)

Coordination with hr-syncer
Wire-format contract Mat needs to match on the publisher side:
- Subject: hr.sync.{siteID}.employees.upsert
- Envelope: model.HRSyncEvent{Timestamp(ms), BatchID(uuidv7), Gzip(bool), Payload(json.RawMessage)}
- Payload rows: sectId + (optional) other org fields match the consumer's SpotlightOrgIndex projection; extra fields are silently ignored.
- Gzip=true: base64(gzip(JSON array)) carried as a JSON string in the envelope.

Spec / plan
- docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md
- docs/superpowers/plans/2026-05-11-spotlight-org-sync.md
https://claude.ai/code/session_01QLFefxiCHDP24LDLQzLLG2
Generated by Claude Code