diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..3fc247862 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,32 @@ +# Trim service build contexts (each uses repo root via `context: ../..`). + +.git +.gitignore +.github +.claude +.claude/worktrees +.idea +.vscode +**/.DS_Store + +**/node_modules +chat-frontend/dist +chat-frontend/build +chat-frontend/coverage +chat-frontend/.vite +chat-frontend/.cache + +docker-local/backend.creds +docker-local/nats.conf +docker-local/.env +docker-local/cassandra/init +bin/ +coverage.out +coverage.html +*.log +*.test +*.tmp + +*.md +docs/ +tools/ diff --git a/Makefile b/Makefile index 571578e49..8b8995a54 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: lint fmt test test-integration generate build deps-up deps-down up down +.PHONY: lint fmt test test-integration generate build deps-up deps-down up up-rebuild down seed-users backfill-room-keys DEPS_COMPOSE := docker-local/compose.deps.yaml SERVICES_COMPOSE := docker-local/compose.services.yaml @@ -67,9 +67,8 @@ deps-up: deps-down: docker compose -f $(DEPS_COMPOSE) down -# Start microservices. With SERVICE=, starts just that service's compose; -# without, starts every service via compose.services.yaml. Foreground either way -# so container logs stream to the terminal; Ctrl-C stops. +# Start microservices in foreground. SERVICE= for one, otherwise all. +# `up` reuses images for fast boot; use `up-rebuild` after editing source. up: @docker container inspect -f '{{.State.Running}}' $(NATS_CONTAINER) 2>/dev/null | grep -q true || { \ echo "Deps are not running. Run 'make deps-up' first."; exit 1; \ @@ -77,12 +76,34 @@ up: @test -f $(NATS_CREDS) && test -f $(NATS_CONF) || { \ echo "Missing $(NATS_CREDS) or $(NATS_CONF). Run './docker-local/setup.sh'."; exit 1; \ } +ifdef SERVICE + docker compose -f $(SERVICE)/deploy/docker-compose.yml up +else + docker compose -f $(SERVICES_COMPOSE) up +endif + +# Same as `up` but rebuilds images first. +up-rebuild: + @docker container inspect -f '{{.State.Running}}' $(NATS_CONTAINER) 2>/dev/null | grep -q true || { \ + echo "Deps are not running. Run 'make deps-up' first."; exit 1; \ + } + @test -f $(NATS_CREDS) && test -f $(NATS_CONF) || { \ + echo "Missing $(NATS_CREDS) or $(NATS_CONF). Run './docker-local/setup.sh'."; exit 1; \ + } ifdef SERVICE docker compose -f $(SERVICE)/deploy/docker-compose.yml up --build else docker compose -f $(SERVICES_COMPOSE) up --build endif +# Seed dev-mode users (alice, bob) into Mongo. Idempotent. +seed-users: + ./docker-local/seed-users.sh + +# Backfill Valkey room keys for rooms created before mint-on-create. Idempotent. +backfill-room-keys: + ./docker-local/backfill-room-keys.sh + # Stop microservices. SERVICE= stops one; otherwise stops every service. down: ifdef SERVICE diff --git a/auth-service/deploy/docker-compose.yml b/auth-service/deploy/docker-compose.yml index f3d072b89..2a3ffcc1c 100644 --- a/auth-service/deploy/docker-compose.yml +++ b/auth-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: auth-service/deploy/Dockerfile + stop_grace_period: 2s ports: - "8080:8080" env_file: diff --git a/broadcast-worker/deploy/docker-compose.yml b/broadcast-worker/deploy/docker-compose.yml index f4918c492..4aabd1aaf 100644 --- a/broadcast-worker/deploy/docker-compose.yml +++ b/broadcast-worker/deploy/docker-compose.yml @@ -12,6 +12,7 @@ services: build: context: ../.. dockerfile: broadcast-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds @@ -25,6 +26,8 @@ services: - VALKEY_ADDR=valkey:6379 - VALKEY_KEY_GRACE_PERIOD=24h - BOOTSTRAP_STREAMS=true + # Local dev only — bundles plaintext for no-crypto frontends. Never enable in prod. + - DEV_MODE=true volumes: - ../../docker-local/backend.creds:/etc/nats/backend.creds:ro networks: diff --git a/broadcast-worker/handler.go b/broadcast-worker/handler.go index fead19b21..e8781161b 100644 --- a/broadcast-worker/handler.go +++ b/broadcast-worker/handler.go @@ -37,6 +37,9 @@ type Handler struct { userStore userstore.UserStore pub Publisher keyStore RoomKeyProvider + // devMode bundles plaintext alongside the encrypted payload for local + // frontends without crypto. MUST stay false in prod. + devMode bool } func NewHandler(store Store, userStore userstore.UserStore, pub Publisher, keyStore RoomKeyProvider) *Handler { @@ -108,7 +111,10 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli return fmt.Errorf("get room key for room %s: %w", room.ID, err) } if key == nil { - return fmt.Errorf("get room key for room %s: %w", room.ID, errNoCurrentKey) + // Permanent: ack-skip so we don't nak-loop forever. + slog.Warn("room missing encryption key — dropping live broadcast", + "roomID", room.ID, "messageID", clientMsg.ID, "err", errNoCurrentKey) + return nil } encrypted, err := roomcrypto.Encode(string(msgJSON), key.KeyPair.PublicKey, key.Version) @@ -122,7 +128,9 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli } evt.EncryptedMessage = json.RawMessage(encJSON) - evt.Message = nil + if !h.devMode { + evt.Message = nil + } payload, err := json.Marshal(evt) if err != nil { diff --git a/broadcast-worker/handler_test.go b/broadcast-worker/handler_test.go index 385019926..edec32e3a 100644 --- a/broadcast-worker/handler_test.go +++ b/broadcast-worker/handler_test.go @@ -485,8 +485,10 @@ func TestHandler_HandleMessage_ChannelRoom_Encryption(t *testing.T) { h := NewHandler(store, us, pub, keyStore) err := h.HandleMessage(context.Background(), makeMessageEvent("room-1", "hello", msgTime)) - require.Error(t, err) - assert.ErrorIs(t, err, errNoCurrentKey) + // A keyless room is treated as a permanent broadcast failure: the + // handler logs and returns nil so the caller acks (avoiding the + // JetStream redelivery loop). The fan-out is dropped — no publish. + require.NoError(t, err) assert.Empty(t, pub.records) }) diff --git a/broadcast-worker/main.go b/broadcast-worker/main.go index af628ba06..5b438e37c 100644 --- a/broadcast-worker/main.go +++ b/broadcast-worker/main.go @@ -24,20 +24,22 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"default"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` - UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` - ValkeyAddr string `env:"VALKEY_ADDR,required"` - ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` - ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"default"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` + UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` + ValkeyAddr string `env:"VALKEY_ADDR,required"` + ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` + ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` + // DevMode bundles plaintext for local frontends without crypto. MUST stay false in prod. + DevMode bool `env:"DEV_MODE" envDefault:"false"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -112,6 +114,10 @@ func main() { publisher := &natsPublisher{nc: nc} handler := NewHandler(store, us, publisher, keyStore) + handler.devMode = cfg.DevMode + if cfg.DevMode { + slog.Warn("DEV_MODE enabled — plaintext message bundled in channel events; do NOT enable in production") + } iter, err := cons.Messages(jetstream.PullMaxMessages(2 * cfg.MaxWorkers)) if err != nil { diff --git a/docker-local/backfill-room-keys.sh b/docker-local/backfill-room-keys.sh new file mode 100755 index 000000000..988a34d90 --- /dev/null +++ b/docker-local/backfill-room-keys.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +# +# Mint a P-256 room key in Valkey for any room without one. For rooms +# created before room-service started minting on create. Idempotent. + +set -euo pipefail + +MONGO_CONTAINER="${MONGO_CONTAINER:-chat-local-mongodb}" +VALKEY_CONTAINER="${VALKEY_CONTAINER:-chat-local-valkey}" +DB="${MONGO_DB:-chat}" + +if ! docker container inspect -f '{{.State.Running}}' "$MONGO_CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $MONGO_CONTAINER not running" >&2; exit 1 +fi +if ! docker container inspect -f '{{.State.Running}}' "$VALKEY_CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $VALKEY_CONTAINER not running" >&2; exit 1 +fi + +room_ids=$(docker exec "$MONGO_CONTAINER" mongosh "$DB" --quiet --eval 'db.rooms.find({}, {_id:1}).forEach(r => print(r._id))') + +if [ -z "$room_ids" ]; then + echo "(no rooms to check)" + exit 0 +fi + +# Single shared P-256 key for all backfilled rooms; dev only. +tmpdir=$(mktemp -d) +trap 'rm -rf "$tmpdir"' EXIT + +openssl ecparam -name prime256v1 -genkey -noout -out "$tmpdir/priv.pem" 2>/dev/null +priv_b64=$(openssl ec -in "$tmpdir/priv.pem" -text -noout 2>/dev/null \ + | awk '/priv:/{flag=1; next} /pub:/{flag=0} flag' \ + | tr -d ' :\n' \ + | xxd -r -p \ + | base64) +pub_b64=$(openssl ec -in "$tmpdir/priv.pem" -text -noout 2>/dev/null \ + | awk '/pub:/{flag=1; next} /ASN1 OID:/{flag=0} flag' \ + | tr -d ' :\n' \ + | xxd -r -p \ + | base64) + +if [ -z "$priv_b64" ] || [ -z "$pub_b64" ]; then + echo "ERROR: failed to extract P-256 key bytes via openssl" >&2; exit 1 +fi + +added=0 +skipped=0 +for rid in $room_ids; do + key="room:${rid}:key" + exists=$(docker exec "$VALKEY_CONTAINER" valkey-cli exists "$key") + if [ "$exists" = "1" ]; then + skipped=$((skipped + 1)) + continue + fi + docker exec "$VALKEY_CONTAINER" valkey-cli hset "$key" pub "$pub_b64" priv "$priv_b64" ver 0 > /dev/null + added=$((added + 1)) +done + +echo "rooms with new key: $added | already had key: $skipped" diff --git a/docker-local/compose.deps.yaml b/docker-local/compose.deps.yaml index 7efea9878..09f6d96da 100644 --- a/docker-local/compose.deps.yaml +++ b/docker-local/compose.deps.yaml @@ -18,12 +18,13 @@ services: - ./nats.conf:/etc/nats/nats.conf:ro - nats-data:/data/jetstream command: ["-c", "/etc/nats/nats.conf"] + # js-server-only skips the deep stream check; default 503s on fresh volumes. healthcheck: - test: ["CMD-SHELL", "wget -qO- http://localhost:8222/healthz || exit 1"] + test: ["CMD-SHELL", "wget -qO- 'http://localhost:8222/healthz?js-server-only=true' || exit 1"] interval: 5s timeout: 3s - retries: 10 - start_period: 5s + retries: 12 + start_period: 15s networks: - chat-local diff --git a/docker-local/seed-users.sh b/docker-local/seed-users.sh new file mode 100755 index 000000000..e3e6ef357 --- /dev/null +++ b/docker-local/seed-users.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# +# Seed the `users` collection with dev fixtures (alice, bob). dev-auth +# accepts any account at login but doesn't insert into Mongo, so workers +# crash with "user not found". _id == account matches frontend convention. +# Idempotent. + +set -euo pipefail + +CONTAINER="${MONGO_CONTAINER:-chat-local-mongodb}" +DB="${MONGO_DB:-chat}" +SITE_ID="${SITE_ID:-site-local}" + +if ! docker container inspect -f '{{.State.Running}}' "$CONTAINER" 2>/dev/null | grep -q true; then + echo "ERROR: $CONTAINER is not running. Run 'make deps-up' first." >&2 + exit 1 +fi + +echo "Seeding users into $CONTAINER/$DB (siteId=$SITE_ID)..." + +docker exec "$CONTAINER" mongosh --quiet "$DB" --eval " + const users = [ + { _id: 'alice', account: 'alice', siteId: '$SITE_ID', engName: 'Alice', chineseName: 'Alice', employeeId: 'E0001', sectId: 'dev', sectName: 'Dev' }, + { _id: 'bob', account: 'bob', siteId: '$SITE_ID', engName: 'Bob', chineseName: 'Bob', employeeId: 'E0002', sectId: 'dev', sectName: 'Dev' } + ]; + const ops = users.map(u => ({ + updateOne: { filter: { _id: u._id }, update: { \$set: u }, upsert: true } + })); + const res = db.users.bulkWrite(ops); + print('upserted: ' + res.upsertedCount + ', modified: ' + res.modifiedCount + ', matched: ' + res.matchedCount); +" + +echo "Done. Login as 'alice' or 'bob' (siteId=$SITE_ID) in dev mode." diff --git a/history-service/deploy/docker-compose.yml b/history-service/deploy/docker-compose.yml index 34cad84a8..215a7a54a 100644 --- a/history-service/deploy/docker-compose.yml +++ b/history-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: history-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/inbox-worker/deploy/docker-compose.yml b/inbox-worker/deploy/docker-compose.yml index 8fc9a9665..6e2c5dc28 100644 --- a/inbox-worker/deploy/docker-compose.yml +++ b/inbox-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: inbox-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/message-gatekeeper/deploy/docker-compose.yml b/message-gatekeeper/deploy/docker-compose.yml index 56394d57f..3ec9db121 100644 --- a/message-gatekeeper/deploy/docker-compose.yml +++ b/message-gatekeeper/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: message-gatekeeper/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/message-worker/deploy/docker-compose.yml b/message-worker/deploy/docker-compose.yml index aaed82edc..e2ad97360 100644 --- a/message-worker/deploy/docker-compose.yml +++ b/message-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: message-worker/deploy/Dockerfile + stop_grace_period: 2s pull_policy: build environment: - NATS_URL=nats://nats:4222 diff --git a/notification-worker/deploy/docker-compose.yml b/notification-worker/deploy/docker-compose.yml index 492a9e808..70e990abe 100644 --- a/notification-worker/deploy/docker-compose.yml +++ b/notification-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: notification-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/pkg/otelutil/otel.go b/pkg/otelutil/otel.go index 147980d1d..272f2b942 100644 --- a/pkg/otelutil/otel.go +++ b/pkg/otelutil/otel.go @@ -3,6 +3,7 @@ package otelutil import ( "context" "fmt" + "os" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -14,9 +15,14 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.26.0" ) -// InitTracer creates and registers a TracerProvider with OTLP gRPC exporter. -// Returns a shutdown function. +// InitTracer registers a TracerProvider with OTLP gRPC exporter. Returns a +// shutdown function. Skipped (noop provider) when no OTLP endpoint env is set. func InitTracer(ctx context.Context, serviceName string) (func(context.Context) error, error) { + if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" && os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") == "" { + otel.SetTextMapPropagator(propagation.TraceContext{}) + return func(context.Context) error { return nil }, nil + } + exp, err := otlptracegrpc.New(ctx) if err != nil { return nil, fmt.Errorf("otlp exporter: %w", err) diff --git a/room-service/deploy/docker-compose.yml b/room-service/deploy/docker-compose.yml index 89eb308a5..ee7d4186d 100644 --- a/room-service/deploy/docker-compose.yml +++ b/room-service/deploy/docker-compose.yml @@ -17,6 +17,7 @@ services: build: context: ../.. dockerfile: room-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/room-service/handler.go b/room-service/handler.go index 7741371bb..2f83a9a8e 100644 --- a/room-service/handler.go +++ b/room-service/handler.go @@ -2,6 +2,8 @@ package main import ( "context" + "crypto/ecdh" + "crypto/rand" "encoding/base64" "encoding/json" "errors" @@ -31,12 +33,19 @@ type Handler struct { maxRoomSize int maxBatchSize int publishToStream func(ctx context.Context, subj string, data []byte) error + // publishEvent is core-NATS for transient client notifications; nil-tolerant for tests. + publishEvent func(ctx context.Context, subj string, data []byte) error } func NewHandler(store RoomStore, keyStore RoomKeyStore, memberListClient MemberListClient, siteID string, maxRoomSize, maxBatchSize int, publishToStream func(context.Context, string, []byte) error) *Handler { return &Handler{store: store, keyStore: keyStore, memberListClient: memberListClient, siteID: siteID, maxRoomSize: maxRoomSize, maxBatchSize: maxBatchSize, publishToStream: publishToStream} } +func (h *Handler) WithEventPublisher(fn func(ctx context.Context, subj string, data []byte) error) *Handler { + h.publishEvent = fn + return h +} + // wrappedCtx returns m.Context() augmented with X-Request-ID from the inbound msg header; entry ctx for every nats* handler. func wrappedCtx(m otelnats.Msg) context.Context { return natsutil.ContextWithRequestIDFromHeaders(m.Context(), m.Msg.Header) @@ -125,21 +134,25 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er if len(req.Members) != 1 { return nil, fmt.Errorf("DM requires exactly one other member, got %d", len(req.Members)) } + if req.Members[0] == req.CreatedBy { + return nil, fmt.Errorf("DM requires a different recipient than the creator") + } roomID = idgen.BuildDMRoomID(req.CreatedBy, req.Members[0]) - // TODO(idgen-rework follow-up): persist a second Subscription for req.Members[0] so DMs are two-sided. - // DMs have no add-member/role-update flow, so the recipient is currently un-enrolled. - // Needs the recipient's Account (store lookup or extend CreateRoomRequest with MembersAccount). Also bump Room.UserCount to 2. default: return nil, fmt.Errorf("unsupported room type %q", req.Type) } + userCount := 1 + if req.Type == model.RoomTypeDM { + userCount = 2 + } room := model.Room{ ID: roomID, Name: req.Name, Type: req.Type, CreatedBy: req.CreatedBy, SiteID: req.SiteID, - UserCount: 1, + UserCount: userCount, CreatedAt: now, UpdatedAt: now, } @@ -148,6 +161,19 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er return nil, fmt.Errorf("create room: %w", err) } + // Mint the room key — broadcast-worker can't encrypt without it. + if h.keyStore != nil { + keyPriv, err := ecdh.P256().GenerateKey(rand.Reader) + if err != nil { + slog.Warn("generate room key failed", "error", err, "roomID", room.ID) + } else if _, err := h.keyStore.Set(ctx, room.ID, roomkeystore.RoomKeyPair{ + PublicKey: keyPriv.PublicKey().Bytes(), + PrivateKey: keyPriv.Bytes(), + }); err != nil { + slog.Warn("set room key failed", "error", err, "roomID", room.ID) + } + } + // Auto-create owner subscription sub := model.Subscription{ ID: idgen.GenerateUUIDv7(), @@ -163,6 +189,94 @@ func (h *Handler) handleCreateRoom(ctx context.Context, data []byte) ([]byte, er slog.Warn("create owner subscription failed", "error", err) } + // DM: enroll the recipient too, otherwise their reads hit "not subscribed". + // Dev convention: account == user.ID. Prod will need a real account → ID lookup. + var recipSub model.Subscription + if req.Type == model.RoomTypeDM { + recipientAccount := req.Members[0] + recipSub = model.Subscription{ + ID: idgen.GenerateUUIDv7(), + User: model.SubscriptionUser{ID: recipientAccount, Account: recipientAccount}, + RoomID: room.ID, + RoomType: req.Type, + SiteID: req.SiteID, + Roles: []model.Role{model.RoleMember}, + HistorySharedSince: &now, + JoinedAt: now, + } + if err := h.store.CreateSubscription(ctx, &recipSub); err != nil { + slog.Warn("create recipient subscription failed", "error", err, "account", recipientAccount) + } + } + + // Best-effort: notify the creator's frontend so the new room appears + // without a refresh; mirrors room-worker's subscription.update on add/role. + subEvt := model.SubscriptionUpdateEvent{ + UserID: sub.User.ID, + Subscription: sub, + Action: "added", + Timestamp: now.UnixMilli(), + } + if h.publishEvent != nil { + if subEvtData, err := json.Marshal(subEvt); err != nil { + slog.Warn("marshal subscription update event failed", "error", err, "roomID", room.ID) + } else if err := h.publishEvent(ctx, subject.SubscriptionUpdate(req.CreatedByAccount), subEvtData); err != nil { + slog.Warn("publish subscription update failed", "error", err, "roomID", room.ID) + } + + // Mirror the notification to the DM recipient so their UI updates too. + if req.Type == model.RoomTypeDM { + recipEvt := model.SubscriptionUpdateEvent{ + UserID: recipSub.User.ID, + Subscription: recipSub, + Action: "added", + Timestamp: now.UnixMilli(), + } + if recipEvtData, err := json.Marshal(recipEvt); err != nil { + slog.Warn("marshal recipient subscription update event failed", "error", err, "roomID", room.ID) + } else if err := h.publishEvent(ctx, subject.SubscriptionUpdate(req.Members[0]), recipEvtData); err != nil { + slog.Warn("publish recipient subscription update failed", "error", err, "roomID", room.ID) + } + } + } + + // Same-site member_added on INBOX so search-sync-worker indexes the + // auto-enrolled members; owner-add bypasses room-worker. Wire shape + // matches PR #145's spec. HSS=nil → unrestricted (spotlight skips + // restricted bulks for MVP). Best-effort. + if h.publishToStream != nil { + accounts := []string{req.CreatedByAccount} + if req.Type == model.RoomTypeDM { + accounts = append(accounts, req.Members[0]) + } + inboxEvt := model.InboxMemberEvent{ + RoomID: room.ID, + RoomName: room.Name, + RoomType: room.Type, + SiteID: h.siteID, + Accounts: accounts, + JoinedAt: now.UnixMilli(), + Timestamp: now.UnixMilli(), + } + inboxData, err := json.Marshal(inboxEvt) + if err != nil { + slog.Warn("marshal inbox member event failed", "error", err, "roomID", room.ID) + } else { + outboxEvt := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: h.siteID, + DestSiteID: h.siteID, + Payload: inboxData, + Timestamp: now.UnixMilli(), + } + if outboxData, err := json.Marshal(outboxEvt); err != nil { + slog.Warn("marshal outbox event failed", "error", err, "roomID", room.ID) + } else if err := h.publishToStream(ctx, subject.InboxMemberAdded(h.siteID), outboxData); err != nil { + slog.Warn("publish owner member_added failed", "error", err, "roomID", room.ID) + } + } + } + return json.Marshal(room) } diff --git a/room-service/handler_test.go b/room-service/handler_test.go index cef4ef855..426185303 100644 --- a/room-service/handler_test.go +++ b/room-service/handler_test.go @@ -1898,8 +1898,17 @@ func TestHandler_handleCreateRoom_ChannelAndDMIDFormats(t *testing.T) { var capturedSub *model.Subscription store.EXPECT().CreateRoom(gomock.Any(), gomock.Any()). DoAndReturn(func(_ context.Context, r *model.Room) error { capturedRoom = r; return nil }) + // Channels create one subscription (creator); DMs create two + // (creator + recipient). AnyTimes covers both, and we only + // capture the first call so existing assertions on subID + // stay valid for the creator's subscription. store.EXPECT().CreateSubscription(gomock.Any(), gomock.Any()). - DoAndReturn(func(_ context.Context, s *model.Subscription) error { capturedSub = s; return nil }) + DoAndReturn(func(_ context.Context, s *model.Subscription) error { + if capturedSub == nil { + capturedSub = s + } + return nil + }).AnyTimes() h := NewHandler(store, nil, nil, "site1", 100, 50, func(ctx context.Context, subj string, data []byte) error { return nil }) diff --git a/room-service/main.go b/room-service/main.go index 4047e3545..4ca7fc409 100644 --- a/room-service/main.go +++ b/room-service/main.go @@ -104,6 +104,11 @@ func main() { return fmt.Errorf("publish to %q: %w", subj, err) } return nil + }).WithEventPublisher(func(ctx context.Context, subj string, data []byte) error { + if err := nc.PublishMsg(ctx, natsutil.NewMsg(ctx, subj, data)); err != nil { + return fmt.Errorf("publish event to %q: %w", subj, err) + } + return nil }) if err := handler.RegisterCRUD(nc); err != nil { diff --git a/room-service/mock_store_test.go b/room-service/mock_store_test.go index d2ed18b7d..984e82276 100644 --- a/room-service/mock_store_test.go +++ b/room-service/mock_store_test.go @@ -258,3 +258,18 @@ func (mr *MockRoomKeyStoreMockRecorder) GetMany(ctx, roomIDs any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMany", reflect.TypeOf((*MockRoomKeyStore)(nil).GetMany), ctx, roomIDs) } + +// Set mocks base method. +func (m *MockRoomKeyStore) Set(ctx context.Context, roomID string, pair roomkeystore.RoomKeyPair) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Set", ctx, roomID, pair) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Set indicates an expected call of Set. +func (mr *MockRoomKeyStoreMockRecorder) Set(ctx, roomID, pair any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockRoomKeyStore)(nil).Set), ctx, roomID, pair) +} diff --git a/room-service/store.go b/room-service/store.go index cdb7e19ff..44981f743 100644 --- a/room-service/store.go +++ b/room-service/store.go @@ -52,8 +52,12 @@ type RoomStore interface { ListOrgMembers(ctx context.Context, orgID string) ([]model.OrgMember, error) } -// RoomKeyStore is the consumer-side interface for room encryption key lookups. -// Only the methods room-service needs are declared here. +// RoomKeyStore is the consumer-side interface for room encryption key lookups +// and the create-on-room-create write. Only the methods room-service needs +// are declared here. broadcast-worker requires a key for every room before +// it can encrypt — Set is called from handleCreateRoom so newly created +// rooms come with a key in place. type RoomKeyStore interface { GetMany(ctx context.Context, roomIDs []string) (map[string]*roomkeystore.VersionedKeyPair, error) + Set(ctx context.Context, roomID string, pair roomkeystore.RoomKeyPair) (int, error) } diff --git a/room-worker/deploy/docker-compose.yml b/room-worker/deploy/docker-compose.yml index 0bf367dfc..457542ffc 100644 --- a/room-worker/deploy/docker-compose.yml +++ b/room-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: room-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds diff --git a/room-worker/handler.go b/room-worker/handler.go index 25efc8d9c..d8f2c7b28 100644 --- a/room-worker/handler.go +++ b/room-worker/handler.go @@ -287,12 +287,15 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove } seed := messageDedupSeed(ctx, "processRemoveIndividual", req.RoomID, fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp)) + // UserID == UserAccount under dev convention; prod needs real account → ID lookup. sysMsg := model.Message{ - ID: idgen.MessageIDFromRequestID(seed, "rmindiv"), - RoomID: req.RoomID, - Type: evtType, - SysMsgData: sysMsgData, - CreatedAt: now, + ID: idgen.MessageIDFromRequestID(seed, "rmindiv"), + RoomID: req.RoomID, + UserID: req.Requester, + UserAccount: req.Requester, + Type: evtType, + SysMsgData: sysMsgData, + CreatedAt: now, } msgEvt := model.MessageEvent{ Message: sysMsg, @@ -412,11 +415,13 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR seed := messageDedupSeed(ctx, "processRemoveOrg", req.RoomID, fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp)) sysMsg := model.Message{ - ID: idgen.MessageIDFromRequestID(seed, "rmorg"), - RoomID: req.RoomID, - Type: "member_removed", - SysMsgData: sysMsgPayload, - CreatedAt: now, + ID: idgen.MessageIDFromRequestID(seed, "rmorg"), + RoomID: req.RoomID, + UserID: req.Requester, + UserAccount: req.Requester, + Type: "member_removed", + SysMsgData: sysMsgPayload, + CreatedAt: now, } msgEvt := model.MessageEvent{ Message: sysMsg, @@ -687,6 +692,43 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error return fmt.Errorf("publish add-members system message: %w", err) } + // 9b. Same-site INBOX member_added so search-sync-worker indexes local + // members. Cross-site keep going through OUTBOX below. PR #145 spec. + sameSiteAccounts := make([]string, 0, len(actualAccounts)) + for _, account := range actualAccounts { + user, ok := userMap[account] + if !ok || user.SiteID != room.SiteID { + continue + } + sameSiteAccounts = append(sameSiteAccounts, account) + } + if len(sameSiteAccounts) > 0 { + inboxEvt := model.InboxMemberEvent{ + RoomID: room.ID, + RoomName: room.Name, + RoomType: room.Type, + SiteID: room.SiteID, + Accounts: sameSiteAccounts, + HistorySharedSince: historySharedSince, + JoinedAt: req.Timestamp, + Timestamp: now.UnixMilli(), + } + inboxData, _ := json.Marshal(inboxEvt) + outboxWrap := model.OutboxEvent{ + Type: model.OutboxMemberAdded, + SiteID: room.SiteID, + DestSiteID: room.SiteID, + Payload: inboxData, + Timestamp: now.UnixMilli(), + } + outboxData, _ := json.Marshal(outboxWrap) + payloadSeed := fmt.Sprintf("%s:%s:%d:local-added", req.RoomID, req.RequesterAccount, req.Timestamp) + dedupID := outboxDedupID(ctx, room.SiteID, payloadSeed) + if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, dedupID); err != nil { + return fmt.Errorf("publish local inbox member_added: %w", err) + } + } + // 10. Outbox for cross-site members — batched by destination site remoteSiteMembers := make(map[string][]string) for _, sub := range subs { diff --git a/search-service/deploy/docker-compose.yml b/search-service/deploy/docker-compose.yml index b4c191c99..42670f60a 100644 --- a/search-service/deploy/docker-compose.yml +++ b/search-service/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: search-service/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds @@ -19,6 +20,10 @@ services: - SEARCH_RECENT_WINDOW=8760h - SEARCH_REQUEST_TIMEOUT=10s - SEARCH_METRICS_ADDR=:9090 + # Pin to the concrete site-suffixed indices written by + # search-sync-worker. Prod uses aliases owned by ops/IaC. + - SEARCH_USER_ROOM_INDEX=user-room-site-local + - SEARCH_SPOTLIGHT_INDEX=spotlight-site-local-v1-chat ports: # Expose /metrics so Prometheus / curl can scrape from the host # during local dev. The listener is bound on 0.0.0.0:9090 inside diff --git a/search-service/handler.go b/search-service/handler.go index 9e2cfafa6..06753a9e4 100644 --- a/search-service/handler.go +++ b/search-service/handler.go @@ -20,6 +20,7 @@ type handlerConfig struct { RecentWindow time.Duration RequestTimeout time.Duration UserRoomIndex string + SpotlightIndex string } type handler struct { @@ -136,7 +137,7 @@ func (h *handler) searchRooms(c *natsrouter.Context, req model.SearchRoomsReques } observeESDone := observeES() - raw, err := h.store.Search(ctx, []string{SpotlightIndex}, body) + raw, err := h.store.Search(ctx, []string{h.cfg.SpotlightIndex}, body) observeESDone() if err != nil { slog.Error("room search backend failed", "account", account, "error", err) diff --git a/search-service/handler_test.go b/search-service/handler_test.go index 3a13f2aeb..d51f413e8 100644 --- a/search-service/handler_test.go +++ b/search-service/handler_test.go @@ -84,6 +84,7 @@ func newTestHandler(store SearchStore, cache RestrictedRoomCache) *handler { MaxDocCounts: 100, RestrictedRoomsCacheTTL: 5 * time.Minute, RecentWindow: 365 * 24 * time.Hour, + SpotlightIndex: SpotlightIndex, }) } diff --git a/search-service/main.go b/search-service/main.go index 2e383b40d..53caea62b 100644 --- a/search-service/main.go +++ b/search-service/main.go @@ -45,7 +45,8 @@ type SearchConfig struct { RestrictedRoomsCacheTTL time.Duration `env:"RESTRICTED_ROOMS_CACHE_TTL" envDefault:"5m"` RecentWindow time.Duration `env:"RECENT_WINDOW" envDefault:"8760h"` RequestTimeout time.Duration `env:"REQUEST_TIMEOUT" envDefault:"10s"` - UserRoomIndex string `env:"USER_ROOM_INDEX" envDefault:""` + UserRoomIndex string `env:"USER_ROOM_INDEX,required"` + SpotlightIndex string `env:"SPOTLIGHT_INDEX,required"` MetricsAddr string `env:"METRICS_ADDR" envDefault:":9090"` } @@ -107,6 +108,7 @@ func main() { RecentWindow: cfg.Search.RecentWindow, RequestTimeout: cfg.Search.RequestTimeout, UserRoomIndex: cfg.Search.UserRoomIndex, + SpotlightIndex: cfg.Search.SpotlightIndex, }) router := natsrouter.New(nc, "search-service") diff --git a/search-sync-worker/deploy/docker-compose.yml b/search-sync-worker/deploy/docker-compose.yml index f2948e0bc..d3649b129 100644 --- a/search-sync-worker/deploy/docker-compose.yml +++ b/search-sync-worker/deploy/docker-compose.yml @@ -5,6 +5,7 @@ services: build: context: ../.. dockerfile: search-sync-worker/deploy/Dockerfile + stop_grace_period: 2s environment: - NATS_URL=nats://nats:4222 - NATS_CREDS_FILE=/etc/nats/backend.creds