Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,835 changes: 1,835 additions & 0 deletions docs/superpowers/plans/2026-05-11-spotlight-org-sync.md

Large diffs are not rendered by default.

455 changes: 455 additions & 0 deletions docs/superpowers/specs/2026-05-11-spotlight-org-sync-design.md

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions pkg/model/hrsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package model

import "encoding/json"

// HRSyncEvent is the envelope hr-syncer publishes on every hr.sync.*
// subject. Each consumer defines its own local projection struct
// for the payload elements.
//
// When Gzip=true the payload rides as a JSON string of base64(gzip(JSON
// array)) — json.RawMessage must itself be valid JSON, so raw binary
// bytes can't be embedded directly. Decoders unmarshal into []byte
// (which base64-decodes) and then gunzip.
type HRSyncEvent struct {
Timestamp int64 `json:"timestamp" bson:"timestamp"`
BatchID string `json:"batchId" bson:"batchId"`
Gzip bool `json:"gzip" bson:"gzip"`
Payload json.RawMessage `json:"payload" bson:"payload"`
}
11 changes: 11 additions & 0 deletions pkg/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,17 @@ func TestSyncCreateDMRequestJSON(t *testing.T) {
assert.Equal(t, src, dst)
}

func TestHRSyncEventJSON(t *testing.T) {
src := model.HRSyncEvent{
Timestamp: 1735689600000,
BatchID: "0192a4f7-8c2d-7c9a-abcd-e0123456789f",
Gzip: true,
Payload: json.RawMessage(`[{"sectId":"S001"}]`),
}
var dst model.HRSyncEvent
roundTrip(t, &src, &dst)
}

func TestSyncCreateDMReplyJSON(t *testing.T) {
now := time.Date(2026, 5, 7, 12, 0, 0, 0, time.UTC)
src := model.SyncCreateDMReply{
Expand Down
9 changes: 9 additions & 0 deletions pkg/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,12 @@ func Inbox(siteID string) Config {
},
}
}

