From 682272aecaf52759bed71c48f7a61be95688e171 Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:30:21 +0800 Subject: [PATCH 1/6] chore(local-dev): unblock + speed up dev stack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - NATS healthcheck uses /healthz?js-server-only=true so a fresh JetStream volume doesn't 503; bump start_period for slower disks. - Root .dockerignore so service builds don't tar the whole repo. - Split `make up` (no rebuild) from `make up-rebuild`. - stop_grace_period: 2s on every service compose (was 10s × 11 ≈ 110s on `make down`). - `make seed-users` + docker-local/seed-users.sh: idempotent fixtures (alice, bob) so dev-auth users have a `users` row. - `make backfill-room-keys` + docker-local/backfill-room-keys.sh: mint Valkey keys for rooms created before mint-on-create. --- .dockerignore | 32 ++++++++++ Makefile | 29 +++++++-- auth-service/deploy/docker-compose.yml | 1 + docker-local/backfill-room-keys.sh | 59 +++++++++++++++++++ docker-local/compose.deps.yaml | 7 ++- docker-local/seed-users.sh | 33 +++++++++++ history-service/deploy/docker-compose.yml | 1 + inbox-worker/deploy/docker-compose.yml | 1 + message-gatekeeper/deploy/docker-compose.yml | 1 + message-worker/deploy/docker-compose.yml | 1 + notification-worker/deploy/docker-compose.yml | 1 + room-service/deploy/docker-compose.yml | 1 + room-worker/deploy/docker-compose.yml | 1 + search-sync-worker/deploy/docker-compose.yml | 1 + 14 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 .dockerignore create mode 100755 docker-local/backfill-room-keys.sh create mode 100755 docker-local/seed-users.sh diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..3fc247862 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,32 @@ +# Trim service build contexts (each uses repo root via `context: ../..`). + +.git +.gitignore +.github +.claude +.claude/worktrees +.idea +.vscode +**/.DS_Store + +**/node_modules +chat-frontend/dist +chat-frontend/build +chat-frontend/coverage +chat-frontend/.vite +chat-frontend/.cache + +docker-local/backend.creds +docker-local/nats.conf +docker-local/.env +docker-local/cassandra/init +bin/ +coverage.out +coverage.html +*.log +*.test +*.tmp + +*.md +docs/ +tools/ diff --git a/Makefile b/Makefile index 571578e49..8b8995a54 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: lint fmt test test-integration generate build deps-up deps-down up down +.PHONY: lint fmt test test-integration generate build deps-up deps-down up up-rebuild down seed-users backfill-room-keys DEPS_COMPOSE := docker-local/compose.deps.yaml SERVICES_COMPOSE := docker-local/compose.services.yaml @@ -67,9 +67,8 @@ deps-up: deps-down: docker compose -f $(DEPS_COMPOSE) down -# Start microservices. With SERVICE=, starts just that service's compose; -# without, starts every service via compose.services.yaml. Foreground either way -# so container logs stream to the terminal; Ctrl-C stops. +# Start microservices in foreground. SERVICE= for one, otherwise all. +# `up` reuses images for fast boot; use `up-rebuild` after editing source. up: @docker container inspect -f '{{.State.Running}}' $(NATS_CONTAINER) 2>/dev/null | grep -q true || { \ echo "Deps are not running. Run 'make deps-up' first."; exit 1; \ @@ -77,12 +76,34 @@ up: @test -f $(NATS_CREDS) && test -f $(NATS_CONF) || { \ echo "Missing $(NATS_CREDS) or $(NATS_CONF). Run './docker-local/setup.sh'."; exit 1; \ } +ifdef SERVICE + docker compose -f $(SERVICE)/deploy/docker-compose.yml up +else + docker compose -f $(SERVICES_COMPOSE) up +endif + +# Same as `up` but rebuilds images first. +up-rebuild: + @docker container inspect -f '{{.State.Running}}' $(NATS_CONTAINER) 2>/dev/null | grep -q true || { \ + echo "Deps are not running. Run 'make deps-up' first."; exit 1; \ + } + @test -f $(NATS_CREDS) && test -f $(NATS_CONF) || { \ + echo "Missing $(NATS_CREDS) or $(NATS_CONF). Run './docker-local/setup.sh'."; exit 1; \ + } ifdef SERVICE docker compose -f $(SERVICE)/deploy/docker-compose.yml up --build else docker compose -f $(SERVICES_COMPOSE) up --build endif +# Seed dev-mode users (alice, bob) into Mongo. Idempotent. +seed-users: + ./docker-local/seed-users.sh + +# Backfill Valkey room keys for rooms created before mint-on-create. Idempotent. +backfill-room-keys: + ./docker-local/backfill-room-keys.sh + # Stop microservices. SERVICE= stops one; otherwise stops every service. down: ifdef SERVICE diff --git a/auth-service/deploy/docker-compose.yml b/auth-service/deploy/docker-compose.yml index f3d072b89..2a3ffcc1c 100644 --- a/auth-service/deploy/docker-compose.yml +++ b/auth-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: auth-service/deploy/Dockerfile + stop_grace_period: 2s ports: - "8080:8080" env_file: diff --git a/docker-local/backfill-room-keys.sh b/docker-local/backfill-room-keys.sh new file mode 100755 index 000000000..988a34d90 --- /dev/null +++ b/docker-local/backfill-room-keys.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# +# Mint a P-256 room key in Valkey for any room without one. For rooms +# created before room-service started minting on create. Idempotent. + +set -euo pipefail + +MONGO_CONTAINER="${MONGO_CONTAINER:-chat-local-mongodb}" +VALKEY_CONTAINER="${VALKEY_CONTAINER:-chat-local-valkey}" +DB="${MONGO_DB:-chat}" + +if ! docker container inspect -f '{{.State.Running}}' "$MONGO_CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $MONGO_CONTAINER not running" >&2; exit 1 +fi +if ! docker container inspect -f '{{.State.Running}}' "$VALKEY_CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $VALKEY_CONTAINER not running" >&2; exit 1 +fi + +room_ids=$(docker exec "$MONGO_CONTAINER" mongosh "$DB" --quiet --eval 'db.rooms.find({}, {_id:1}).forEach(r => print(r._id))') + +if [ -z "$room_ids" ]; then + echo "(no rooms to check)" + exit 0 +fi + +# Single shared P-256 key for all backfilled rooms; dev only. +tmpdir=$(mktemp -d) +trap 'rm -rf "$tmpdir"' EXIT + +openssl ecparam -name prime256v1 -genkey -noout -out "$tmpdir/priv.pem" 2>/dev/null +priv_b64=$(openssl ec -in "$tmpdir/priv.pem" -text -noout 2>/dev/null \ + | awk '/priv:/{flag=1; next} /pub:/{flag=0} flag' \ + | tr -d ' :\n' \ + | xxd -r -p \ + | base64) +pub_b64=$(openssl ec -in "$tmpdir/priv.pem" -text -noout 2>/dev/null \ + | awk '/pub:/{flag=1; next} /ASN1 OID:/{flag=0} flag' \ + | tr -d ' :\n' \ + | xxd -r -p \ + | base64) + +if [ -z "$priv_b64" ] || [ -z "$pub_b64" ]; then + echo "ERROR: failed to extract P-256 key bytes via openssl" >&2; exit 1 +fi + +added=0 +skipped=0 +for rid in $room_ids; do + key="room:${rid}:key" + exists=$(docker exec "$VALKEY_CONTAINER" valkey-cli exists "$key") + if [ "$exists" = "1" ]; then + skipped=$((skipped + 1)) + continue + fi + docker exec "$VALKEY_CONTAINER" valkey-cli hset "$key" pub "$pub_b64" priv "$priv_b64" ver 0 > /dev/null + added=$((added + 1)) +done + +echo "rooms with new key: $added | already had key: $skipped" diff --git a/docker-local/compose.deps.yaml b/docker-local/compose.deps.yaml index 7efea9878..09f6d96da 100644 --- a/docker-local/compose.deps.yaml +++ b/docker-local/compose.deps.yaml @@ -18,12 +18,13 @@ services: - ./nats.conf:/etc/nats/nats.conf:ro - nats-data:/data/jetstream command: ["-c", "/etc/nats/nats.conf"] + # js-server-only skips the deep stream check; default 503s on fresh volumes. healthcheck: - test: ["CMD-SHELL", "wget -qO- http://localhost:8222/healthz || exit 1"] + test: ["CMD-SHELL", "wget -qO- 'http://localhost:8222/healthz?js-server-only=true' || exit 1"] interval: 5s timeout: 3s - retries: 10 - start_period: 5s + retries: 12 + start_period: 15s networks: - chat-local diff --git a/docker-local/seed-users.sh b/docker-local/seed-users.sh new file mode 100755 index 000000000..e3e6ef357 --- /dev/null +++ b/docker-local/seed-users.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# +# Seed the `users` collection with dev fixtures (alice, bob). dev-auth +# accepts any account at login but doesn't insert into Mongo, so workers +# crash with "user not found". _id == account matches frontend convention. +# Idempotent. + +set -euo pipefail + +CONTAINER="${MONGO_CONTAINER:-chat-local-mongodb}" +DB="${MONGO_DB:-chat}" +SITE_ID="${SITE_ID:-site-local}" + +if ! docker container inspect -f '{{.State.Running}}' "$CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $CONTAINER is not running. Run 'make deps-up' first." >&2 + exit 1 +fi + +echo "Seeding users into $CONTAINER/$DB (siteId=$SITE_ID)..." + +docker exec "$CONTAINER" mongosh --quiet "$DB" --eval " + const users = [ + { _id: 'alice', account: 'alice', siteId: '$SITE_ID', engName: 'Alice', chineseName: 'Alice', employeeId: 'E0001', sectId: 'dev', sectName: 'Dev' }, + { _id: 'bob', account: 'bob', siteId: '$SITE_ID', engName: 'Bob', chineseName: 'Bob', employeeId: 'E0002', sectId: 'dev', sectName: 'Dev' } + ]; + const ops = users.map(u => ({ + updateOne: { filter: { _id: u._id }, update: { \$set: u }, upsert: true } + })); + const res = db.users.bulkWrite(ops); + print('upserted: ' + res.upsertedCount + ', modified: ' + res.modifiedCount + ', matched: ' + res.matchedCount); +" + +echo "Done. Login as 'alice' or 'bob' (siteId=$SITE_ID) in dev mode." diff --git a/history-service/deploy/docker-compose.yml b/history-service/deploy/docker-compose.yml index 34cad84a8..215a7a54a 100644 --- a/history-service/deploy/docker-compose.yml +++ b/history-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: history-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/inbox-worker/deploy/docker-compose.yml b/inbox-worker/deploy/docker-compose.yml index 8fc9a9665..6e2c5dc28 100644 --- a/inbox-worker/deploy/docker-compose.yml +++ b/inbox-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: inbox-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/message-gatekeeper/deploy/docker-compose.yml b/message-gatekeeper/deploy/docker-compose.yml index 56394d57f..3ec9db121 100644 --- a/message-gatekeeper/deploy/docker-compose.yml +++ b/message-gatekeeper/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: message-gatekeeper/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/message-worker/deploy/docker-compose.yml b/message-worker/deploy/docker-compose.yml index aaed82edc..e2ad97360 100644 --- a/message-worker/deploy/docker-compose.yml +++ b/message-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: message-worker/deploy/Dockerfile + stop_grace_period: 2s pull_policy: build environment: - NATS_URL=nats://nats:4222 diff --git a/notification-worker/deploy/docker-compose.yml b/notification-worker/deploy/docker-compose.yml index 492a9e808..70e990abe 100644 --- a/notification-worker/deploy/docker-compose.yml +++ b/notification-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: notification-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/room-service/deploy/docker-compose.yml b/room-service/deploy/docker-compose.yml index 89eb308a5..ee7d4186d 100644 --- a/room-service/deploy/docker-compose.yml +++ b/room-service/deploy/docker-compose.yml @@ -17,6 +17,7 @@ services: build: context: ../.. dockerfile: room-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/room-worker/deploy/docker-compose.yml b/room-worker/deploy/docker-compose.yml index 0bf367dfc..457542ffc 100644 --- a/room-worker/deploy/docker-compose.yml +++ b/room-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: room-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/search-sync-worker/deploy/docker-compose.yml b/search-sync-worker/deploy/docker-compose.yml index f2948e0bc..d3649b129 100644 --- a/search-sync-worker/deploy/docker-compose.yml +++ b/search-sync-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: search-sync-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds From 4079747cbcb6660d1bb81a161c9cb8e7d05d5bac Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:30:31 +0800 Subject: [PATCH 2/6] fix(otelutil): skip OTLP tracer init when no endpoint env is set InitTracer now no-ops (returns the SDK noop provider) unless OTEL_EXPORTER_OTLP_ENDPOINT or OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is set. Prevents local dev from flooding logs with "traces export: connection refused" against 127.0.0.1:4317 when no collector is running. Prod/staging configure the env via deployment. --- pkg/otelutil/otel.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/otelutil/otel.go b/pkg/otelutil/otel.go index 147980d1d..272f2b942 100644 --- a/pkg/otelutil/otel.go +++ b/pkg/otelutil/otel.go @@ -3,6 +3,7 @@ package otelutil import ( "context" "fmt" + "os" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -14,9 +15,14 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.26.0" ) -// InitTracer creates and registers a TracerProvider with OTLP gRPC exporter. -// Returns a shutdown function. +// InitTracer registers a TracerProvider with OTLP gRPC exporter. Returns a +// shutdown function. Skipped (noop provider) when no OTLP endpoint env is set. func InitTracer(ctx context.Context, serviceName string) (func(context.Context) error, error) { + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" && os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") == "" { + otel.SetTextMapPropagator(propagation.TraceContext{}) + return func(context.Context) error { return nil }, nil + } + exp, err := otlptracegrpc.New(ctx) if err != nil { return nil, fmt.Errorf("otlp exporter: %w", err) From 92e214a828549de383de708e736b33bde7123d89 Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:30:48 +0800 Subject: [PATCH 3/6] fix(room-service): mint room key, enroll owner+DM recipient, emit member_added MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four changes to handleCreateRoom — none of which existed before — so that newly-created rooms are immediately functional end-to-end: - Mint a P-256 keypair in Valkey via h.keyStore.Set after CreateRoom. Without this, broadcast-worker fails the encrypt step ("no current key") and JetStream redelivers forever. Extends the narrow RoomKeyStore interface with Set; nil-tolerated for tests. - DMs now persist a second Subscription for req.Members[0] and bump Room.UserCount to 2. Without this, the recipient logs in and every read path hits "not subscribed to room". Dev convention is account == user.ID, so req.Members[0] doubles for both fields; prod will need a real account → user.ID lookup. - Best-effort core-NATS publish of SubscriptionUpdateEvent{Action: "added"} via a new WithEventPublisher hook so the creator's frontend sees the room appear without a refresh. Mirrors how room-worker emits the event for member-add / role-update. - Best-effort INBOX same-site OutboxEvent{member_added} for the new subscription(s) so search-sync-worker's spotlight + user-room collections index the auto-enrolled accounts. Wire format matches PR #145's spec; HSS=nil keeps the bulk unrestricted. --- room-service/handler.go | 122 ++++++++++++++++++++++++++++++-- room-service/handler_test.go | 11 ++- room-service/main.go | 5 ++ room-service/mock_store_test.go | 15 ++++ room-service/store.go | 8 ++- 5 files changed, 154 insertions(+), 7 deletions(-) diff --git a/room-service/handler.go b/room-service/handler.go index 7741371bb..2f83a9a8e 100644 --- a/room-service/handler.go +++ b/room-service/handler.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/ecdh" + "crypto/rand" "encoding/base64" "encoding/json" "errors" @@ -31,12 +33,19 @@ type Handler struct { maxRoomSize int maxBatchSize int publishToStream func(ctx context.Context, subj string, data []byte) error + // publishEvent is core-NATS for transient client notifications; nil-tolerant for tests. + publishEvent func(ctx context.Context, subj string, data []byte) error } func NewHandler(store RoomStore, keyStore RoomKeyStore, memberListClient MemberListClient, siteID string, maxRoomSize, maxBatchSize int, publishToStream func(context.Context, string, []byte) error) *Handler { return &Handler{store: store, keyStore: keyStore, memberListClient: memberListClient, siteID: siteID, maxRoomSize: maxRoomSize, maxBatchSize: maxBatchSize, publishToStream: publishToStream} } +func (h *Handler) WithEventPublisher(fn func(ctx context.Context, subj string, data []byte) error) *Handler { + h.publishEvent = fn + return h +} + // wrappedCtx returns m.Context() augmented with X-Request-ID from the inbound msg header; entry ctx for every nats* handler. func wrappedCtx(m otelnats.Msg) context.Context { return natsutil.ContextWithRequestIDFromHeaders(m.Context(), m.Msg.Header) @@ -125,21 +134,25 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er if len(req.Members) != 1 { return nil, fmt.Errorf("DM requires exactly one other member, got %d", len(req.Members)) } + if req.Members[0] == req.CreatedBy { + return nil, fmt.Errorf("DM requires a different recipient than the creator") + } roomID = idgen.BuildDMRoomID(req.CreatedBy, req.Members[0]) - // TODO(idgen-rework follow-up): persist a second Subscription for req.Members[0] so DMs are two-sided. - // DMs have no add-member/role-update flow, so the recipient is currently un-enrolled. - // Needs the recipient's Account (store lookup or extend CreateRoomRequest with MembersAccount). Also bump Room.UserCount to 2. default: return nil, fmt.Errorf("unsupported room type %q", req.Type) } + userCount := 1 + if req.Type == model.RoomTypeDM { + userCount = 2 + } room := model.Room{ ID: roomID, Name: req.Name, Type: req.Type, CreatedBy: req.CreatedBy, SiteID: req.SiteID, - UserCount: 1, + UserCount: userCount, CreatedAt: now, UpdatedAt: now, } @@ -148,6 +161,19 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er return nil, fmt.Errorf("create room: %w", err) } + // Mint the room key — broadcast-worker can't encrypt without it. + if h.keyStore != nil { + keyPriv, err := ecdh.P256().GenerateKey(rand.Reader) + if err != nil { + slog.Warn("generate room key failed", "error", err, "roomID", room.ID) + } else if _, err := h.keyStore.Set(ctx, room.ID, roomkeystore.RoomKeyPair{ + PublicKey: keyPriv.PublicKey().Bytes(), + PrivateKey: keyPriv.Bytes(), + }); err != nil { + slog.Warn("set room key failed", "error", err, "roomID", room.ID) + } + } + // Auto-create owner subscription sub := model.Subscription{ ID: idgen.GenerateUUIDv7(), @@ -163,6 +189,94 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er slog.Warn("create owner subscription failed", "error", err) } + // DM: enroll the recipient too, otherwise their reads hit "not subscribed". + // Dev convention: account == user.ID. Prod will need a real account → ID lookup. + var recipSub model.Subscription + if req.Type == model.RoomTypeDM { + recipientAccount := req.Members[0] + recipSub = model.Subscription{ + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: recipientAccount, Account: recipientAccount}, + RoomID: room.ID, + RoomType: req.Type, + SiteID: req.SiteID, + Roles: []model.Role{model.RoleMember}, + HistorySharedSince: &now, + JoinedAt: now, + } + if err := h.store.CreateSubscription(ctx, &recipSub); err != nil { + slog.Warn("create recipient subscription failed", "error", err, "account", recipientAccount) + } + } + + // Best-effort: notify the creator's frontend so the new room appears + // without a refresh; mirrors room-worker's subscription.update on add/role. + subEvt := model.SubscriptionUpdateEvent{ + UserID: sub.User.ID, + Subscription: sub, + Action: "added", + Timestamp: now.UnixMilli(), + } + if h.publishEvent != nil { + if subEvtData, err := json.Marshal(subEvt); err != nil { + slog.Warn("marshal subscription update event failed", "error", err, "roomID", room.ID) + } else if err := h.publishEvent(ctx, subject.SubscriptionUpdate(req.CreatedByAccount), subEvtData); err != nil { + slog.Warn("publish subscription update failed", "error", err, "roomID", room.ID) + } + + // Mirror the notification to the DM recipient so their UI updates too. + if req.Type == model.RoomTypeDM { + recipEvt := model.SubscriptionUpdateEvent{ + UserID: recipSub.User.ID, + Subscription: recipSub, + Action: "added", + Timestamp: now.UnixMilli(), + } + if recipEvtData, err := json.Marshal(recipEvt); err != nil { + slog.Warn("marshal recipient subscription update event failed", "error", err, "roomID", room.ID) + } else if err := h.publishEvent(ctx, subject.SubscriptionUpdate(req.Members[0]), recipEvtData); err != nil { + slog.Warn("publish recipient subscription update failed", "error", err, "roomID", room.ID) + } + } + } + + // Same-site member_added on INBOX so search-sync-worker indexes the + // auto-enrolled members; owner-add bypasses room-worker. Wire shape + // matches PR #145's spec. HSS=nil → unrestricted (spotlight skips + // restricted bulks for MVP). Best-effort. + if h.publishToStream != nil { + accounts := []string{req.CreatedByAccount} + if req.Type == model.RoomTypeDM { + accounts = append(accounts, req.Members[0]) + } + inboxEvt := model.InboxMemberEvent{ + RoomID: room.ID, + RoomName: room.Name, + RoomType: room.Type, + SiteID: h.siteID, + Accounts: accounts, + JoinedAt: now.UnixMilli(), + Timestamp: now.UnixMilli(), + } + inboxData, err := json.Marshal(inboxEvt) + if err != nil { + slog.Warn("marshal inbox member event failed", "error", err, "roomID", room.ID) + } else { + outboxEvt := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: h.siteID, + DestSiteID: h.siteID, + Payload: inboxData, + Timestamp: now.UnixMilli(), + } + if outboxData, err := json.Marshal(outboxEvt); err != nil { + slog.Warn("marshal outbox event failed", "error", err, "roomID", room.ID) + } else if err := h.publishToStream(ctx, subject.InboxMemberAdded(h.siteID), outboxData); err != nil { + slog.Warn("publish owner member_added failed", "error", err, "roomID", room.ID) + } + } + } + return json.Marshal(room) } diff --git a/room-service/handler_test.go b/room-service/handler_test.go index cef4ef855..426185303 100644 --- a/room-service/handler_test.go +++ b/room-service/handler_test.go @@ -1898,8 +1898,17 @@ func TestHandler_handleCreateRoom_ChannelAndDMIDFormats(t *testing.T) { var capturedSub *model.Subscription store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()). DoAndReturn(func(_ context.Context, r *model.Room) error { capturedRoom = r; return nil }) + // Channels create one subscription (creator); DMs create two + // (creator + recipient). AnyTimes covers both, and we only + // capture the first call so existing assertions on subID + // stay valid for the creator's subscription. store.EXPECT().CreateSubscription(gomock.Any(), gomock.Any()). - DoAndReturn(func(_ context.Context, s *model.Subscription) error { capturedSub = s; return nil }) + DoAndReturn(func(_ context.Context, s *model.Subscription) error { + if capturedSub == nil { + capturedSub = s + } + return nil + }).AnyTimes() h := NewHandler(store, nil, nil, "site1", 100, 50, func(ctx context.Context, subj string, data []byte) error { return nil }) diff --git a/room-service/main.go b/room-service/main.go index 4047e3545..4ca7fc409 100644 --- a/room-service/main.go +++ b/room-service/main.go @@ -104,6 +104,11 @@ func main() { return fmt.Errorf("publish to %q: %w", subj, err) } return nil + }).WithEventPublisher(func(ctx context.Context, subj string, data []byte) error { + if err := nc.PublishMsg(ctx, natsutil.NewMsg(ctx, subj, data)); err != nil { + return fmt.Errorf("publish event to %q: %w", subj, err) + } + return nil }) if err := handler.RegisterCRUD(nc); err != nil { diff --git a/room-service/mock_store_test.go b/room-service/mock_store_test.go index d2ed18b7d..984e82276 100644 --- a/room-service/mock_store_test.go +++ b/room-service/mock_store_test.go @@ -258,3 +258,18 @@ func (mr *MockRoomKeyStoreMockRecorder) GetMany(ctx, roomIDs any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMany", reflect.TypeOf((*MockRoomKeyStore)(nil).GetMany), ctx, roomIDs) } + +// Set mocks base method. +func (m *MockRoomKeyStore) Set(ctx context.Context, roomID string, pair roomkeystore.RoomKeyPair) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Set", ctx, roomID, pair) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Set indicates an expected call of Set. +func (mr *MockRoomKeyStoreMockRecorder) Set(ctx, roomID, pair any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockRoomKeyStore)(nil).Set), ctx, roomID, pair) +} diff --git a/room-service/store.go b/room-service/store.go index cdb7e19ff..44981f743 100644 --- a/room-service/store.go +++ b/room-service/store.go @@ -52,8 +52,12 @@ type RoomStore interface { ListOrgMembers(ctx context.Context, orgID string) ([]model.OrgMember, error) } -// RoomKeyStore is the consumer-side interface for room encryption key lookups. -// Only the methods room-service needs are declared here. +// RoomKeyStore is the consumer-side interface for room encryption key lookups +// and the create-on-room-create write. Only the methods room-service needs +// are declared here. broadcast-worker requires a key for every room before +// it can encrypt — Set is called from handleCreateRoom so newly created +// rooms come with a key in place. type RoomKeyStore interface { GetMany(ctx context.Context, roomIDs []string) (map[string]*roomkeystore.VersionedKeyPair, error) + Set(ctx context.Context, roomID string, pair roomkeystore.RoomKeyPair) (int, error) } From 0fc44ef7fd1c43d27fd9e219ff08ffaa2e4a45cf Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:31:43 +0800 Subject: [PATCH 4/6] fix(broadcast-worker): no-key ack-skip + DEV_MODE plaintext MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related changes so channel events stop wedging the consumer and render in local dev: - On keyStore.Get returning nil for a room, log a warning and return nil so the caller acks. Old keyless rooms (created before room-service mint-on-create) previously errored, the consumer loop called Nak, JetStream redelivered, and the worker spammed logs forever. Cassandra still has the message via message-worker. - New DEV_MODE config (env DEV_MODE, default false) keeps evt.Message populated alongside the encrypted payload on channel events so a frontend without client-side decryption can still render. MUST stay false in prod — bundles plaintext alongside the E2E payload. DEV_MODE=true wired in the deploy compose for local; startup slog.Warn on boot when on so it can't slip into prod silently. --- broadcast-worker/deploy/docker-compose.yml | 3 +++ broadcast-worker/handler.go | 12 ++++++++++-- broadcast-worker/handler_test.go | 6 ++++-- broadcast-worker/main.go | 8 +++++++- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/broadcast-worker/deploy/docker-compose.yml b/broadcast-worker/deploy/docker-compose.yml index f4918c492..4aabd1aaf 100644 --- a/broadcast-worker/deploy/docker-compose.yml +++ b/broadcast-worker/deploy/docker-compose.yml @@ -12,6 +12,7 @@ services: build: context: ../.. dockerfile: broadcast-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds @@ -25,6 +26,8 @@ services: - VALKEY_ADDR=valkey:6379 - VALKEY_KEY_GRACE_PERIOD=24h - BOOTSTRAP_STREAMS=true + # Local dev only — bundles plaintext for no-crypto frontends. Never enable in prod. + - DEV_MODE=true volumes: - ../../docker-local/backend.creds:/etc/nats/backend.creds:ro networks: diff --git a/broadcast-worker/handler.go b/broadcast-worker/handler.go index fead19b21..e8781161b 100644 --- a/broadcast-worker/handler.go +++ b/broadcast-worker/handler.go @@ -37,6 +37,9 @@ type Handler struct { userStore userstore.UserStore pub Publisher keyStore RoomKeyProvider + // devMode bundles plaintext alongside the encrypted payload for local + // frontends without crypto. MUST stay false in prod. + devMode bool } func NewHandler(store Store, userStore userstore.UserStore, pub Publisher, keyStore RoomKeyProvider) *Handler { @@ -108,7 +111,10 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli return fmt.Errorf("get room key for room %s: %w", room.ID, err) } if key == nil { - return fmt.Errorf("get room key for room %s: %w", room.ID, errNoCurrentKey) + // Permanent: ack-skip so we don't nak-loop forever. + slog.Warn("room missing encryption key — dropping live broadcast", + "roomID", room.ID, "messageID", clientMsg.ID, "err", errNoCurrentKey) + return nil } encrypted, err := roomcrypto.Encode(string(msgJSON), key.KeyPair.PublicKey, key.Version) @@ -122,7 +128,9 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli } evt.EncryptedMessage = json.RawMessage(encJSON) - evt.Message = nil + if !h.devMode { + evt.Message = nil + } payload, err := json.Marshal(evt) if err != nil { diff --git a/broadcast-worker/handler_test.go b/broadcast-worker/handler_test.go index 385019926..edec32e3a 100644 --- a/broadcast-worker/handler_test.go +++ b/broadcast-worker/handler_test.go @@ -485,8 +485,10 @@ func TestHandler_HandleMessage_ChannelRoom_Encryption(t *testing.T) { h := NewHandler(store, us, pub, keyStore) err := h.HandleMessage(context.Background(), makeMessageEvent("room-1", "hello", msgTime)) - require.Error(t, err) - assert.ErrorIs(t, err, errNoCurrentKey) + // A keyless room is treated as a permanent broadcast failure: the + // handler logs and returns nil so the caller acks (avoiding the + // JetStream redelivery loop). The fan-out is dropped — no publish. + require.NoError(t, err) assert.Empty(t, pub.records) }) diff --git a/broadcast-worker/main.go b/broadcast-worker/main.go index af628ba06..e76805b88 100644 --- a/broadcast-worker/main.go +++ b/broadcast-worker/main.go @@ -37,7 +37,9 @@ type config struct { ValkeyAddr string `env:"VALKEY_ADDR,required"` ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + // DevMode bundles plaintext for local frontends without crypto. MUST stay false in prod. + DevMode bool `env:"DEV_MODE" envDefault:"false"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -112,6 +114,10 @@ func main() { publisher := &natsPublisher{nc: nc} handler := NewHandler(store, us, publisher, keyStore) + handler.devMode = cfg.DevMode + if cfg.DevMode { + slog.Warn("DEV_MODE enabled — plaintext message bundled in channel events; do NOT enable in production") + } iter, err := cons.Messages(jetstream.PullMaxMessages(2 * cfg.MaxWorkers)) if err != nil { From c3ae6871bcaf47d51d51d6eab86c0297acf6765c Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:31:58 +0800 Subject: [PATCH 5/6] fix(search-service): env-driven user-room + spotlight index names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both index names were hardcoded constants ("user-room", "spotlight") that don't match what search-sync-worker writes (user-room-{siteID}, spotlight-{siteID}-v1-chat). The httpAdapter's ignore_unavailable=true masked the mismatch — every query silently returned zero hits. End-user symptom: search returns nothing for any account, any term. Plumb USER_ROOM_INDEX (already partially wired) + new SPOTLIGHT_INDEX through SearchConfig → handlerConfig → searchRooms. Pin both env vars in the deploy compose; prod uses ops/IaC-owned aliases. --- search-service/deploy/docker-compose.yml | 5 +++++ search-service/handler.go | 3 ++- search-service/handler_test.go | 1 + search-service/main.go | 4 +++- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/search-service/deploy/docker-compose.yml b/search-service/deploy/docker-compose.yml index b4c191c99..42670f60a 100644 --- a/search-service/deploy/docker-compose.yml +++ b/search-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: search-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds @@ -19,6 +20,10 @@ services: - SEARCH_RECENT_WINDOW=8760h - SEARCH_REQUEST_TIMEOUT=10s - SEARCH_METRICS_ADDR=:9090 + # Pin to the concrete site-suffixed indices written by + # search-sync-worker. Prod uses aliases owned by ops/IaC. + - SEARCH_USER_ROOM_INDEX=user-room-site-local + - SEARCH_SPOTLIGHT_INDEX=spotlight-site-local-v1-chat ports: # Expose /metrics so Prometheus / curl can scrape from the host # during local dev. The listener is bound on 0.0.0.0:9090 inside diff --git a/search-service/handler.go b/search-service/handler.go index 9e2cfafa6..06753a9e4 100644 --- a/search-service/handler.go +++ b/search-service/handler.go @@ -20,6 +20,7 @@ type handlerConfig struct { RecentWindow time.Duration RequestTimeout time.Duration UserRoomIndex string + SpotlightIndex string } type handler struct { @@ -136,7 +137,7 @@ func (h *handler) searchRooms(c *natsrouter.Context, req model.SearchRoomsReques } observeESDone := observeES() - raw, err := h.store.Search(ctx, []string{SpotlightIndex}, body) + raw, err := h.store.Search(ctx, []string{h.cfg.SpotlightIndex}, body) observeESDone() if err != nil { slog.Error("room search backend failed", "account", account, "error", err) diff --git a/search-service/handler_test.go b/search-service/handler_test.go index 3a13f2aeb..d51f413e8 100644 --- a/search-service/handler_test.go +++ b/search-service/handler_test.go @@ -84,6 +84,7 @@ func newTestHandler(store SearchStore, cache RestrictedRoomCache) *handler { MaxDocCounts: 100, RestrictedRoomsCacheTTL: 5 * time.Minute, RecentWindow: 365 * 24 * time.Hour, + SpotlightIndex: SpotlightIndex, }) } diff --git a/search-service/main.go b/search-service/main.go index 2e383b40d..53caea62b 100644 --- a/search-service/main.go +++ b/search-service/main.go @@ -45,7 +45,8 @@ type SearchConfig struct { RestrictedRoomsCacheTTL time.Duration `env:"RESTRICTED_ROOMS_CACHE_TTL" envDefault:"5m"` RecentWindow time.Duration `env:"RECENT_WINDOW" envDefault:"8760h"` RequestTimeout time.Duration `env:"REQUEST_TIMEOUT" envDefault:"10s"` - UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` + UserRoomIndex string `env:"USER_ROOM_INDEX,required"` + SpotlightIndex string `env:"SPOTLIGHT_INDEX,required"` MetricsAddr string `env:"METRICS_ADDR" envDefault:":9090"` } @@ -107,6 +108,7 @@ func main() { RecentWindow: cfg.Search.RecentWindow, RequestTimeout: cfg.Search.RequestTimeout, UserRoomIndex: cfg.Search.UserRoomIndex, + SpotlightIndex: cfg.Search.SpotlightIndex, }) router := natsrouter.New(nc, "search-service") From cf2c31a3667f55993a734c604314404c7f73cfd2 Mon Sep 17 00:00:00 2001 From: joey0538 Date: Sun, 3 May 2026 16:32:37 +0800 Subject: [PATCH 6/6] fix(room-worker): publish member_added on add + sysMsg sender on remove MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two member-event fixes: - processAddMembers now publishes a same-site OutboxEvent{member_added} on chat.inbox.{siteID}.member_added for the local subset of accounts (cross-site keep going through OUTBOX unchanged). Implements the add-members slice of PR #145's spec; same wire format as room-service's room-create owner publish so search-sync-worker's parseMemberEvent accepts both. - processRemoveIndividual + processRemoveOrg system messages now populate UserID/UserAccount from req.Requester. Prior code left these blank, so message-worker logged "user not found for system message" on every member-remove and the chat history rendered the entry as "Unknown". Dev convention: account == _id. Prod needs a real account → user.ID lookup upstream. Remove-individual / remove-org INBOX publishes from #145's spec are still TODO; only the add-member slice is closed here. --- broadcast-worker/main.go | 26 ++++++++--------- room-worker/handler.go | 62 +++++++++++++++++++++++++++++++++------- 2 files changed, 65 insertions(+), 23 deletions(-) diff --git a/broadcast-worker/main.go b/broadcast-worker/main.go index e76805b88..5b438e37c 100644 --- a/broadcast-worker/main.go +++ b/broadcast-worker/main.go @@ -24,19 +24,19 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"default"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` - UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` - ValkeyAddr string `env:"VALKEY_ADDR,required"` - ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` - ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"default"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` + UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` + ValkeyAddr string `env:"VALKEY_ADDR,required"` + ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` + ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` // DevMode bundles plaintext for local frontends without crypto. MUST stay false in prod. DevMode bool `env:"DEV_MODE" envDefault:"false"` Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` diff --git a/room-worker/handler.go b/room-worker/handler.go index 25efc8d9c..d8f2c7b28 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -287,12 +287,15 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } seed := messageDedupSeed(ctx, "processRemoveIndividual", req.RoomID, fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp)) + // UserID == UserAccount under dev convention; prod needs real account → ID lookup. sysMsg := model.Message{ - ID: idgen.MessageIDFromRequestID(seed, "rmindiv"), - RoomID: req.RoomID, - Type: evtType, - SysMsgData: sysMsgData, - CreatedAt: now, + ID: idgen.MessageIDFromRequestID(seed, "rmindiv"), + RoomID: req.RoomID, + UserID: req.Requester, + UserAccount: req.Requester, + Type: evtType, + SysMsgData: sysMsgData, + CreatedAt: now, } msgEvt := model.MessageEvent{ Message: sysMsg, @@ -412,11 +415,13 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR seed := messageDedupSeed(ctx, "processRemoveOrg", req.RoomID, fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp)) sysMsg := model.Message{ - ID: idgen.MessageIDFromRequestID(seed, "rmorg"), - RoomID: req.RoomID, - Type: "member_removed", - SysMsgData: sysMsgPayload, - CreatedAt: now, + ID: idgen.MessageIDFromRequestID(seed, "rmorg"), + RoomID: req.RoomID, + UserID: req.Requester, + UserAccount: req.Requester, + Type: "member_removed", + SysMsgData: sysMsgPayload, + CreatedAt: now, } msgEvt := model.MessageEvent{ Message: sysMsg, @@ -687,6 +692,43 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error return fmt.Errorf("publish add-members system message: %w", err) } + // 9b. Same-site INBOX member_added so search-sync-worker indexes local + // members. Cross-site keep going through OUTBOX below. PR #145 spec. + sameSiteAccounts := make([]string, 0, len(actualAccounts)) + for _, account := range actualAccounts { + user, ok := userMap[account] + if !ok || user.SiteID != room.SiteID { + continue + } + sameSiteAccounts = append(sameSiteAccounts, account) + } + if len(sameSiteAccounts) > 0 { + inboxEvt := model.InboxMemberEvent{ + RoomID: room.ID, + RoomName: room.Name, + RoomType: room.Type, + SiteID: room.SiteID, + Accounts: sameSiteAccounts, + HistorySharedSince: historySharedSince, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), + } + inboxData, _ := json.Marshal(inboxEvt) + outboxWrap := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: inboxData, + Timestamp: now.UnixMilli(), + } + outboxData, _ := json.Marshal(outboxWrap) + payloadSeed := fmt.Sprintf("%s:%s:%d:local-added", req.RoomID, req.RequesterAccount, req.Timestamp) + dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, dedupID); err != nil { + return fmt.Errorf("publish local inbox member_added: %w", err) + } + } + // 10. Outbox for cross-site members — batched by destination site remoteSiteMembers := make(map[string][]string) for _, sub := range subs {