Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Trim service build contexts (each uses repo root via `context: ../..`).
#
# Patterns are matched relative to the context root. A bare `*.ext` only
# matches files directly in the root, so excludes that are meant to apply
# anywhere in the tree use the `**/` prefix.

# VCS, CI and editor metadata.
.git
.gitignore
.github
.claude
.claude/worktrees
.idea
.vscode
**/.DS_Store

# Frontend dependencies and build output.
**/node_modules
chat-frontend/dist
chat-frontend/build
chat-frontend/coverage
chat-frontend/.vite
chat-frontend/.cache

# Local-only credentials and config; these are mounted at runtime, never baked in.
docker-local/backend.creds
docker-local/nats.conf
docker-local/.env
docker-local/cassandra/init

# Build and test artefacts. Logs, test binaries and temp files can appear in
# any service directory, hence `**/`.
bin/
coverage.out
coverage.html
**/*.log
**/*.test
**/*.tmp

# Documentation and tooling.
# NOTE(review): `*.md` only excludes markdown at the repo root; nested *.md
# files are still sent. Left as-is in case a service embeds markdown — confirm.
*.md
docs/
tools/
29 changes: 25 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: lint fmt test test-integration generate build deps-up deps-down up down
.PHONY: lint fmt test test-integration generate build deps-up deps-down up up-rebuild down seed-users backfill-room-keys

DEPS_COMPOSE := docker-local/compose.deps.yaml
SERVICES_COMPOSE := docker-local/compose.services.yaml
Expand Down Expand Up @@ -67,22 +67,43 @@ deps-up:
deps-down:
docker compose -f $(DEPS_COMPOSE) down

# Start microservices. With SERVICE=<name>, starts just that service's compose;
# without, starts every service via compose.services.yaml. Foreground either way
# so container logs stream to the terminal; Ctrl-C stops.
# Start microservices in foreground. SERVICE=<name> for one, otherwise all.
# `up` reuses images for fast boot; use `up-rebuild` after editing source.
up:
	@docker container inspect -f '{{.State.Running}}' $(NATS_CONTAINER) 2>/dev/null | grep -q true || { \
		echo "Deps are not running. Run 'make deps-up' first."; exit 1; \
	}
	@test -f $(NATS_CREDS) && test -f $(NATS_CONF) || { \
		echo "Missing $(NATS_CREDS) or $(NATS_CONF). Run './docker-local/setup.sh'."; exit 1; \
	}
ifdef SERVICE
	docker compose -f $(SERVICE)/deploy/docker-compose.yml up $(COMPOSE_UP_FLAGS)
else
	docker compose -f $(SERVICES_COMPOSE) up $(COMPOSE_UP_FLAGS)
endif

# Extra flags appended to `docker compose ... up` by the `up` recipe.
# Empty by default; may be set on the command line (e.g. COMPOSE_UP_FLAGS=-d).
COMPOSE_UP_FLAGS ?=

# Same as `up` but rebuilds images first. Implemented as `up` plus a
# target-specific `--build` flag so the preflight checks live in one place.
# `override` keeps `--build` even when COMPOSE_UP_FLAGS is given on the
# command line.
up-rebuild: override COMPOSE_UP_FLAGS += --build
up-rebuild: up

# Dev-data helpers. Each target runs the script of the same name under
# docker-local/. Both are idempotent, so re-running is safe.
#   seed-users          seed dev-mode users (alice, bob) into Mongo
#   backfill-room-keys  backfill Valkey room keys for rooms created before
#                       mint-on-create
seed-users backfill-room-keys:
	./docker-local/$@.sh

# Stop microservices. SERVICE=<name> stops one; otherwise stops every service.
down:
ifdef SERVICE
Expand Down
1 change: 1 addition & 0 deletions auth-service/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: auth-service/deploy/Dockerfile
stop_grace_period: 2s
ports:
- "8080:8080"
env_file:
Expand Down
3 changes: 3 additions & 0 deletions broadcast-worker/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
build:
context: ../..
dockerfile: broadcast-worker/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand All @@ -25,6 +26,8 @@ services:
- VALKEY_ADDR=valkey:6379
- VALKEY_KEY_GRACE_PERIOD=24h
- BOOTSTRAP_STREAMS=true
# Local dev only — bundles plaintext for no-crypto frontends. Never enable in prod.
- DEV_MODE=true
volumes:
- ../../docker-local/backend.creds:/etc/nats/backend.creds:ro
networks:
Expand Down
12 changes: 10 additions & 2 deletions broadcast-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type Handler struct {
userStore userstore.UserStore
pub Publisher
keyStore RoomKeyProvider
// devMode bundles plaintext alongside the encrypted payload for local
// frontends without crypto. MUST stay false in prod.
devMode bool
}

func NewHandler(store Store, userStore userstore.UserStore, pub Publisher, keyStore RoomKeyProvider) *Handler {
Expand Down Expand Up @@ -108,7 +111,10 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli
return fmt.Errorf("get room key for room %s: %w", room.ID, err)
}
if key == nil {
return fmt.Errorf("get room key for room %s: %w", room.ID, errNoCurrentKey)
// Permanent: ack-skip so we don't nak-loop forever.
slog.Warn("room missing encryption key — dropping live broadcast",
"roomID", room.ID, "messageID", clientMsg.ID, "err", errNoCurrentKey)
return nil
}

encrypted, err := roomcrypto.Encode(string(msgJSON), key.KeyPair.PublicKey, key.Version)
Expand All @@ -122,7 +128,9 @@ func (h *Handler) publishChannelEvent(ctx context.Context, room *model.Room, cli
}

evt.EncryptedMessage = json.RawMessage(encJSON)
evt.Message = nil
if !h.devMode {
evt.Message = nil
}

payload, err := json.Marshal(evt)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions broadcast-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,10 @@ func TestHandler_HandleMessage_ChannelRoom_Encryption(t *testing.T) {

h := NewHandler(store, us, pub, keyStore)
err := h.HandleMessage(context.Background(), makeMessageEvent("room-1", "hello", msgTime))
require.Error(t, err)
assert.ErrorIs(t, err, errNoCurrentKey)
// A keyless room is treated as a permanent broadcast failure: the
// handler logs and returns nil so the caller acks (avoiding the
// JetStream redelivery loop). The fan-out is dropped — no publish.
require.NoError(t, err)
assert.Empty(t, pub.records)
})

Expand Down
34 changes: 20 additions & 14 deletions broadcast-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ import (
)

type config struct {
NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"`
NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""`
SiteID string `env:"SITE_ID" envDefault:"default"`
MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"`
MongoDB string `env:"MONGO_DB" envDefault:"chat"`
MongoUsername string `env:"MONGO_USERNAME" envDefault:""`
MongoPassword string `env:"MONGO_PASSWORD" envDefault:""`
MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"`
UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"`
UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"`
ValkeyAddr string `env:"VALKEY_ADDR,required"`
ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""`
ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"`
Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"`
NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"`
NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""`
SiteID string `env:"SITE_ID" envDefault:"default"`
MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"`
MongoDB string `env:"MONGO_DB" envDefault:"chat"`
MongoUsername string `env:"MONGO_USERNAME" envDefault:""`
MongoPassword string `env:"MONGO_PASSWORD" envDefault:""`
MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"`
UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"`
UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"`
ValkeyAddr string `env:"VALKEY_ADDR,required"`
ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""`
ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"`
// DevMode bundles plaintext for local frontends without crypto. MUST stay false in prod.
DevMode bool `env:"DEV_MODE" envDefault:"false"`
Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"`
}

func main() {
Expand Down Expand Up @@ -112,6 +114,10 @@ func main() {

publisher := &natsPublisher{nc: nc}
handler := NewHandler(store, us, publisher, keyStore)
handler.devMode = cfg.DevMode
if cfg.DevMode {
slog.Warn("DEV_MODE enabled — plaintext message bundled in channel events; do NOT enable in production")
}

iter, err := cons.Messages(jetstream.PullMaxMessages(2 * cfg.MaxWorkers))
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions docker-local/backfill-room-keys.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash
#
# Mint a P-256 room key in Valkey for any room without one. For rooms
# created before room-service started minting on create. Idempotent: a room
# whose `room:<id>:key` already exists is counted as skipped and left alone.
#
# Environment overrides: MONGO_CONTAINER, VALKEY_CONTAINER, MONGO_DB.
# Host tools used: docker, openssl, awk, xxd, base64.

set -euo pipefail

MONGO_CONTAINER="${MONGO_CONTAINER:-chat-local-mongodb}"
VALKEY_CONTAINER="${VALKEY_CONTAINER:-chat-local-valkey}"
DB="${MONGO_DB:-chat}"

# Exit with an error unless container $1 reports State.Running == true.
require_running() {
  if ! docker container inspect -f '{{.State.Running}}' "$1" 2>/dev/null | grep -q true; then
    echo "ERROR: $1 not running" >&2; exit 1
  fi
}

# Read `openssl ec -text` output on stdin and print, as one contiguous hex
# string, the lines strictly between the line matching $1 and the line
# matching $2.
hex_between() {
  awk -v start="$1" -v stop="$2" '$0 ~ start {flag=1; next} $0 ~ stop {flag=0} flag' \
    | tr -d ' :\n'
}

# Hex on stdin -> base64 on stdout, always on a single line. GNU base64 wraps
# at 76 columns by default, which would embed a newline in the 88-character
# encoding of the 65-byte public point; strip it so the stored value is the
# same on every platform.
hex_to_b64() {
  xxd -r -p | base64 | tr -d '\n'
}

require_running "$MONGO_CONTAINER"
require_running "$VALKEY_CONTAINER"

room_ids=$(docker exec "$MONGO_CONTAINER" mongosh "$DB" --quiet --eval 'db.rooms.find({}, {_id:1}).forEach(r => print(r._id))')

if [ -z "$room_ids" ]; then
  echo "(no rooms to check)"
  exit 0
fi

# Single shared P-256 key for all backfilled rooms; dev only.
tmpdir=$(mktemp -d)
trap 'rm -rf "$tmpdir"' EXIT

# `|| true` / `|| key_text=""` let a failing openssl fall through to the
# explicit error below instead of `set -e` exiting with no message.
openssl ecparam -name prime256v1 -genkey -noout -out "$tmpdir/priv.pem" 2>/dev/null || true
key_text=$(openssl ec -in "$tmpdir/priv.pem" -text -noout 2>/dev/null) || key_text=""

priv_hex=$(printf '%s\n' "$key_text" | hex_between 'priv:' 'pub:')
pub_hex=$(printf '%s\n' "$key_text" | hex_between 'pub:' 'ASN1 OID:')

# The uncompressed public point is 0x04 || X || Y = 65 bytes = 130 hex chars.
if [ -z "$priv_hex" ] || [ "${#pub_hex}" -ne 130 ]; then
  echo "ERROR: failed to extract P-256 key bytes via openssl" >&2; exit 1
fi

# openssl prints the private scalar as a bignum: it may carry a leading 00
# byte (high bit set) or be shorter than 32 bytes (leading zero bytes dropped).
# Normalise to exactly 32 bytes (64 hex chars) so every run stores the same
# fixed-width encoding.
# NOTE(review): assumes the key consumer expects a raw 32-byte scalar — confirm.
priv_hex=$(printf '%64s' "$priv_hex" | tr ' ' '0')
priv_hex=${priv_hex: -64}

priv_b64=$(printf '%s' "$priv_hex" | hex_to_b64)
pub_b64=$(printf '%s' "$pub_hex" | hex_to_b64)

added=0
skipped=0
for rid in $room_ids; do
  key="room:${rid}:key"
  exists=$(docker exec "$VALKEY_CONTAINER" valkey-cli exists "$key")
  if [ "$exists" = "1" ]; then
    skipped=$((skipped + 1))
    continue
  fi
  docker exec "$VALKEY_CONTAINER" valkey-cli hset "$key" pub "$pub_b64" priv "$priv_b64" ver 0 > /dev/null
  added=$((added + 1))
done

echo "rooms with new key: $added | already had key: $skipped"
7 changes: 4 additions & 3 deletions docker-local/compose.deps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ services:
- ./nats.conf:/etc/nats/nats.conf:ro
- nats-data:/data/jetstream
command: ["-c", "/etc/nats/nats.conf"]
# js-server-only skips the deep stream check; default 503s on fresh volumes.
healthcheck:
test: ["CMD-SHELL", "wget -qO- http://localhost:8222/healthz || exit 1"]
test: ["CMD-SHELL", "wget -qO- 'http://localhost:8222/healthz?js-server-only=true' || exit 1"]
interval: 5s
timeout: 3s
retries: 10
start_period: 5s
retries: 12
start_period: 15s
networks:
- chat-local

Expand Down
33 changes: 33 additions & 0 deletions docker-local/seed-users.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/bin/env bash
#
# Upsert the dev fixture users (alice, bob) into the `users` collection of
# the local Mongo container. dev-auth accepts any account at login but does
# not insert it into Mongo, so workers fail with "user not found" until the
# fixtures exist. Each user's _id equals its account, matching the frontend
# convention. Safe to re-run: every write is an upsert keyed on _id.
#
# Environment overrides: MONGO_CONTAINER, MONGO_DB, SITE_ID.

set -euo pipefail

mongo_container="${MONGO_CONTAINER:-chat-local-mongodb}"
mongo_db="${MONGO_DB:-chat}"
site_id="${SITE_ID:-site-local}"

# Succeeds only when the Mongo container exists and reports Running == true.
mongo_is_running() {
  docker container inspect -f '{{.State.Running}}' "$mongo_container" 2>/dev/null | grep -q true
}

mongo_is_running || {
  echo "ERROR: $mongo_container is not running. Run 'make deps-up' first." >&2
  exit 1
}

echo "Seeding users into $mongo_container/$mongo_db (siteId=$site_id)..."

# mongosh script: one upsert per fixture, sent as a single bulkWrite.
# The site id is interpolated by the shell; `\$set` stays literal for Mongo.
seed_script="
const users = [
{ _id: 'alice', account: 'alice', siteId: '$site_id', engName: 'Alice', chineseName: 'Alice', employeeId: 'E0001', sectId: 'dev', sectName: 'Dev' },
{ _id: 'bob', account: 'bob', siteId: '$site_id', engName: 'Bob', chineseName: 'Bob', employeeId: 'E0002', sectId: 'dev', sectName: 'Dev' }
];
const ops = users.map(u => ({
updateOne: { filter: { _id: u._id }, update: { \$set: u }, upsert: true }
}));
const res = db.users.bulkWrite(ops);
print('upserted: ' + res.upsertedCount + ', modified: ' + res.modifiedCount + ', matched: ' + res.matchedCount);
"

docker exec "$mongo_container" mongosh --quiet "$mongo_db" --eval "$seed_script"

echo "Done. Login as 'alice' or 'bob' (siteId=$site_id) in dev mode."
1 change: 1 addition & 0 deletions history-service/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: history-service/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand Down
1 change: 1 addition & 0 deletions inbox-worker/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: inbox-worker/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand Down
1 change: 1 addition & 0 deletions message-gatekeeper/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: message-gatekeeper/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand Down
1 change: 1 addition & 0 deletions message-worker/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: message-worker/deploy/Dockerfile
stop_grace_period: 2s
pull_policy: build
environment:
- NATS_URL=nats://nats:4222
Expand Down
1 change: 1 addition & 0 deletions notification-worker/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ services:
build:
context: ../..
dockerfile: notification-worker/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand Down
10 changes: 8 additions & 2 deletions pkg/otelutil/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package otelutil
import (
"context"
"fmt"
"os"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
Expand All @@ -14,9 +15,14 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// InitTracer creates and registers a TracerProvider with OTLP gRPC exporter.
// Returns a shutdown function.
// InitTracer registers a TracerProvider with OTLP gRPC exporter. Returns a
// shutdown function. Skipped (noop provider) when no OTLP endpoint env is set.
func InitTracer(ctx context.Context, serviceName string) (func(context.Context) error, error) {
if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" && os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") == "" {
otel.SetTextMapPropagator(propagation.TraceContext{})
return func(context.Context) error { return nil }, nil
}

exp, err := otlptracegrpc.New(ctx)
if err != nil {
return nil, fmt.Errorf("otlp exporter: %w", err)
Expand Down
1 change: 1 addition & 0 deletions room-service/deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ services:
build:
context: ../..
dockerfile: room-service/deploy/Dockerfile
stop_grace_period: 2s
environment:
- NATS_URL=nats://nats:4222
- NATS_CREDS_FILE=/etc/nats/backend.creds
Expand Down
Loading
Loading