// HRSync is the HR_SYNC_{siteID} stream, populated by hr-syncer's daily
// publishes on hr.sync.{siteID}.>. Schema is owned by hr-syncer.
func HRSync(siteID string) Config {
return Config{
Name: fmt.Sprintf("HR_SYNC_%s", siteID),
Subjects: []string{fmt.Sprintf("hr.sync.%s.>", siteID)},
}
}
1 change: 1 addition & 0 deletions pkg/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestStreamConfigs(t *testing.T) {
{"MessagesCanonical", stream.MessagesCanonical(siteID), "MESSAGES_CANONICAL_site-a", "chat.msg.canonical.site-a.>"},
{"Rooms", stream.Rooms(siteID), "ROOMS_site-a", "chat.room.canonical.site-a.>"},
{"Outbox", stream.Outbox(siteID), "OUTBOX_site-a", "outbox.site-a.>"},
{"HRSync", stream.HRSync(siteID), "HR_SYNC_site-a", "hr.sync.site-a.>"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/subject/subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func MsgCanonicalDeleted(siteID string) string {
return fmt.Sprintf("chat.msg.canonical.%s.deleted", siteID)
}

func HRSyncEmployeesUpsert(siteID string) string {
return fmt.Sprintf("hr.sync.%s.employees.upsert", siteID)
}

func HRSyncUsersUpsert(siteID string) string {
return fmt.Sprintf("hr.sync.%s.users.upsert", siteID)
}

func RoomEvent(roomID string) string {
return fmt.Sprintf("chat.room.%s.event", roomID)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/subject/subject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package subject_test
import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/hmchangw/chat/pkg/subject"
)

Expand Down Expand Up @@ -350,3 +352,13 @@ func TestRoomCreateDMSync(t *testing.T) {
t.Errorf("RoomCreateDMSync: got %q, want %q", got, want)
}
}

func TestHRSyncEmployeesUpsert(t *testing.T) {
got := subject.HRSyncEmployeesUpsert("site-a")
assert.Equal(t, "hr.sync.site-a.employees.upsert", got)
}

func TestHRSyncUsersUpsert(t *testing.T) {
got := subject.HRSyncUsersUpsert("site-a")
assert.Equal(t, "hr.sync.site-a.users.upsert", got)
}
1 change: 1 addition & 0 deletions search-sync-worker/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- SEARCH_BACKEND=elasticsearch
- MSG_INDEX_PREFIX=messages-site-local-v1
- BOOTSTRAP_STREAMS=true
- DEV_MODE=${DEV_MODE:-true}
volumes:
- ../../docker-local/backend.creds:/etc/nats/backend.creds:ro
networks:
Expand Down
16 changes: 8 additions & 8 deletions search-sync-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func makeStubMsg(t *testing.T, evt *model.MessageEvent) *stubMsg {
func TestHandler_Add(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)

evt := model.MessageEvent{
Event: model.EventCreated,
Expand All @@ -66,7 +66,7 @@ func TestHandler_Add(t *testing.T) {
func TestHandler_Add_MalformedJSON(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)

msg := &stubMsg{data: []byte("{invalid")}
h.Add(msg)
Expand All @@ -92,7 +92,7 @@ func TestHandler_Flush(t *testing.T) {
Bulk(gomock.Any(), gomock.Len(1)).
Return([]searchengine.BulkResult{{Status: 201}}, nil)

h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
msg := makeStubMsg(t, &baseEvt)
h.Add(msg)
h.Flush(context.Background())
Expand All @@ -109,7 +109,7 @@ func TestHandler_Flush(t *testing.T) {
Bulk(gomock.Any(), gomock.Len(1)).
Return([]searchengine.BulkResult{{Status: 409, Error: "version conflict"}}, nil)

h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
msg := makeStubMsg(t, &baseEvt)
h.Add(msg)
h.Flush(context.Background())
Expand All @@ -125,7 +125,7 @@ func TestHandler_Flush(t *testing.T) {
Bulk(gomock.Any(), gomock.Len(1)).
Return([]searchengine.BulkResult{{Status: 500, Error: "internal"}}, nil)

h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
msg := makeStubMsg(t, &baseEvt)
h.Add(msg)
h.Flush(context.Background())
Expand All @@ -141,7 +141,7 @@ func TestHandler_Flush(t *testing.T) {
Bulk(gomock.Any(), gomock.Len(2)).
Return(nil, fmt.Errorf("connection refused"))

h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
msg1 := makeStubMsg(t, &baseEvt)
evt2 := baseEvt
evt2.Message.ID = "m2"
Expand All @@ -159,7 +159,7 @@ func TestHandler_Flush(t *testing.T) {
t.Run("empty flush is no-op", func(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
h.Flush(context.Background())
assert.Equal(t, 0, h.MessageCount())
})
Expand All @@ -175,7 +175,7 @@ func TestHandler_Flush(t *testing.T) {
{Status: 500, Error: "shard failure"},
}, nil)

h := NewHandler(store, newMessageCollection("msgs-v1"), 500)
h := NewHandler(store, newMessageCollection("msgs-v1", false), 500)
msgs := make([]*stubMsg, 3)
for i := range msgs {
evt := baseEvt
Expand Down
20 changes: 10 additions & 10 deletions search-sync-worker/inbox_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func TestSpotlightSync_Integration(t *testing.T) {
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newSpotlightCollection(indexName)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName))))
coll := newSpotlightCollection(indexName, false)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName, false))))
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -249,8 +249,8 @@ func TestSpotlightSync_BulkInvite(t *testing.T) {
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newSpotlightCollection(indexName)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName))))
coll := newSpotlightCollection(indexName, false)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(spotlightTemplateBody(indexName, false))))
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -322,8 +322,8 @@ func TestUserRoomSync_Integration(t *testing.T) {
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newUserRoomCollection(indexName)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName))))
coll := newUserRoomCollection(indexName, false)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false))))
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -444,8 +444,8 @@ func TestUserRoomSync_BulkInvite(t *testing.T) {
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newUserRoomCollection(indexName)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName))))
coll := newUserRoomCollection(indexName, false)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false))))
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -559,8 +559,8 @@ func TestUserRoomSync_LWWGuard(t *testing.T) {
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newUserRoomCollection(indexName)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName))))
coll := newUserRoomCollection(indexName, false)
require.NoError(t, engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(userRoomTemplateBody(indexName, false))))
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down
103 changes: 99 additions & 4 deletions search-sync-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ func TestSearchSyncIntegration(t *testing.T) {
// Wait for cluster to be green before creating indices.
waitForClusterGreen(t, esURL, 120*time.Second)

coll := newMessageCollection(prefix)
err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix)))
coll := newMessageCollection(prefix, false)
err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix, false)))
require.NoError(t, err, "upsert template")

// Pre-create indices so shard allocation completes before bulk indexing.
Expand Down Expand Up @@ -501,8 +501,8 @@ func TestCustomAnalyzer(t *testing.T) {

waitForClusterGreen(t, esURL, 120*time.Second)

coll := newMessageCollection(prefix)
err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix)))
coll := newMessageCollection(prefix, false)
err = engine.UpsertTemplate(ctx, coll.TemplateName(), overrideIndexSettings(messageTemplateBody(prefix, false)))
require.NoError(t, err, "upsert template")

preCreateIndex(t, esURL, prefix+"-2026-03")
Expand Down Expand Up @@ -613,3 +613,98 @@ func TestCustomAnalyzer(t *testing.T) {
"cross-compound should not match")
})
}

// TestSearchSyncSpotlightOrg_Integration drives the spotlight-org collection
// end to end: publish a gzipped HRSyncEvent to a fresh HR_SYNC stream, let
// the handler process the resulting message, and verify the ES documents.
//
// Doc-merge is the key behavior under test: a second event carrying only
// SectID + SectName must NOT clear the other stored fields.
func TestSearchSyncSpotlightOrg_Integration(t *testing.T) {
esURL := setupElasticsearch(t)
js, _ := setupNATSJetStream(t)
ctx := context.Background()

const siteID = "site-org-int"
const indexName = "spotlightorg-site-org-int"

engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err, "create search engine")

waitForClusterGreen(t, esURL, 120*time.Second)

coll := newSpotlightOrgCollection(indexName, true)
require.NoError(t,
engine.UpsertTemplate(ctx, coll.TemplateName(), coll.TemplateBody()),
"upsert spotlight-org template",
)
preCreateIndex(t, esURL, indexName)
waitForClusterGreen(t, esURL, 120*time.Second)

hrCfg := stream.HRSync(siteID)
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: hrCfg.Name,
Subjects: hrCfg.Subjects,
})
require.NoError(t, err, "create HR_SYNC stream")

subj := subject.HRSyncEmployeesUpsert(siteID)

publish := func(employees []SpotlightOrgIndex, ts int64) {
t.Helper()
_, err := js.Publish(ctx, subj, makeHRSyncEventGzip(t, ts, employees))
require.NoError(t, err, "publish hr-sync event")
}

cons, err := js.CreateOrUpdateConsumer(ctx, hrCfg.Name, jetstream.ConsumerConfig{
Durable: "spotlight-org-sync-test",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: coll.FilterSubjects(siteID),
})
require.NoError(t, err, "create consumer")
handler := NewHandler(&engineAdapter{engine: engine}, coll, 100)

publish([]SpotlightOrgIndex{
{SectID: "S1", SectName: "Engineering", DeptID: "D1", DeptName: "Tech"},
{SectID: "S2", SectName: "Sales", DeptID: "D2", DeptName: "Biz"},
}, time.Now().UnixMilli())

batch, err := cons.Fetch(1, jetstream.FetchMaxWait(10*time.Second))
require.NoError(t, err)
for msg := range batch.Messages() {
handler.Add(msg)
}
handler.Flush(ctx)
refreshIndex(t, esURL, indexName)

t.Run("two docs landed", func(t *testing.T) {
assert.Equal(t, 2, countDocs(t, esURL, indexName))
s1 := getDoc(t, esURL, indexName, "S1")
require.NotNil(t, s1)
assert.Equal(t, "Engineering", s1["sectName"])
assert.Equal(t, "Tech", s1["deptName"])
s2 := getDoc(t, esURL, indexName, "S2")
require.NotNil(t, s2)
assert.Equal(t, "Sales", s2["sectName"])
})

publish([]SpotlightOrgIndex{
{SectID: "S1", SectName: "Engineering Renamed"},
}, time.Now().UnixMilli()+1)

batch, err = cons.Fetch(1, jetstream.FetchMaxWait(10*time.Second))
require.NoError(t, err)
for msg := range batch.Messages() {
handler.Add(msg)
}
handler.Flush(ctx)
refreshIndex(t, esURL, indexName)

t.Run("doc-merge preserves untouched fields", func(t *testing.T) {
s1 := getDoc(t, esURL, indexName, "S1")
require.NotNil(t, s1)
assert.Equal(t, "Engineering Renamed", s1["sectName"], "renamed field updated")
assert.Equal(t, "D1", s1["deptId"], "deptId preserved")
assert.Equal(t, "Tech", s1["deptName"], "deptName preserved")
})
}
Loading
Loading