diff --git a/broadcast-worker/deploy/docker-compose.yml b/broadcast-worker/deploy/docker-compose.yml index e44332168..2b519e112 100644 --- a/broadcast-worker/deploy/docker-compose.yml +++ b/broadcast-worker/deploy/docker-compose.yml @@ -15,7 +15,7 @@ services: # Set USER_CACHE_SIZE=0 to disable caching. - USER_CACHE_SIZE=10000 - USER_CACHE_TTL=5m - - VALKEY_ADDR=valkey:6379 + - VALKEY_ADDRS=valkey:6379 - VALKEY_KEY_GRACE_PERIOD=24h - BOOTSTRAP_STREAMS=true - ENCRYPTION_ENABLED=${ENCRYPTION_ENABLED:-false} diff --git a/broadcast-worker/main.go b/broadcast-worker/main.go index c284bf2e8..a3960e5dc 100644 --- a/broadcast-worker/main.go +++ b/broadcast-worker/main.go @@ -40,7 +40,7 @@ type config struct { UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` RoomMetaCacheSize int `env:"ROOM_META_CACHE_SIZE" envDefault:"10000"` RoomMetaCacheTTL time.Duration `env:"ROOM_META_CACHE_TTL" envDefault:"2m"` - ValkeyAddr string `env:"VALKEY_ADDR"` + ValkeyAddrs []string `env:"VALKEY_ADDRS" envSeparator:","` ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD" envDefault:"24h"` Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` @@ -88,14 +88,17 @@ func main() { var keyStore roomkeystore.RoomKeyStore if cfg.Encryption.Enabled { - if cfg.ValkeyAddr == "" || cfg.ValkeyKeyGracePeriod <= 0 { - slog.Error("encryption enabled but VALKEY_ADDR is empty or VALKEY_KEY_GRACE_PERIOD is not a positive duration", - "valkey_addr_set", cfg.ValkeyAddr != "", + if len(cfg.ValkeyAddrs) == 0 { + slog.Error("encryption enabled but VALKEY_ADDRS is empty") + os.Exit(1) + } + if cfg.ValkeyKeyGracePeriod <= 0 { + slog.Error("VALKEY_KEY_GRACE_PERIOD must be a positive duration", "valkey_key_grace_period", cfg.ValkeyKeyGracePeriod) os.Exit(1) } - keyStore, err = roomkeystore.NewValkeyStore(roomkeystore.Config{ - Addr: cfg.ValkeyAddr, + keyStore, err = roomkeystore.NewValkeyClusterStore(roomkeystore.ClusterConfig{ + 
Addrs: cfg.ValkeyAddrs, Password: cfg.ValkeyPassword, GracePeriod: cfg.ValkeyKeyGracePeriod, }) diff --git a/docker-local/compose.deps.yaml b/docker-local/compose.deps.yaml index 696372378..b28dc3f27 100644 --- a/docker-local/compose.deps.yaml +++ b/docker-local/compose.deps.yaml @@ -143,20 +143,33 @@ services: - chat-local # Valkey backs the per-user restricted-rooms cache in search-service (5-min - # TTL, lazy-populated from Elasticsearch on miss). Persistence is disabled: - # the cache is derivative of the authoritative user-room ES doc and survives - # restart only through the lazy-populate path. + # TTL, lazy-populated from Elasticsearch on miss) and room key pairs for + # encryption. Persistence is disabled: the cache is derivative of the + # authoritative user-room ES doc and survives restart only through the + # lazy-populate path. + # Single-node cluster-mode: the entrypoint starts valkey-server with + # --cluster-enabled, waits for it to accept connections, then assigns all + # 16384 hash slots so it forms a valid one-node cluster. valkey: image: valkey/valkey:8-alpine container_name: chat-local-valkey ports: - "6379:6379" - command: ["valkey-server", "--save", "", "--appendonly", "no"] + entrypoint: + - sh + - -c + - | + valkey-server --cluster-enabled yes --cluster-config-file /tmp/nodes.conf --cluster-node-timeout 5000 --save '' --appendonly no & + until valkey-cli ping > /dev/null 2>&1; do sleep 0.1; done + if ! 
valkey-cli CLUSTER INFO | grep -q 'cluster_slots_assigned:16384'; then + valkey-cli CLUSTER ADDSLOTSRANGE 0 16383 + fi + wait healthcheck: - test: ["CMD", "valkey-cli", "ping"] + test: ["CMD-SHELL", "valkey-cli CLUSTER INFO | grep 'cluster_state:ok'"] interval: 5s timeout: 3s - retries: 5 + retries: 10 networks: - chat-local diff --git a/docs/superpowers/plans/2026-05-19-valkey-cluster-support.md b/docs/superpowers/plans/2026-05-19-valkey-cluster-support.md new file mode 100644 index 000000000..42543cd9c --- /dev/null +++ b/docs/superpowers/plans/2026-05-19-valkey-cluster-support.md @@ -0,0 +1,943 @@ +# Valkey Cluster Support — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Spec:** `docs/superpowers/specs/2026-05-19-valkey-cluster-support-design.md` + +**Goal:** Replace single-node Valkey with a cluster-mode deployment across all sites. Covers hash-tagged key names, cluster adapter, `valkeyutil` cluster support, service config migration from `VALKEY_ADDR` to `VALKEY_ADDRS`, docker-compose updates, and the new room key ensure RPC in `room-service`. + +**Architecture:** `pkg/roomkeystore` gains a `clusterAdapter` + `NewValkeyClusterStore` parallel to the existing standalone path. `pkg/valkeyutil` gains `ConnectCluster`. All five services switch to `VALKEY_ADDRS` + cluster constructors. A new `NatsHandleEnsureRoomKey` handler in `room-service` lets external connectors get or generate a room key via NATS. 
+ +--- + +## File Map + +| File | Action | Responsibility | +|---|---|---| +| `pkg/testutil/testimages/testimages.go` | Modify | Add `ValkeyCluster` image constant | +| `pkg/roomkeystore/roomkeystore.go` | Modify | Hash-tag `roomkey` and `roomprevkey` | +| `pkg/roomkeystore/roomkeystore_test.go` | Modify | Update key name assertions | +| `pkg/roomkeystore/adapter.go` | Modify | Add `clusterAdapter`, `ClusterConfig`, `NewValkeyClusterStore` | +| `pkg/roomkeystore/integration_test.go` | Modify | Add `setupValkeyCluster` + 3 cluster tests | +| `pkg/valkeyutil/valkey.go` | Modify | Add `clusterRedisClient`, `ConnectCluster` | +| `pkg/valkeyutil/valkey_test.go` | Modify | Add cluster integration test | +| `pkg/model/room.go` | Modify | Add `RoomKeyEnsureRequest` | +| `pkg/subject/subject.go` | Modify | Add `RoomKeyEnsure` | +| `room-service/main.go` | Modify | `ValkeyAddr` → `ValkeyAddrs`, cluster constructor | +| `room-service/handler.go` | Modify | Add `NatsHandleEnsureRoomKey`, register in `RegisterCRUD` | +| `room-service/handler_test.go` | Modify | TDD tests for `NatsHandleEnsureRoomKey` | +| `room-service/mock_store_test.go` | Regenerate | `make generate SERVICE=room-service` | +| `room-worker/main.go` | Modify | `ValkeyAddr` → `ValkeyAddrs`, cluster constructor | +| `broadcast-worker/main.go` | Modify | Same | +| `history-service/cmd/main.go` | Modify | Same | +| `search-service/main.go` | Modify | `Valkey.Addr` → `Valkey.Addrs`, `ConnectCluster` | +| `room-service/deploy/docker-compose.yml` | Modify | Single node → cluster, `VALKEY_ADDRS` | +| `room-worker/deploy/docker-compose.yml` | Modify | Same | +| `broadcast-worker/deploy/docker-compose.yml` | Modify | Same | +| `history-service/deploy/docker-compose.yml` | Modify | Same | +| `search-service/deploy/docker-compose.yml` | Modify | Same | + +--- + +## Task 1: Pin ValkeyCluster image constant + +**Files:** +- Modify: `pkg/testutil/testimages/testimages.go` + +- [ ] **Step 1: Add `ValkeyCluster` constant** + 
+```go +// ValkeyCluster is the image for cluster-mode Valkey integration tests. +ValkeyCluster = "bitnami/valkey-cluster:8" +``` + +- [ ] **Step 2: Verify compile** + +```bash +make build SERVICE=pkg/testutil/testimages +``` + +- [ ] **Step 3: Commit** + +```bash +git add pkg/testutil/testimages/testimages.go +git commit -m "chore(testimages): add ValkeyCluster image constant" +``` + +--- + +## Task 2: Hash-tag key names in `pkg/roomkeystore` + +**Files:** +- Modify: `pkg/roomkeystore/roomkeystore.go` +- Modify: `pkg/roomkeystore/roomkeystore_test.go` + +This is the foundational change that makes the Lua rotate script and `deletePipeline` safe in cluster mode. Both keys for the same room must land on the same slot. + +- [ ] **Step 1: Update `roomkey` and `roomprevkey` in `roomkeystore.go`** + +```go +func roomkey(roomID string) string { + return "room:{" + roomID + "}:key" +} + +func roomprevkey(roomID string) string { + return "room:{" + roomID + "}:key:prev" +} +``` + +- [ ] **Step 2: Check `roomkeystore_test.go` for any hardcoded key name assertions and update them** + +Search for `room:` string literals in `roomkeystore_test.go`. Update any that assert the raw key format to use the hash-tagged form. + +- [ ] **Step 3: Run unit tests and confirm they pass** + +```bash +make test SERVICE=pkg/roomkeystore +``` + +Expected: all existing unit tests pass — the `fakeHashClient` does not validate key name format. + +- [ ] **Step 4: Commit** + +```bash +git add pkg/roomkeystore/roomkeystore.go pkg/roomkeystore/roomkeystore_test.go +git commit -m "fix(roomkeystore): hash-tag key names for Valkey cluster slot consistency" +``` + +--- + +## Task 3: Add `clusterAdapter`, `ClusterConfig`, `NewValkeyClusterStore` + +**Files:** +- Modify: `pkg/roomkeystore/adapter.go` + +- [ ] **Step 1: Add `ClusterConfig` to `adapter.go`** + +```go +// ClusterConfig holds connection config for a Valkey cluster deployment. 
+// Addrs is a comma-separated list of seed node addresses; go-redis ClusterClient +// discovers all nodes automatically via CLUSTER SLOTS. One address is sufficient +// but listing all masters is more robust. +type ClusterConfig struct { + Addrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + Password string `env:"VALKEY_PASSWORD" envDefault:""` + GracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` +} +``` + +- [ ] **Step 2: Add `clusterAdapter` struct and all `hashCommander` methods** + +```go +type clusterAdapter struct { + c *redis.ClusterClient +} + +func (a *clusterAdapter) hset(ctx context.Context, key string, pub, priv string) error { + return a.c.HSet(ctx, key, "pub", pub, "priv", priv, "ver", "0").Err() +} + +func (a *clusterAdapter) hsetWithVersion(ctx context.Context, key string, pub, priv string, version int) error { + return a.c.HSet(ctx, key, "pub", pub, "priv", priv, "ver", strconv.Itoa(version)).Err() +} + +func (a *clusterAdapter) hgetall(ctx context.Context, key string) (map[string]string, error) { + return a.c.HGetAll(ctx, key).Result() +} + +func (a *clusterAdapter) hgetallMany(ctx context.Context, keys []string) ([]map[string]string, error) { + if len(keys) == 0 { + return nil, nil + } + pipe := a.c.Pipeline() + cmds := make([]*redis.MapStringStringCmd, len(keys)) + for i, k := range keys { + cmds[i] = pipe.HGetAll(ctx, k) + } + if _, err := pipe.Exec(ctx); err != nil { + return nil, err + } + out := make([]map[string]string, len(keys)) + for i, c := range cmds { + m, err := c.Result() + if err != nil { + return nil, err + } + out[i] = m + } + return out, nil +} + +func (a *clusterAdapter) rotatePipeline(ctx context.Context, currentKey, prevKey string, pub, priv string, gracePeriod time.Duration) (int, error) { + graceSec := int(gracePeriod.Seconds()) + if graceSec < 1 { + graceSec = 1 + } + result, err := rotateScript.Run(ctx, a.c, []string{currentKey, prevKey}, pub, priv, graceSec).Int() + if err != nil && 
strings.Contains(err.Error(), "no current key") { + return 0, ErrNoCurrentKey + } + return result, err +} + +func (a *clusterAdapter) deletePipeline(ctx context.Context, currentKey, prevKey string) error { + return a.c.Del(ctx, currentKey, prevKey).Err() +} + +func (a *clusterAdapter) closeClient() error { + return a.c.Close() +} +``` + +- [ ] **Step 3: Add `NewValkeyClusterStore` constructor** + +```go +// NewValkeyClusterStore creates a valkeyStore backed by a Valkey cluster, +// pings the cluster to verify connectivity, and returns it. +func NewValkeyClusterStore(cfg ClusterConfig) (RoomKeyStore, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: cfg.Addrs, + Password: cfg.Password, + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("valkey cluster connect: %w", err) + } + return &valkeyStore{ + client: &clusterAdapter{c: c}, + closer: c, + gracePeriod: cfg.GracePeriod, + }, nil +} +``` + +- [ ] **Step 4: Run unit tests and confirm they pass** + +```bash +make test SERVICE=pkg/roomkeystore +``` + +Expected: all existing unit tests pass — `clusterAdapter` is not exercised by unit tests (fake is used instead). + +- [ ] **Step 5: Lint** + +```bash +make lint +``` + +- [ ] **Step 6: Commit** + +```bash +git add pkg/roomkeystore/adapter.go +git commit -m "feat(roomkeystore): add ClusterConfig, clusterAdapter, NewValkeyClusterStore" +``` + +--- + +## Task 4: Cluster integration tests for `pkg/roomkeystore` + +**Files:** +- Modify: `pkg/roomkeystore/integration_test.go` + +- [ ] **Step 1: Add `setupValkeyCluster` helper** + +```go +// setupValkeyCluster starts a bitnami/valkey-cluster:8 container (3 masters, +// 0 replicas) and returns a connected RoomKeyStore backed by NewValkeyClusterStore. 
+func setupValkeyCluster(t *testing.T, gracePeriod time.Duration) RoomKeyStore { + t.Helper() + ctx := context.Background() + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: testimages.ValkeyCluster, + ExposedPorts: []string{"6379/tcp"}, + Env: map[string]string{ + "VALKEY_CLUSTER_REPLICAS": "0", + "ALLOW_EMPTY_PASSWORD": "yes", + }, + WaitingFor: wait.ForLog("Cluster correctly created"), + }, + Started: true, + }) + require.NoError(t, err, "start valkey cluster container") + t.Cleanup(func() { _ = container.Terminate(ctx) }) + + host, err := container.Host(ctx) + require.NoError(t, err) + port, err := container.MappedPort(ctx, "6379") + require.NoError(t, err) + + store, err := NewValkeyClusterStore(ClusterConfig{ + Addrs: []string{fmt.Sprintf("%s:%s", host, port.Port())}, + GracePeriod: gracePeriod, + }) + require.NoError(t, err, "create cluster store") + return store +} +``` + +- [ ] **Step 2: Add `TestValkeyClusterStore_Integration_RoundTrip`** + +```go +func TestValkeyClusterStore_Integration_RoundTrip(t *testing.T) { + store := setupValkeyCluster(t, time.Hour) + ctx := context.Background() + + pubKey := bytes.Repeat([]byte{0xAB}, 65) + privKey := bytes.Repeat([]byte{0xCD}, 32) + pair := RoomKeyPair{PublicKey: pubKey, PrivateKey: privKey} + + ver, err := store.Set(ctx, "room-1", pair) + require.NoError(t, err) + assert.Equal(t, 0, ver) + + got, err := store.Get(ctx, "room-1") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, 0, got.Version) + assert.Equal(t, pubKey, got.KeyPair.PublicKey) + assert.Equal(t, privKey, got.KeyPair.PrivateKey) + + require.NoError(t, store.Delete(ctx, "room-1")) + + got, err = store.Get(ctx, "room-1") + require.NoError(t, err) + assert.Nil(t, got) +} +``` + +- [ ] **Step 3: Add `TestValkeyClusterStore_Integration_RotateRoundTrip`** + +```go +func TestValkeyClusterStore_Integration_RotateRoundTrip(t 
*testing.T) { + store := setupValkeyCluster(t, time.Hour) + ctx := context.Background() + + oldPub := bytes.Repeat([]byte{0xAA}, 65) + oldPriv := bytes.Repeat([]byte{0xBB}, 32) + newPub := bytes.Repeat([]byte{0xCC}, 65) + newPriv := bytes.Repeat([]byte{0xDD}, 32) + + ver, err := store.Set(ctx, "room-rot", RoomKeyPair{PublicKey: oldPub, PrivateKey: oldPriv}) + require.NoError(t, err) + assert.Equal(t, 0, ver) + + ver, err = store.Rotate(ctx, "room-rot", RoomKeyPair{PublicKey: newPub, PrivateKey: newPriv}) + require.NoError(t, err) + assert.Equal(t, 1, ver) + + got, err := store.Get(ctx, "room-rot") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, 1, got.Version) + assert.Equal(t, newPub, got.KeyPair.PublicKey) + + oldPair, err := store.GetByVersion(ctx, "room-rot", 0) + require.NoError(t, err) + require.NotNil(t, oldPair) + assert.Equal(t, oldPub, oldPair.PublicKey) +} +``` + +- [ ] **Step 4: Add `TestValkeyClusterStore_Integration_HashTagSlotConsistency`** + +```go +func TestValkeyClusterStore_Integration_HashTagSlotConsistency(t *testing.T) { + // Verifies that both Valkey keys for a room hash to the same cluster slot, + // which is required for the Lua rotate script to execute without CROSSSLOT error. + store := setupValkeyCluster(t, time.Hour) + ctx := context.Background() + + // Set a key so both slots are populated, then verify via CLUSTER KEYSLOT. + _, err := store.Set(ctx, "room-slot-test", RoomKeyPair{ + PublicKey: bytes.Repeat([]byte{0x01}, 65), + PrivateKey: bytes.Repeat([]byte{0x02}, 32), + }) + require.NoError(t, err) + + // Use the underlying ClusterClient to call CLUSTER KEYSLOT directly. 
+ vs := store.(*valkeyStore) + ca := vs.client.(*clusterAdapter) + slot1, err := ca.c.Do(ctx, "CLUSTER", "KEYSLOT", roomkey("room-slot-test")).Int() + require.NoError(t, err) + slot2, err := ca.c.Do(ctx, "CLUSTER", "KEYSLOT", roomprevkey("room-slot-test")).Int() + require.NoError(t, err) + + assert.Equal(t, slot1, slot2, "current and previous key must be on the same cluster slot") +} +``` + +- [ ] **Step 5: Run cluster integration tests** + +```bash +make test-integration SERVICE=pkg/roomkeystore +``` + +Expected: all three new cluster tests pass alongside the existing standalone tests. + +- [ ] **Step 6: Commit** + +```bash +git add pkg/roomkeystore/integration_test.go +git commit -m "test(roomkeystore): add cluster integration tests with bitnami/valkey-cluster" +``` + +--- + +## Task 5: `pkg/valkeyutil` cluster support + +**Files:** +- Modify: `pkg/valkeyutil/valkey.go` +- Modify: `pkg/valkeyutil/valkey_test.go` + +- [ ] **Step 1: Add `clusterRedisClient` and `ConnectCluster` to `valkey.go`** + +```go +type clusterRedisClient struct { + c *redis.ClusterClient +} + +func (r *clusterRedisClient) Get(ctx context.Context, key string) (string, error) { + val, err := r.c.Get(ctx, key).Result() + if errors.Is(err, redis.Nil) { + return "", ErrCacheMiss + } + if err != nil { + return "", fmt.Errorf("valkey get: %w", err) + } + return val, nil +} + +func (r *clusterRedisClient) Set(ctx context.Context, key, value string, ttl time.Duration) error { + return r.c.Set(ctx, key, value, ttl).Err() +} + +func (r *clusterRedisClient) Del(ctx context.Context, keys ...string) error { + return r.c.Del(ctx, keys...).Err() +} + +func (r *clusterRedisClient) Close() error { + return r.c.Close() +} + +// ConnectCluster dials a Valkey cluster, verifies connectivity with PING, +// and returns a Client. Seed addresses are discovered via CLUSTER SLOTS so +// a single address is sufficient; listing all masters is more robust. 
+func ConnectCluster(ctx context.Context, addrs []string, password string) (Client, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + Password: password, + }) + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := c.Ping(pingCtx).Err(); err != nil { + if closeErr := c.Close(); closeErr != nil { + slog.Warn("valkey cluster close after failed connect", "error", closeErr) + } + return nil, fmt.Errorf("valkey cluster connect: %w", err) + } + slog.Info("connected to Valkey cluster", "addrs", addrs) + return &clusterRedisClient{c: c}, nil +} +``` + +- [ ] **Step 2: Run existing unit tests — confirm they pass** + +```bash +make test SERVICE=pkg/valkeyutil +``` + +- [ ] **Step 3: Add `TestConnectCluster_Integration` to `valkey_test.go`** + +```go +//go:build integration + +func TestConnectCluster_Integration_RoundTrip(t *testing.T) { + ctx := context.Background() + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: testimages.ValkeyCluster, + ExposedPorts: []string{"6379/tcp"}, + Env: map[string]string{ + "VALKEY_CLUSTER_REPLICAS": "0", + "ALLOW_EMPTY_PASSWORD": "yes", + }, + WaitingFor: wait.ForLog("Cluster correctly created"), + }, + Started: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = container.Terminate(ctx) }) + + host, err := container.Host(ctx) + require.NoError(t, err) + port, err := container.MappedPort(ctx, "6379") + require.NoError(t, err) + + client, err := valkeyutil.ConnectCluster(ctx, + []string{fmt.Sprintf("%s:%s", host, port.Port())}, "") + require.NoError(t, err) + defer valkeyutil.Disconnect(client) + + type payload struct{ V string } + require.NoError(t, valkeyutil.SetJSONWithTTL(ctx, client, "k1", payload{V: "hello"}, time.Minute)) + + var got payload + require.NoError(t, valkeyutil.GetJSON(ctx, client, "k1", &got)) + assert.Equal(t, "hello", got.V) + + 
require.NoError(t, client.Del(ctx, "k1")) + err = valkeyutil.GetJSON(ctx, client, "k1", &got) + assert.ErrorIs(t, err, valkeyutil.ErrCacheMiss) +} +``` + +- [ ] **Step 4: Run integration test** + +```bash +make test-integration SERVICE=pkg/valkeyutil +``` + +- [ ] **Step 5: Lint** + +```bash +make lint +``` + +- [ ] **Step 6: Commit** + +```bash +git add pkg/valkeyutil/valkey.go pkg/valkeyutil/valkey_test.go +git commit -m "feat(valkeyutil): add clusterRedisClient and ConnectCluster for cluster mode" +``` + +--- + +## Task 6: Migrate service configs to `VALKEY_ADDRS` + +**Files:** +- Modify: `room-service/main.go` +- Modify: `room-worker/main.go` +- Modify: `broadcast-worker/main.go` +- Modify: `history-service/cmd/main.go` +- Modify: `search-service/main.go` + +For each service, three changes: +1. Config field: `ValkeyAddr string` → `ValkeyAddrs []string` with `env:"VALKEY_ADDRS,required" envSeparator:","` +2. Validation: empty check becomes `len(cfg.ValkeyAddrs) == 0` +3. Constructor call: `NewValkeyStore(Config{Addr: ...})` → `NewValkeyClusterStore(ClusterConfig{Addrs: ...})` + +- [ ] **Step 1: Update `room-service/main.go`** + +```go +// Config change +ValkeyAddrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + +// Constructor change +keyStore, err := roomkeystore.NewValkeyClusterStore(roomkeystore.ClusterConfig{ + Addrs: cfg.ValkeyAddrs, + Password: cfg.ValkeyPassword, + GracePeriod: cfg.ValkeyGracePeriod, +}) +``` + +- [ ] **Step 2: Update `room-worker/main.go`** — same pattern + +- [ ] **Step 3: Update `broadcast-worker/main.go`** — same pattern + +- [ ] **Step 4: Update `history-service/cmd/main.go`** — same pattern + +- [ ] **Step 5: Update `search-service/main.go`** + +```go +// Config change +Addrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + +// Constructor change +valkey, err := valkeyutil.ConnectCluster(ctx, cfg.Valkey.Addrs, cfg.Valkey.Password) +``` + +- [ ] **Step 6: Build all affected services to confirm compilation** + 
+```bash +make build SERVICE=room-service +make build SERVICE=room-worker +make build SERVICE=broadcast-worker +make build SERVICE=history-service +make build SERVICE=search-service +``` + +- [ ] **Step 7: Run unit tests for all affected services** + +```bash +make test SERVICE=room-service +make test SERVICE=room-worker +make test SERVICE=broadcast-worker +make test SERVICE=history-service +make test SERVICE=search-service +``` + +- [ ] **Step 8: Lint** + +```bash +make lint +``` + +- [ ] **Step 9: Commit** + +```bash +git add room-service/main.go room-worker/main.go broadcast-worker/main.go \ + history-service/cmd/main.go search-service/main.go +git commit -m "feat: migrate all services from VALKEY_ADDR to VALKEY_ADDRS cluster config" +``` + +--- + +## Task 7: Update docker-compose files + +**Files:** +- Modify: `room-service/deploy/docker-compose.yml` +- Modify: `room-worker/deploy/docker-compose.yml` +- Modify: `broadcast-worker/deploy/docker-compose.yml` +- Modify: `history-service/deploy/docker-compose.yml` +- Modify: `search-service/deploy/docker-compose.yml` + +For each docker-compose file: +1. Replace the single `valkey` service with `bitnami/valkey-cluster:8` +2. Replace `VALKEY_ADDR=valkey:6379` with `VALKEY_ADDRS=valkey-cluster:6379` + +- [ ] **Step 1: Update each docker-compose.yml** + +Replace: +```yaml +valkey: + image: valkey/valkey:8 + ports: + - "6379:6379" +``` + +With: +```yaml +valkey-cluster: + image: bitnami/valkey-cluster:8 + environment: + - VALKEY_CLUSTER_REPLICAS=0 + - ALLOW_EMPTY_PASSWORD=yes + ports: + - "6379:6379" +``` + +And replace: +```yaml +- VALKEY_ADDR=valkey:6379 +``` + +With: +```yaml +- VALKEY_ADDRS=valkey-cluster:6379 +``` + +Update any `depends_on: valkey` references to `depends_on: valkey-cluster`. 
+ +- [ ] **Step 2: Commit** + +```bash +git add room-service/deploy/docker-compose.yml \ + room-worker/deploy/docker-compose.yml \ + broadcast-worker/deploy/docker-compose.yml \ + history-service/deploy/docker-compose.yml \ + search-service/deploy/docker-compose.yml +git commit -m "chore: replace single-node Valkey with bitnami/valkey-cluster in docker-compose files" +``` + +--- + +## Task 8: Room key ensure RPC — model + subject (Red) + +**Files:** +- Modify: `pkg/model/room.go` +- Modify: `pkg/subject/subject.go` + +- [ ] **Step 1: Add `RoomKeyEnsureRequest` to `pkg/model/room.go`** + +```go +// RoomKeyEnsureRequest is the request payload for the room key ensure RPC. +// The RPC returns an existing key if one exists, or generates and stores a +// new one if the room has no key yet. +type RoomKeyEnsureRequest struct { + RoomID string `json:"roomId" bson:"roomId"` +} +``` + +- [ ] **Step 2: Add `RoomKeyEnsure` to `pkg/subject/subject.go`** + +```go +// RoomKeyEnsure returns the NATS subject for the room key ensure RPC. +// External services use this subject to get or generate a room key. +func RoomKeyEnsure(siteID string) string { + return fmt.Sprintf("chat.server.request.room.%s.key.ensure", siteID) +} +``` + +- [ ] **Step 3: Compile check** + +```bash +make build SERVICE=pkg/model +make build SERVICE=pkg/subject +``` + +- [ ] **Step 4: Commit** + +```bash +git add pkg/model/room.go pkg/subject/subject.go +git commit -m "feat: add RoomKeyEnsureRequest model and RoomKeyEnsure subject" +``` + +--- + +## Task 9: Room key ensure RPC — failing tests (Red) + +**Files:** +- Modify: `room-service/handler_test.go` + +Write the tests before the implementation so they fail first. 
+ +- [ ] **Step 1: Add table-driven tests for `NatsHandleEnsureRoomKey` in `handler_test.go`** + +```go +func TestHandler_NatsHandleEnsureRoomKey(t *testing.T) { + pubKey := bytes.Repeat([]byte{0xAB}, 65) + privKey := bytes.Repeat([]byte{0xCD}, 32) + existingPair := &roomkeystore.VersionedKeyPair{ + Version: 2, + KeyPair: roomkeystore.RoomKeyPair{PublicKey: pubKey, PrivateKey: privKey}, + } + + tests := []struct { + name string + payload []byte + setupMock func(store *MockRoomKeyStore) + wantErr bool + errContains string + checkReply func(t *testing.T, reply []byte) + }{ + { + name: "key exists — returns existing RoomKeyEvent, no Set called", + payload: mustMarshal(model.RoomKeyEnsureRequest{RoomID: "room-1"}), + setupMock: func(s *MockRoomKeyStore) { + s.EXPECT().Get(gomock.Any(), "room-1").Return(existingPair, nil) + // Set must NOT be called + }, + checkReply: func(t *testing.T, reply []byte) { + var evt model.RoomKeyEvent + require.NoError(t, json.Unmarshal(reply, &evt)) + assert.Equal(t, "room-1", evt.RoomID) + assert.Equal(t, 2, evt.Version) + assert.Equal(t, pubKey, evt.PublicKey) + assert.Equal(t, privKey, evt.PrivateKey) + }, + }, + { + name: "key missing — generates and sets new key, returns RoomKeyEvent", + payload: mustMarshal(model.RoomKeyEnsureRequest{RoomID: "room-2"}), + setupMock: func(s *MockRoomKeyStore) { + s.EXPECT().Get(gomock.Any(), "room-2").Return(nil, nil) + s.EXPECT().Set(gomock.Any(), "room-2", gomock.Any()).Return(0, nil) + }, + checkReply: func(t *testing.T, reply []byte) { + var evt model.RoomKeyEvent + require.NoError(t, json.Unmarshal(reply, &evt)) + assert.Equal(t, "room-2", evt.RoomID) + assert.Equal(t, 0, evt.Version) + assert.Len(t, evt.PublicKey, 65) + assert.Len(t, evt.PrivateKey, 32) + }, + }, + { + name: "Get error — replies with error, no Set called", + payload: mustMarshal(model.RoomKeyEnsureRequest{RoomID: "room-3"}), + setupMock: func(s *MockRoomKeyStore) { + s.EXPECT().Get(gomock.Any(), "room-3").Return(nil, 
errors.New("valkey down")) + }, + wantErr: true, + errContains: "get room key", + }, + { + name: "Set error — replies with error", + payload: mustMarshal(model.RoomKeyEnsureRequest{RoomID: "room-4"}), + setupMock: func(s *MockRoomKeyStore) { + s.EXPECT().Get(gomock.Any(), "room-4").Return(nil, nil) + s.EXPECT().Set(gomock.Any(), "room-4", gomock.Any()).Return(0, errors.New("valkey write failed")) + }, + wantErr: true, + errContains: "store room key", + }, + { + name: "malformed payload — replies with error", + payload: []byte(`{bad json`), + setupMock: func(s *MockRoomKeyStore) {}, + wantErr: true, + errContains: "decode", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStore := NewMockRoomKeyStore(ctrl) + tt.setupMock(mockStore) + + h := newTestHandler(mockStore) // use existing test helper + reply, err := h.handleEnsureRoomKey(context.Background(), tt.payload) + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + return + } + require.NoError(t, err) + tt.checkReply(t, reply) + }) + } +} +``` + +- [ ] **Step 2: Run tests and confirm they fail (Red)** + +```bash +make test SERVICE=room-service +``` + +Expected: compile error or test failure — `handleEnsureRoomKey` does not exist yet. 
+ +--- + +## Task 10: Room key ensure RPC — implementation (Green) + +**Files:** +- Modify: `room-service/handler.go` + +- [ ] **Step 1: Add `handleEnsureRoomKey` internal method** + +```go +func (h *Handler) handleEnsureRoomKey(ctx context.Context, data []byte) ([]byte, error) { + var req model.RoomKeyEnsureRequest + if err := json.Unmarshal(data, &req); err != nil { + return nil, fmt.Errorf("decode room key ensure request: %w", err) + } + + pair, err := h.keyStore.Get(ctx, req.RoomID) + if err != nil { + return nil, fmt.Errorf("get room key: %w", err) + } + + if pair == nil { + newPair, err := roomkeystore.GenerateKeyPair() + if err != nil { + return nil, fmt.Errorf("generate room key: %w", err) + } + ver, err := h.keyStore.Set(ctx, req.RoomID, newPair) + if err != nil { + return nil, fmt.Errorf("store room key: %w", err) + } + pair = &roomkeystore.VersionedKeyPair{Version: ver, KeyPair: newPair} + } + + evt := model.RoomKeyEvent{ + RoomID: req.RoomID, + Version: pair.Version, + PublicKey: pair.KeyPair.PublicKey, + PrivateKey: pair.KeyPair.PrivateKey, + Timestamp: time.Now().UTC().UnixMilli(), + } + return json.Marshal(evt) +} +``` + +- [ ] **Step 2: Add exported `NatsHandleEnsureRoomKey` NATS wrapper** + +```go +func (h *Handler) NatsHandleEnsureRoomKey(msg *nats.Msg) { + ctx := natsutil.ContextWithRequestIDFromHeaders(context.Background(), msg.Header) + reply, err := h.handleEnsureRoomKey(ctx, msg.Data) + if err != nil { + natsutil.ReplyError(msg, err) + return + } + if err := msg.Respond(reply); err != nil { + slog.ErrorContext(ctx, "respond to room key ensure", "error", err) + } +} +``` + +- [ ] **Step 3: Register in `RegisterCRUD`** + +```go +if _, err := nc.QueueSubscribe(subject.RoomKeyEnsure(h.siteID), queue, h.NatsHandleEnsureRoomKey); err != nil { + return fmt.Errorf("subscribe room key ensure: %w", err) +} +``` + +- [ ] **Step 4: Run tests and confirm they pass (Green)** + +```bash +make test SERVICE=room-service +``` + +Expected: all 
`TestHandler_NatsHandleEnsureRoomKey` subtests pass. + +- [ ] **Step 5: Check coverage** + +```bash +cd /home/user/chat && go test -race -coverprofile=coverage.out ./room-service/... && go tool cover -func=coverage.out | grep handler +``` + +Expected: `handler.go` coverage ≥ 80%. + +- [ ] **Step 6: Lint** + +```bash +make lint +``` + +- [ ] **Step 7: Commit** + +```bash +git add room-service/handler.go room-service/handler_test.go +git commit -m "feat(room-service): add NatsHandleEnsureRoomKey RPC for external connector key access" +``` + +--- + +## Task 11: Final verification + +- [ ] **Step 1: Run all unit tests** + +```bash +make test +``` + +Expected: all pass. + +- [ ] **Step 2: Run integration tests for changed packages** + +```bash +make test-integration SERVICE=pkg/roomkeystore +make test-integration SERVICE=pkg/valkeyutil +make test-integration SERVICE=room-service +``` + +- [ ] **Step 3: Final lint** + +```bash +make lint +``` + +- [ ] **Step 4: Push** + +```bash +git push -u origin +``` diff --git a/docs/superpowers/specs/2026-05-19-valkey-cluster-support-design.md b/docs/superpowers/specs/2026-05-19-valkey-cluster-support-design.md new file mode 100644 index 000000000..606ebddd3 --- /dev/null +++ b/docs/superpowers/specs/2026-05-19-valkey-cluster-support-design.md @@ -0,0 +1,428 @@ +# Valkey Cluster Support — Design Spec + +**Date:** 2026-05-19 +**Status:** Draft +**Scope:** `pkg/roomkeystore`, `pkg/valkeyutil`, `room-service`, `room-worker`, `broadcast-worker`, `history-service`, `search-service`, per-site `deploy/docker-compose.yml` + +--- + +## Overview + +Replace the single-node Valkey instance used by each site with a Valkey cluster (3-node minimum). Every site (`ftest`, `f18-dev`, production, etc.) runs its own independent Valkey cluster. All services on a site point at that site's cluster. The single-node Valkey instance is fully retired — cluster mode is the only supported deployment going forward. 
+ +This spec covers the code changes required to make `pkg/roomkeystore` and `pkg/valkeyutil` work against a Valkey cluster, the service-level config changes to replace `VALKEY_ADDR` (single address) with `VALKEY_ADDRS` (cluster seed addresses), and the per-site docker-compose changes to run a cluster instead of a single node. + +--- + +## Motivation + +Room encryption keys stored in `pkg/roomkeystore` are critical data. If the single Valkey node goes down and restarts without restoring its data, every room on that site loses its key — subsequent `broadcast-worker` encryptions fail and clients cannot decrypt messages until keys are regenerated out of band. A Valkey cluster in which every master has at least one replica tolerates individual node failures without data loss, eliminating this operational risk; with 0 replicas, a failed master still loses the keys in its slots, so the guarantee holds only for sites that run replicas. + +The search subscription cache (`search-service` via `pkg/valkeyutil`) is less critical — it is ephemeral and rebuilds on cache miss — but running it on the same cluster keeps the infrastructure uniform across services and sites. + +--- + +## Architecture + +Each site runs **one Valkey cluster** shared by all services on that site. The cluster has a minimum of 3 master nodes (the Valkey cluster protocol requires at least 3 masters to elect a new primary after a failure). Replicas per master are a deployment decision; 0 replicas is acceptable for non-production sites. + +```text +site: ftest + ├── valkey-cluster (3 masters, 0 replicas) + │ node-1:6379 + │ node-2:6380 + │ node-3:6381 + ├── room-service → VALKEY_ADDRS=node-1:6379,node-2:6380,node-3:6381 + ├── room-worker → VALKEY_ADDRS=node-1:6379,node-2:6380,node-3:6381 + ├── broadcast-worker → VALKEY_ADDRS=node-1:6379,node-2:6380,node-3:6381 + ├── history-service → VALKEY_ADDRS=node-1:6379,node-2:6380,node-3:6381 + └── search-service → VALKEY_ADDRS=node-1:6379,node-2:6380,node-3:6381 +``` + +Sites are fully independent — no cross-site Valkey connection exists or is introduced by this spec. 
+ +--- + +## Change 1: Key Name Hash Tags — `pkg/roomkeystore` + +### The Problem + +The Lua rotate script in `adapter.go` operates on two keys per room in a single atomic call: + +```text +room:abc123:key +room:abc123:key:prev +``` + +In Valkey cluster mode, every key is assigned to one of 16384 slots based on a CRC16 hash of the key name. A Lua script that touches keys on different slots is rejected with `CROSSSLOT`. Without hash tags, `room:abc123:key` and `room:abc123:key:prev` hash to different slots. + +The `deletePipeline` also issues a single `DEL` on both keys — same constraint applies. + +### The Fix + +Add a hash tag `{roomID}` to both key names. Valkey uses only the substring inside `{...}` for slot assignment, so both keys for the same room always land on the same slot regardless of the `roomID` value. + +```go +// Current +func roomkey(roomID string) string { return "room:" + roomID + ":key" } +func roomprevkey(roomID string) string { return "room:" + roomID + ":key:prev" } + +// After +func roomkey(roomID string) string { return "room:{" + roomID + "}:key" } +func roomprevkey(roomID string) string { return "room:{" + roomID + "}:key:prev" } +``` + +### Migration Impact + +This is a **breaking change for any existing Valkey data**. Keys written under the old format (`room:abc123:key`) are not found after this change because the key name is different. Services will behave as if no key exists for that room — `keyStore.Get` returns `(nil, nil)`. + +Rollout assumption for this spec: **all sites deploying cluster mode start with a fresh Valkey cluster**. No migration of old standalone keys is required. Rooms that were created before this change will have their keys regenerated the next time a member-remove triggers rotation, or via the existing `ErrNoCurrentKey` → `Set` fallback in `room-service`. 
+ +Production sites upgrading from standalone to cluster must account for this: either accept a key-loss window (rooms enter degraded state until next operation triggers regeneration) or run a backfill tool before switching. Backfill tooling is out of scope for this spec. + +--- + +## Change 2: Cluster Adapter — `pkg/roomkeystore` + +### `ClusterConfig` + +A new config struct alongside the existing `Config`: + +```go +// ClusterConfig holds connection config for a Valkey cluster deployment. +// Addrs is a comma-separated list of seed node addresses; the go-redis +// ClusterClient discovers all nodes automatically via CLUSTER SLOTS. +// One address is sufficient but listing all masters is more robust. +type ClusterConfig struct { + Addrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + Password string `env:"VALKEY_PASSWORD" envDefault:""` + GracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` +} +``` + +The existing `Config` (single `Addr string`) is retained unchanged for standalone deployments. + +### `clusterAdapter` + +A new adapter wrapping `*redis.ClusterClient` that satisfies the existing `hashCommander` interface: + +```go +type clusterAdapter struct { + c *redis.ClusterClient +} +``` + +All method signatures are identical to `redisAdapter`. `redis.ClusterClient` exposes the same command API as `redis.Client` (`HSet`, `HGetAll`, `Del`, `Pipeline`, `NewScript().Run`) so the implementation is a direct parallel — the only difference is the underlying client type. + +The Lua `rotateScript` is registered the same way via `redis.NewScript` and executes correctly on `ClusterClient` as long as both keys are hash-tagged to the same slot (guaranteed by Change 1). 
+ +### `NewValkeyClusterStore` + +```go +func NewValkeyClusterStore(cfg ClusterConfig) (RoomKeyStore, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: cfg.Addrs, + Password: cfg.Password, + }) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("valkey cluster connect: %w", err) + } + return &valkeyStore{ + client: &clusterAdapter{c: c}, + closer: c, + gracePeriod: cfg.GracePeriod, + }, nil +} +``` + +`valkeyStore` and all its methods (`Set`, `Get`, `GetMany`, `GetByVersion`, `Rotate`, `Delete`, `SetWithVersion`, `Close`) are **unchanged**. The cluster is entirely transparent at the `valkeyStore` level. + +### `GetMany` pipeline in cluster mode + +`hgetallMany` issues a pipeline of `HGETALL` commands. `go-redis` `ClusterClient` handles cross-slot pipelines automatically — it groups commands by slot and issues separate round-trips to each node in parallel, then reassembles results in order. Because each `HGETALL room:{roomID}:key` touches only one key per room, this works correctly in cluster mode without any code change to `hgetallMany`. + +--- + +## Change 3: `pkg/valkeyutil` Cluster Support + +`search-service` connects via `valkeyutil.Connect` which internally uses `redis.NewClient` (single-node only). A replacement `ConnectCluster` function is added that uses `redis.NewClusterClient`. + +`redis.ClusterClient` and `redis.Client` are distinct types in `go-redis/v9` — they cannot share the same wrapper struct. 
A new unexported `clusterRedisClient` wrapper is introduced alongside the existing `redisClient`, both satisfying the same `Client` interface: + +```go +type clusterRedisClient struct { + c *redis.ClusterClient +} + +func (r *clusterRedisClient) Get(ctx context.Context, key string) (string, error) +func (r *clusterRedisClient) Set(ctx context.Context, key, value string, ttl time.Duration) error +func (r *clusterRedisClient) Del(ctx context.Context, keys ...string) error +func (r *clusterRedisClient) Close() error +``` + +`ConnectCluster` constructs a `clusterRedisClient`: + +```go +func ConnectCluster(ctx context.Context, addrs []string, password string) (Client, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + Password: password, + }) + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := c.Ping(pingCtx).Err(); err != nil { + if closeErr := c.Close(); closeErr != nil { + slog.Warn("valkey cluster close after failed connect", "error", closeErr) + } + return nil, fmt.Errorf("valkey cluster connect: %w", err) + } + slog.Info("connected to Valkey cluster", "addrs", addrs) + return &clusterRedisClient{c: c}, nil +} +``` + +`valkeyutil.Connect` (standalone) is removed from all service call sites. `ConnectCluster` is the only connection path going forward. + +--- + +## Change 4: Service Config Changes + +Each service switches from `VALKEY_ADDR` (single address) to `VALKEY_ADDRS` (comma-separated list). The `env` tag uses `envSeparator:","` to parse into `[]string`. Services call `NewValkeyClusterStore` instead of `NewValkeyStore`. 
+ +| Service | Config field change | Constructor change | +|---|---|---| +| `room-service` | `ValkeyAddr string` → `ValkeyAddrs []string` | `NewValkeyStore` → `NewValkeyClusterStore` | +| `room-worker` | `ValkeyAddr string` → `ValkeyAddrs []string` | `NewValkeyStore` → `NewValkeyClusterStore` | +| `broadcast-worker` | `ValkeyAddr string` → `ValkeyAddrs []string` | `NewValkeyStore` → `NewValkeyClusterStore` | +| `history-service` | `ValkeyAddr string` → `ValkeyAddrs []string` | `NewValkeyStore` → `NewValkeyClusterStore` | +| `search-service` | `Valkey.Addr string` → `Valkey.Addrs []string` | `valkeyutil.Connect` → `valkeyutil.ConnectCluster` | + +Validation: services that currently fail-fast on empty `VALKEY_ADDR` now fail-fast on empty `VALKEY_ADDRS` (zero-length slice). + +--- + +## Change 5: Per-Site Docker Compose + +Each service's `deploy/docker-compose.yml` currently declares a single Valkey node. This is replaced with a `bitnami/valkey-cluster` service that initialises a 3-master cluster internally. + +```yaml +valkey-cluster: + image: bitnami/valkey-cluster:8 + environment: + - VALKEY_CLUSTER_REPLICAS=0 + - ALLOW_EMPTY_PASSWORD=yes + ports: + - "6379:6379" +``` + +Services that previously had `VALKEY_ADDR=valkey:6379` are updated to: +```yaml +VALKEY_ADDRS=valkey-cluster:6379 +``` + +One seed address is sufficient — `ClusterClient` calls `CLUSTER SLOTS` on connect and discovers all nodes automatically. + +--- + +## Error Handling + +No new error types are introduced. 
Existing error wrapping conventions apply: + +- `NewValkeyClusterStore` ping failure: `fmt.Errorf("valkey cluster connect: %w", err)` +- All `valkeyStore` method errors are unchanged — they wrap at the `hashCommander` level, cluster vs standalone is invisible above that layer +- `ConnectCluster` ping failure: `fmt.Errorf("valkey cluster connect: %w", err)` — consistent with standalone `Connect`'s `"valkey connect: %w"` pattern + +--- + +## Configuration + +### New env vars + +| Env var | Replaces | Services | Description | +|---|---|---|---| +| `VALKEY_ADDRS` | `VALKEY_ADDR` | all | Comma-separated cluster seed addresses e.g. `node1:6379,node2:6380,node3:6381` | + +`VALKEY_PASSWORD` and `VALKEY_KEY_GRACE_PERIOD` are unchanged. + +### Replacing `VALKEY_ADDR` + +`VALKEY_ADDR` is removed from all service configs and all service `main.go` files. It is not kept as a fallback. `VALKEY_ADDRS` is the only Valkey configuration going forward. The standalone `NewValkeyStore` and `valkeyutil.Connect` constructors remain in the codebase (they are valid library functions) but are no longer called by any service. + +--- + +## Testing + +### Unit tests — key-name assertions only + +All existing unit tests in `pkg/roomkeystore/roomkeystore_test.go` use the `fakeHashClient` test double which is independent of the real client type. The test logic needs no changes; only the expected key names are updated to the hash-tagged format introduced in Change 1 (`room:{roomID}:key`, `room:{roomID}:key:prev`). 
+ +### Integration tests — `pkg/roomkeystore/integration_test.go` + +A new `setupValkeyCluster` helper alongside the existing `setupValkey`: + +```go +func setupValkeyCluster(t *testing.T, gracePeriod time.Duration) RoomKeyStore { + // starts bitnami/valkey-cluster:8 with VALKEY_CLUSTER_REPLICAS=0 + // waits for "Cluster correctly created" log line + // calls NewValkeyClusterStore with the mapped port as seed address +} +``` + +New cluster-specific tests: +- `TestValkeyClusterStore_Integration_RoundTrip` — Set → Get → Delete +- `TestValkeyClusterStore_Integration_RotateRoundTrip` — Set → Rotate → Get + GetByVersion +- `TestValkeyClusterStore_Integration_HashTagSlotConsistency` — verifies both key slots are the same (uses `CLUSTER KEYSLOT` command to assert `room:{x}:key` and `room:{x}:key:prev` hash to identical slots) + +### Integration tests — `pkg/valkeyutil` + +A new `TestConnectCluster_Integration` test using a `bitnami/valkey-cluster:8` container: connect → `SetJSONWithTTL` → `GetJSON` → `Del` round-trip. + +### Image constant — `pkg/testutil/testimages/testimages.go` + +```go +// ValkeyCluster is the image for cluster-mode Valkey integration tests. +ValkeyCluster = "bitnami/valkey-cluster:8" +``` + +### Coverage target + +≥ 90% for new code in `pkg/roomkeystore` and `pkg/valkeyutil`. Cluster integration tests run under `//go:build integration` tag. + +--- + +## Change 6: Room Key Ensure RPC — `room-service` + +### Background + +The existing key system is entirely push-based. Keys are generated inside `room-service` at room creation time and fanned out to clients via `room-worker`. No external service can ask for a key — there is no NATS subject that accepts a room ID and returns its key. + +This works for the normal room lifecycle but leaves a gap for external services that operate outside that lifecycle. 
Specifically, a connector syncing room data from an external MongoDB collection into this system needs the public key for a room to encrypt data it is writing. That connector is not part of this repo but it needs a standard entry point in `room-service` to request a key. + +### What the RPC Does + +A new NATS request/reply handler in `room-service`: + +- **Subject:** `chat.server.request.room.{siteID}.key.ensure` +- **Request payload:** `RoomKeyEnsureRequest{ RoomID string }` +- **Reply payload (success):** `model.RoomKeyEvent{ RoomID, Version, PublicKey, PrivateKey, Timestamp }` +- **Reply payload (error):** `model.ErrorResponse` via `natsutil.ReplyError` + +**Idempotent by design:** if a key already exists in Valkey for the room, it is returned immediately without generating a new one. If no key exists (backfill case), a new key pair is generated, stored in Valkey via `keyStore.Set`, and then returned. + +```text +external connector + │ NATS request: chat.server.request.room.{siteID}.key.ensure + │ payload: { roomId: "abc123" } + ▼ +room-service (NatsHandleEnsureRoomKey) + 1. keyStore.Get(roomID) + → key exists: reply RoomKeyEvent immediately + → nil: GenerateKeyPair() + keyStore.Set() + reply RoomKeyEvent + ▼ +connector receives: { roomId, version, publicKey, privateKey, timestamp } +``` + +### Why `room-service` and not `room-worker` + +Key generation must stay in `room-service` to preserve the single-rotator invariant — only `room-service` calls `GenerateKeyPair` and `keyStore.Set`. `room-worker` only reads keys (via `Get`) and fans them out. Putting this handler in `room-service` is consistent with how `handleCreateRoom` and `handleRemoveMember` already manage key generation. + +### Important: PublicKey is included in the RPC reply + +The normal `buildAndFanOutRoomKey` in `room-worker` intentionally omits `PublicKey` from the `RoomKeyEvent` sent to clients — the public key is server-side only, used by `broadcast-worker` for encryption. 
However, this RPC is **server-to-server**, not server-to-client. The external connector needs the public key to encrypt data it is writing. The full key pair is therefore returned in the reply. + +### No fan-out from this RPC + +This RPC only ensures the key is in Valkey and returns it to the caller. It does not publish to the ROOMS stream and does not trigger `room-worker` to fan out `RoomKeyEvent` to room members. Fan-out is a room lifecycle concern handled by the existing create/add-member/remove-member flows. The connector uses the key for its own encryption purposes independently. + +### New model type — `pkg/model/room.go` + +```go +// RoomKeyEnsureRequest is the payload for the room key ensure RPC. +type RoomKeyEnsureRequest struct { + RoomID string `json:"roomId" bson:"roomId"` +} +``` + +Reply reuses the existing `model.RoomKeyEvent` (already has `RoomID`, `Version`, `PublicKey`, `PrivateKey`, `Timestamp`). + +### New subject — `pkg/subject/subject.go` + +```go +func RoomKeyEnsure(siteID string) string { + return fmt.Sprintf("chat.server.request.room.%s.key.ensure", siteID) +} +``` + +### Handler — `room-service/handler.go` + +New exported method `NatsHandleEnsureRoomKey` registered in `RegisterCRUD`. The handler: + +1. Decodes `RoomKeyEnsureRequest` from the NATS message +2. Calls `keyStore.Get(roomID)` +3. If key exists: builds and replies `model.RoomKeyEvent` immediately +4. If nil: calls `roomkeystore.GenerateKeyPair()` + `keyStore.Set(roomID, pair)`, then replies +5. On any error: replies via `natsutil.ReplyError` + +The handler extracts request ID from NATS headers via `natsutil.ContextWithRequestIDFromHeaders` before processing, consistent with all other handlers. + +### `room-service/store.go` — RoomKeyStore interface + +The consumer-side `RoomKeyStore` interface in `room-service/store.go` already includes `Get` and `Set`. No changes needed. 
+ +### Testing + +`room-service/handler_test.go` — new table-driven test cases for `NatsHandleEnsureRoomKey`: + +| Scenario | Expected | +|---|---| +| Key exists in Valkey | returns existing `RoomKeyEvent`, no `Set` called | +| Key missing — happy path | `GenerateKeyPair` + `Set` called, returns new `RoomKeyEvent` | +| `keyStore.Get` error | replies with error, no `Set` called | +| `keyStore.Set` error on generation | replies with error | +| Malformed request payload | replies with error | + +Mock expectations updated via `make generate` after any interface change. + +--- + +## Files Changed + +| File | Change | +|---|---| +| `pkg/roomkeystore/roomkeystore.go` | Hash-tag `roomkey` and `roomprevkey` key name functions | +| `pkg/roomkeystore/adapter.go` | Add `clusterAdapter`, `ClusterConfig`, `NewValkeyClusterStore` | +| `pkg/roomkeystore/integration_test.go` | Add `setupValkeyCluster` + 3 cluster integration tests | +| `pkg/roomkeystore/roomkeystore_test.go` | Update key name assertions to expect hash-tagged format | +| `pkg/valkeyutil/valkey.go` | Add `clusterRedisClient`, `ConnectCluster`; `Connect` retained but no longer called by services | +| `pkg/valkeyutil/valkey_test.go` | Add `TestConnectCluster_Integration` | +| `pkg/testutil/testimages/testimages.go` | Add `ValkeyCluster` constant | +| `room-service/main.go` | `ValkeyAddr` → `ValkeyAddrs`, `NewValkeyStore` → `NewValkeyClusterStore` | +| `room-worker/main.go` | Same | +| `broadcast-worker/main.go` | Same | +| `history-service/cmd/main.go` | Same | +| `search-service/main.go` | `Valkey.Addr` → `Valkey.Addrs`, `Connect` → `ConnectCluster` | +| `room-service/deploy/docker-compose.yml` | Single node → `bitnami/valkey-cluster`, `VALKEY_ADDRS` | +| `room-worker/deploy/docker-compose.yml` | Same | +| `broadcast-worker/deploy/docker-compose.yml` | Same | +| `history-service/deploy/docker-compose.yml` | Same | +| `search-service/deploy/docker-compose.yml` | Same | +| `pkg/model/room.go` | Add 
`RoomKeyEnsureRequest` | +| `pkg/subject/subject.go` | Add `RoomKeyEnsure(siteID string) string` | +| `room-service/handler.go` | Add `NatsHandleEnsureRoomKey`, register in `RegisterCRUD` | +| `room-service/handler_test.go` | TDD tests for `NatsHandleEnsureRoomKey` | +| `room-service/mock_store_test.go` | Regenerated via `make generate` if interface changes | + +--- + +## Out of Scope + +- Migration tooling for existing standalone Valkey data to cluster key format +- Sentinel (HA without sharding) as an alternative — cluster mode is the chosen approach +- Per-purpose Valkey separation (keys vs cache on separate clusters) +- Production Kubernetes manifests — docker-compose covers local and ftest; production infra is managed separately +- Valkey persistence configuration (AOF/RDB) — required for production but an ops/infra concern, not a code concern +- Fan-out of `RoomKeyEvent` to room members from the ensure RPC — that is the room lifecycle's responsibility, not the connector's +- The connector implementation itself — it lives outside this repo and calls the RPC as a black box +- Client-side pull RPC for missed key events — deferred, noted in the existing room encryption keys spec diff --git a/pkg/model/event.go b/pkg/model/event.go index 125f9ad3c..1b1d6bf96 100644 --- a/pkg/model/event.go +++ b/pkg/model/event.go @@ -197,6 +197,19 @@ type RoomKeyEvent struct { Timestamp int64 `json:"timestamp" bson:"timestamp"` } +// RoomKeyEnsureRequest is the payload for the room key ensure RPC. +type RoomKeyEnsureRequest struct { + RoomID string `json:"roomId"` +} + +// RoomKeyEnsureResponse is the reply from the room key ensure RPC. It confirms +// the room has a key pair in Valkey at the given Version. Key bytes are not +// returned — the caller only needs to know the key exists. 
+type RoomKeyEnsureResponse struct { + RoomID string `json:"roomId"` + Version int `json:"version"` +} + type MemberRemoveEvent struct { Type string `json:"type" bson:"type"` RoomID string `json:"roomId" bson:"roomId"` diff --git a/pkg/model/model_test.go b/pkg/model/model_test.go index a77e330d2..7bd95abb4 100644 --- a/pkg/model/model_test.go +++ b/pkg/model/model_test.go @@ -882,6 +882,16 @@ func TestRoomKeyEventJSON(t *testing.T) { } } +func TestRoomKeyEnsureRequestJSON(t *testing.T) { + src := model.RoomKeyEnsureRequest{RoomID: "room-abc"} + roundTrip(t, &src, &model.RoomKeyEnsureRequest{}) +} + +func TestRoomKeyEnsureResponseJSON(t *testing.T) { + src := model.RoomKeyEnsureResponse{RoomID: "room-abc", Version: 3} + roundTrip(t, &src, &model.RoomKeyEnsureResponse{}) +} + func TestNotificationEventJSON(t *testing.T) { src := model.NotificationEvent{ Type: "new_message", diff --git a/pkg/roomkeystore/adapter.go b/pkg/roomkeystore/adapter.go index 4b1ff81eb..c3c4e600d 100644 --- a/pkg/roomkeystore/adapter.go +++ b/pkg/roomkeystore/adapter.go @@ -3,27 +3,27 @@ package roomkeystore import ( "context" "fmt" + "log/slog" "strconv" - "strings" "time" "github.com/redis/go-redis/v9" ) -// redisAdapter wraps *redis.Client to satisfy hashCommander. -type redisAdapter struct { - c *redis.Client +// clusterAdapter wraps *redis.ClusterClient to satisfy hashCommander. 
+type clusterAdapter struct { + c *redis.ClusterClient } -func (a *redisAdapter) hset(ctx context.Context, key string, pub, priv string) error { +func (a *clusterAdapter) hset(ctx context.Context, key string, pub, priv string) error { return a.c.HSet(ctx, key, "pub", pub, "priv", priv, "ver", "0").Err() } -func (a *redisAdapter) hsetWithVersion(ctx context.Context, key string, pub, priv string, version int) error { +func (a *clusterAdapter) hsetWithVersion(ctx context.Context, key string, pub, priv string, version int) error { return a.c.HSet(ctx, key, "pub", pub, "priv", priv, "ver", strconv.Itoa(version)).Err() } -func (a *redisAdapter) hgetall(ctx context.Context, key string) (map[string]string, error) { +func (a *clusterAdapter) hgetall(ctx context.Context, key string) (map[string]string, error) { return a.c.HGetAll(ctx, key).Result() } @@ -54,26 +54,34 @@ redis.call('HSET', currentKey, 'pub', newPub, 'priv', newPriv, 'ver', tostring(n return newVer `) -func (a *redisAdapter) rotatePipeline(ctx context.Context, currentKey, prevKey string, pub, priv string, gracePeriod time.Duration) (int, error) { +func (a *clusterAdapter) rotatePipeline(ctx context.Context, currentKey, prevKey string, pub, priv string, gracePeriod time.Duration) (int, error) { graceSec := int(gracePeriod.Seconds()) if graceSec < 1 { graceSec = 1 } result, err := rotateScript.Run(ctx, a.c, []string{currentKey, prevKey}, pub, priv, graceSec).Int() - if err != nil && strings.Contains(err.Error(), "no current key") { + if isLuaNoCurrentKeyErr(err) { return 0, ErrNoCurrentKey } return result, err } -func (a *redisAdapter) deletePipeline(ctx context.Context, currentKey, prevKey string) error { +// isLuaNoCurrentKeyErr reports whether err is the sentinel the rotate Lua +// script emits via redis.error_reply('no current key'). go-redis surfaces +// Lua error_reply values as untyped errors whose message equals the Lua +// string verbatim — typed error wrapping is not available at this boundary. 
+func isLuaNoCurrentKeyErr(err error) bool { + return err != nil && err.Error() == "no current key" +} + +func (a *clusterAdapter) deletePipeline(ctx context.Context, currentKey, prevKey string) error { return a.c.Del(ctx, currentKey, prevKey).Err() } // hgetallMany issues HGETALL for every key in a single pipeline and returns // one map per input key (in the same order). A missing hash yields an empty // map rather than an error, matching go-redis v9 HGetAll semantics. -func (a *redisAdapter) hgetallMany(ctx context.Context, keys []string) ([]map[string]string, error) { +func (a *clusterAdapter) hgetallMany(ctx context.Context, keys []string) ([]map[string]string, error) { if len(keys) == 0 { return nil, nil } @@ -96,20 +104,41 @@ func (a *redisAdapter) hgetallMany(ctx context.Context, keys []string) ([]map[st return out, nil } -func (a *redisAdapter) closeClient() error { +func (a *clusterAdapter) closeClient() error { return a.c.Close() } -// NewValkeyStore creates a valkeyStore, pings Valkey to verify connectivity, and returns it. -func NewValkeyStore(cfg Config) (RoomKeyStore, error) { - c := redis.NewClient(&redis.Options{ - Addr: cfg.Addr, +// ClusterConfig holds connection config for a Valkey cluster deployment. +// Addrs is a list of seed node addresses; go-redis ClusterClient discovers +// all nodes automatically via CLUSTER SLOTS. One address is sufficient but +// listing all masters is more robust against seed-node downtime at connect time. +type ClusterConfig struct { + Addrs []string + Password string + GracePeriod time.Duration +} + +// NewValkeyClusterStore creates a valkeyStore backed by a Valkey cluster, +// pings the cluster to verify connectivity, and returns it. 
+func NewValkeyClusterStore(cfg ClusterConfig) (RoomKeyStore, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: cfg.Addrs, Password: cfg.Password, }) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := c.Ping(ctx).Err(); err != nil { - return nil, fmt.Errorf("valkey connect: %w", err) + if closeErr := c.Close(); closeErr != nil { + slog.Warn("valkey cluster close after failed connect", "error", closeErr) + } + return nil, fmt.Errorf("valkey cluster connect: %w", err) } - return &valkeyStore{client: &redisAdapter{c: c}, closer: c, gracePeriod: cfg.GracePeriod}, nil + return &valkeyStore{client: &clusterAdapter{c: c}, closer: c, gracePeriod: cfg.GracePeriod}, nil +} + +// NewValkeyClusterStoreFromClient wraps a pre-built *redis.ClusterClient as a +// RoomKeyStore. Intended for tests that inject a client configured with a +// ClusterSlots override (testcontainer port-mapping workaround). +func NewValkeyClusterStoreFromClient(c *redis.ClusterClient, gracePeriod time.Duration) RoomKeyStore { + return &valkeyStore{client: &clusterAdapter{c: c}, closer: c, gracePeriod: gracePeriod} } diff --git a/pkg/roomkeystore/integration_test.go b/pkg/roomkeystore/integration_test.go index 8ce539a3b..1e42d81b9 100644 --- a/pkg/roomkeystore/integration_test.go +++ b/pkg/roomkeystore/integration_test.go @@ -6,64 +6,42 @@ import ( "bytes" "context" "errors" - "fmt" "testing" "time" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" - "github.com/hmchangw/chat/pkg/testutil/testimages" + "github.com/hmchangw/chat/pkg/testutil" ) -// setupValkey starts a valkey/valkey:8 container and returns a connected valkeyStore. -// The container is terminated via t.Cleanup. 
-func setupValkey(t *testing.T, gracePeriod time.Duration) RoomKeyStore { +// setupValkey starts a cluster-mode Valkey container via testutil and returns +// a RoomKeyStore plus the raw *redis.ClusterClient (needed for CLUSTER KEYSLOT +// assertions). The container and client are terminated/closed via t.Cleanup. +func setupValkey(t *testing.T, gracePeriod time.Duration) (RoomKeyStore, *redis.ClusterClient) { t.Helper() - ctx := context.Background() - - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: testimages.Valkey, - ExposedPorts: []string{"6379/tcp"}, - WaitingFor: wait.ForLog("Ready to accept connections"), - }, - Started: true, - }) - require.NoError(t, err, "start valkey container") - t.Cleanup(func() { - _ = container.Terminate(ctx) // best-effort; ignore cleanup errors - }) - - host, err := container.Host(ctx) - require.NoError(t, err) - port, err := container.MappedPort(ctx, "6379") - require.NoError(t, err) - - store, err := NewValkeyStore(Config{ - Addr: fmt.Sprintf("%s:%s", host, port.Port()), - GracePeriod: gracePeriod, - }) - require.NoError(t, err, "create valkeyStore") - return store + c := testutil.StartValkeyCluster(t) + store := &valkeyStore{ + client: &clusterAdapter{c: c}, + closer: c, + gracePeriod: gracePeriod, + } + return store, c } func TestValkeyStore_Integration_RoundTrip(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() pubKey := bytes.Repeat([]byte{0xAB}, 65) privKey := bytes.Repeat([]byte{0xCD}, 32) pair := RoomKeyPair{PublicKey: pubKey, PrivateKey: privKey} - // Set ver, err := store.Set(ctx, "room-1", pair) require.NoError(t, err) assert.Equal(t, 0, ver) - // Get — should return the stored pair with version got, err := store.Get(ctx, "room-1") require.NoError(t, err) require.NotNil(t, got) @@ -71,18 +49,16 @@ func TestValkeyStore_Integration_RoundTrip(t 
*testing.T) { assert.Equal(t, pubKey, got.KeyPair.PublicKey) assert.Equal(t, privKey, got.KeyPair.PrivateKey) - // Delete err = store.Delete(ctx, "room-1") require.NoError(t, err) - // Get after delete — should return nil, nil got, err = store.Get(ctx, "room-1") require.NoError(t, err) assert.Nil(t, got) } func TestValkeyStore_Integration_SetWithVersion(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() pubKey := bytes.Repeat([]byte{0xAB}, 65) @@ -98,7 +74,6 @@ func TestValkeyStore_Integration_SetWithVersion(t *testing.T) { assert.Equal(t, pubKey, got.KeyPair.PublicKey) assert.Equal(t, privKey, got.KeyPair.PrivateKey) - // Overwriting at a higher version is allowed (idempotent for replication catch-up). newPub := bytes.Repeat([]byte{0xEE}, 65) require.NoError(t, store.SetWithVersion(ctx, "room-replicated", RoomKeyPair{PublicKey: newPub, PrivateKey: privKey}, 9)) got, err = store.Get(ctx, "room-replicated") @@ -109,7 +84,7 @@ func TestValkeyStore_Integration_SetWithVersion(t *testing.T) { } func TestValkeyStore_Integration_MissingKey(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() got, err := store.Get(ctx, "nonexistent-room") @@ -118,7 +93,7 @@ func TestValkeyStore_Integration_MissingKey(t *testing.T) { } func TestValkeyStore_Integration_RotateRoundTrip(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() oldPub := bytes.Repeat([]byte{0xAA}, 65) @@ -126,17 +101,14 @@ func TestValkeyStore_Integration_RotateRoundTrip(t *testing.T) { newPub := bytes.Repeat([]byte{0xCC}, 65) newPriv := bytes.Repeat([]byte{0xDD}, 32) - // Set initial key pair. ver, err := store.Set(ctx, "room-rot", RoomKeyPair{PublicKey: oldPub, PrivateKey: oldPriv}) require.NoError(t, err) assert.Equal(t, 0, ver) - // Rotate to new key pair. 
ver, err = store.Rotate(ctx, "room-rot", RoomKeyPair{PublicKey: newPub, PrivateKey: newPriv}) require.NoError(t, err) assert.Equal(t, 1, ver) - // Get — should return new key pair as current. got, err := store.Get(ctx, "room-rot") require.NoError(t, err) require.NotNil(t, got) @@ -144,29 +116,25 @@ func TestValkeyStore_Integration_RotateRoundTrip(t *testing.T) { assert.Equal(t, newPub, got.KeyPair.PublicKey) assert.Equal(t, newPriv, got.KeyPair.PrivateKey) - // GetByVersion with old version — should return old key pair from previous slot. oldPair, err := store.GetByVersion(ctx, "room-rot", 0) require.NoError(t, err) require.NotNil(t, oldPair) assert.Equal(t, oldPub, oldPair.PublicKey) assert.Equal(t, oldPriv, oldPair.PrivateKey) - // GetByVersion with new version — should return new key pair from current slot. newPair, err := store.GetByVersion(ctx, "room-rot", 1) require.NoError(t, err) require.NotNil(t, newPair) assert.Equal(t, newPub, newPair.PublicKey) assert.Equal(t, newPriv, newPair.PrivateKey) - // GetByVersion with unknown version — should return nil, nil. unknown, err := store.GetByVersion(ctx, "room-rot", 999) require.NoError(t, err) assert.Nil(t, unknown) } func TestValkeyStore_Integration_GracePeriodExpiry(t *testing.T) { - // Use a 1-second grace period so the test completes quickly. - store := setupValkey(t, 1*time.Second) + store, _ := setupValkey(t, 1*time.Second) ctx := context.Background() oldPub := bytes.Repeat([]byte{0x01}, 65) @@ -180,7 +148,6 @@ func TestValkeyStore_Integration_GracePeriodExpiry(t *testing.T) { _, err = store.Rotate(ctx, "room-grace", RoomKeyPair{PublicKey: newPub, PrivateKey: newPriv}) require.NoError(t, err) - // Immediately after rotate, old key should still be retrievable. 
oldPair, err := store.GetByVersion(ctx, "room-grace", 0) require.NoError(t, err) require.NotNil(t, oldPair, "old key should be retrievable during grace period") @@ -189,12 +156,10 @@ func TestValkeyStore_Integration_GracePeriodExpiry(t *testing.T) { // waiting for an external Valkey TTL, not synchronising goroutines. time.Sleep(2 * time.Second) - // Old key should now be expired. oldPair, err = store.GetByVersion(ctx, "room-grace", 0) require.NoError(t, err) assert.Nil(t, oldPair, "old key should be expired after grace period") - // Current key should still be present (no TTL). got, err := store.Get(ctx, "room-grace") require.NoError(t, err) require.NotNil(t, got) @@ -202,7 +167,7 @@ func TestValkeyStore_Integration_GracePeriodExpiry(t *testing.T) { } func TestValkeyStore_Integration_RotateNoCurrentKey(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() _, err := store.Rotate(ctx, "room-empty", RoomKeyPair{ @@ -214,10 +179,9 @@ func TestValkeyStore_Integration_RotateNoCurrentKey(t *testing.T) { } func TestValkeyStore_Integration_DeleteBothKeys(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() - // Set + Rotate to create both current and previous keys. _, err := store.Set(ctx, "room-del", RoomKeyPair{ PublicKey: bytes.Repeat([]byte{0xAA}, 65), PrivateKey: bytes.Repeat([]byte{0xBB}, 32), @@ -230,23 +194,20 @@ func TestValkeyStore_Integration_DeleteBothKeys(t *testing.T) { }) require.NoError(t, err) - // Delete should remove both. err = store.Delete(ctx, "room-del") require.NoError(t, err) - // Current key should be gone. got, err := store.Get(ctx, "room-del") require.NoError(t, err) assert.Nil(t, got) - // Previous key should also be gone. 
prev, err := store.GetByVersion(ctx, "room-del", 0) require.NoError(t, err) assert.Nil(t, prev) } func TestValkeyStore_Integration_GetMany(t *testing.T) { - store := setupValkey(t, time.Hour) + store, _ := setupValkey(t, time.Hour) ctx := context.Background() pub1 := bytes.Repeat([]byte{0x01}, 65) @@ -256,37 +217,70 @@ func TestValkeyStore_Integration_GetMany(t *testing.T) { pub3 := bytes.Repeat([]byte{0x05}, 65) priv3 := bytes.Repeat([]byte{0x06}, 32) - _, err := store.Set(ctx, "room-1", RoomKeyPair{PublicKey: pub1, PrivateKey: priv1}) + _, err := store.Set(ctx, "getmany-room-1", RoomKeyPair{PublicKey: pub1, PrivateKey: priv1}) require.NoError(t, err) - _, err = store.Set(ctx, "room-2", RoomKeyPair{PublicKey: pub2, PrivateKey: priv2}) + _, err = store.Set(ctx, "getmany-room-2", RoomKeyPair{PublicKey: pub2, PrivateKey: priv2}) require.NoError(t, err) - _, err = store.Set(ctx, "room-3", RoomKeyPair{PublicKey: pub3, PrivateKey: priv3}) + _, err = store.Set(ctx, "getmany-room-3", RoomKeyPair{PublicKey: pub3, PrivateKey: priv3}) require.NoError(t, err) - got, err := store.GetMany(ctx, []string{"room-1", "room-2", "room-3", "room-missing"}) + got, err := store.GetMany(ctx, []string{"getmany-room-1", "getmany-room-2", "getmany-room-3", "getmany-room-missing"}) require.NoError(t, err) require.Len(t, got, 3, "missing room must be omitted from result") - require.Contains(t, got, "room-1") - assert.Equal(t, 0, got["room-1"].Version) - assert.Equal(t, pub1, got["room-1"].KeyPair.PublicKey) - assert.Equal(t, priv1, got["room-1"].KeyPair.PrivateKey) + require.Contains(t, got, "getmany-room-1") + assert.Equal(t, 0, got["getmany-room-1"].Version) + assert.Equal(t, pub1, got["getmany-room-1"].KeyPair.PublicKey) + assert.Equal(t, priv1, got["getmany-room-1"].KeyPair.PrivateKey) - require.Contains(t, got, "room-2") - assert.Equal(t, 0, got["room-2"].Version) - assert.Equal(t, pub2, got["room-2"].KeyPair.PublicKey) - assert.Equal(t, priv2, got["room-2"].KeyPair.PrivateKey) + 
require.Contains(t, got, "getmany-room-2") + assert.Equal(t, 0, got["getmany-room-2"].Version) + assert.Equal(t, pub2, got["getmany-room-2"].KeyPair.PublicKey) + assert.Equal(t, priv2, got["getmany-room-2"].KeyPair.PrivateKey) - require.Contains(t, got, "room-3") - assert.Equal(t, 0, got["room-3"].Version) - assert.Equal(t, pub3, got["room-3"].KeyPair.PublicKey) - assert.Equal(t, priv3, got["room-3"].KeyPair.PrivateKey) + require.Contains(t, got, "getmany-room-3") + assert.Equal(t, 0, got["getmany-room-3"].Version) + assert.Equal(t, pub3, got["getmany-room-3"].KeyPair.PublicKey) + assert.Equal(t, priv3, got["getmany-room-3"].KeyPair.PrivateKey) - _, missing := got["room-missing"] - assert.False(t, missing, "room-missing must not be present in result") + _, missing := got["getmany-room-missing"] + assert.False(t, missing, "getmany-room-missing must not be present in result") empty, err := store.GetMany(ctx, []string{}) require.NoError(t, err) require.NotNil(t, empty) assert.Empty(t, empty) } + +// TestValkeyStore_Integration_HashTagSlotConsistency verifies that the +// {roomID} hash tag in both key names forces them onto the same cluster slot. +// Without hash tags the Lua rotate script would fail with a CROSSSLOT error. 
+func TestValkeyStore_Integration_HashTagSlotConsistency(t *testing.T) { + store, c := setupValkey(t, time.Hour) + ctx := context.Background() + + const roomID = "test-hash-tag-room" + + currentKey := roomkey(roomID) + prevKey := roomprevkey(roomID) + + slotCurrent, err := c.Do(ctx, "CLUSTER", "KEYSLOT", currentKey).Int() + require.NoError(t, err, "CLUSTER KEYSLOT current") + slotPrev, err := c.Do(ctx, "CLUSTER", "KEYSLOT", prevKey).Int() + require.NoError(t, err, "CLUSTER KEYSLOT prev") + + assert.Equal(t, slotCurrent, slotPrev, + "current key %q and prev key %q must hash to the same cluster slot", currentKey, prevKey) + + _, err = store.Set(ctx, roomID, RoomKeyPair{ + PublicKey: bytes.Repeat([]byte{0x01}, 65), + PrivateKey: bytes.Repeat([]byte{0x02}, 32), + }) + require.NoError(t, err) + + _, err = store.Rotate(ctx, roomID, RoomKeyPair{ + PublicKey: bytes.Repeat([]byte{0x03}, 65), + PrivateKey: bytes.Repeat([]byte{0x04}, 32), + }) + require.NoError(t, err, "rotate must not return CROSSSLOT — hash tags ensure both keys share a slot") +} diff --git a/pkg/roomkeystore/roomkeystore.go b/pkg/roomkeystore/roomkeystore.go index 16441a835..a5931c71b 100644 --- a/pkg/roomkeystore/roomkeystore.go +++ b/pkg/roomkeystore/roomkeystore.go @@ -41,13 +41,6 @@ type RoomKeyStore interface { Close() error } -// Config holds Valkey connection and grace period configuration, parsed via caarlos0/env. -type Config struct { - Addr string `env:"VALKEY_ADDR,required"` - Password string `env:"VALKEY_PASSWORD" envDefault:""` - GracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` -} - // hashCommander is a minimal internal interface over the Valkey hash commands used by valkeyStore. // Unexported and command-specific so unit tests can inject a fake without a live Valkey connection. type hashCommander interface { @@ -76,13 +69,15 @@ func (s *valkeyStore) Close() error { } // roomkey returns the Valkey hash key for a room's current key pair. 
+// The {roomID} hash tag ensures both roomkey and roomprevkey for the same +// room always land on the same cluster slot, which is required for the Lua +// rotate script and DEL pipeline to execute without a CROSSSLOT error. func roomkey(roomID string) string { - return "room:" + roomID + ":key" + return "room:{" + roomID + "}:key" } -// roomprevkey returns the Valkey hash key for a room's previous key pair. func roomprevkey(roomID string) string { - return "room:" + roomID + ":key:prev" + return "room:{" + roomID + "}:key:prev" } // Set stores pair in Valkey as a hash with no TTL, assigning version 0. diff --git a/pkg/roomsubcache/integration_test.go b/pkg/roomsubcache/integration_test.go index f3373fce7..fea394fa4 100644 --- a/pkg/roomsubcache/integration_test.go +++ b/pkg/roomsubcache/integration_test.go @@ -4,48 +4,20 @@ package roomsubcache_test import ( "context" - "fmt" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" "github.com/hmchangw/chat/pkg/roomsubcache" - "github.com/hmchangw/chat/pkg/testutil/testimages" + "github.com/hmchangw/chat/pkg/testutil" "github.com/hmchangw/chat/pkg/valkeyutil" ) -// setupValkey starts a valkey/valkey:8 container and returns a connected -// valkeyutil.Client. The container is terminated via t.Cleanup. 
func setupValkey(t *testing.T) valkeyutil.Client { t.Helper() - ctx := context.Background() - - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: testimages.Valkey, - ExposedPorts: []string{"6379/tcp"}, - WaitingFor: wait.ForLog("Ready to accept connections"), - }, - Started: true, - }) - require.NoError(t, err, "start valkey container") - t.Cleanup(func() { - _ = container.Terminate(ctx) // best-effort; ignore cleanup errors - }) - - host, err := container.Host(ctx) - require.NoError(t, err) - port, err := container.MappedPort(ctx, "6379") - require.NoError(t, err) - - client, err := valkeyutil.Connect(ctx, fmt.Sprintf("%s:%s", host, port.Port()), "") - require.NoError(t, err, "connect valkey") - t.Cleanup(func() { _ = client.Close() }) // best-effort; ignore cleanup errors - return client + return valkeyutil.WrapClusterClient(testutil.StartValkeyCluster(t)) } func TestValkeyCache_Integration_SetGetInvalidate(t *testing.T) { diff --git a/pkg/subject/subject.go b/pkg/subject/subject.go index 1d0db99b5..b8e0dae33 100644 --- a/pkg/subject/subject.go +++ b/pkg/subject/subject.go @@ -192,6 +192,13 @@ func RoomsInfoBatch(siteID string) string { return fmt.Sprintf("chat.server.request.room.%s.info.batch", siteID) } +// RoomKeyEnsure is the server-to-server request subject for the room key ensure +// RPC. Callers send a RoomKeyEnsureRequest and receive a RoomKeyEnsureResponse +// confirming the room has a key pair in Valkey at the returned version. +func RoomKeyEnsure(siteID string) string { + return fmt.Sprintf("chat.server.request.room.%s.key.ensure", siteID) +} + // RoomCreateDMSync is the server-to-server request subject for synchronous DM/botDM creation. 
func RoomCreateDMSync(siteID string) string { return fmt.Sprintf("chat.server.request.room.%s.create.dm", siteID) diff --git a/pkg/testutil/valkey.go b/pkg/testutil/valkey.go new file mode 100644 index 000000000..6a447a161 --- /dev/null +++ b/pkg/testutil/valkey.go @@ -0,0 +1,83 @@ +//go:build integration + +package testutil + +import ( + "context" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + + "github.com/hmchangw/chat/pkg/testutil/testimages" +) + +// StartValkeyCluster starts a single-node cluster-mode Valkey container, +// assigns all 16384 hash slots to that node, and returns a connected +// *redis.ClusterClient. The ClusterSlots override routes traffic to the +// externally-mapped address rather than the internal 127.0.0.1:6379 that +// the node announces to peers — required for testcontainer port mapping. +// The container and client are terminated/closed via t.Cleanup. 
+func StartValkeyCluster(t *testing.T) *redis.ClusterClient { + t.Helper() + ctx := context.Background() + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: testimages.Valkey, + ExposedPorts: []string{"6379/tcp"}, + Cmd: []string{ + "valkey-server", + "--cluster-enabled", "yes", + "--cluster-config-file", "nodes.conf", + "--cluster-node-timeout", "5000", + "--save", "", + }, + WaitingFor: wait.ForLog("Ready to accept connections"), + }, + Started: true, + }) + require.NoError(t, err, "start valkey cluster container") + t.Cleanup(func() { _ = container.Terminate(ctx) }) + + host, err := container.Host(ctx) + require.NoError(t, err) + port, err := container.MappedPort(ctx, "6379") + require.NoError(t, err) + addr := fmt.Sprintf("%s:%s", host, port.Port()) + + exitCode, _, err := container.Exec(ctx, []string{"valkey-cli", "CLUSTER", "ADDSLOTSRANGE", "0", "16383"}) + require.NoError(t, err, "exec cluster addslotsrange") + require.Equal(t, 0, exitCode, "cluster addslotsrange must exit 0") + + require.Eventually(t, func() bool { + _, out, execErr := container.Exec(ctx, []string{"valkey-cli", "CLUSTER", "INFO"}) + if execErr != nil { + return false + } + buf, _ := io.ReadAll(out) + return strings.Contains(string(buf), "cluster_state:ok") + }, 10*time.Second, 100*time.Millisecond, "cluster must reach ok state") + + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{addr}, + ClusterSlots: func(_ context.Context) ([]redis.ClusterSlot, error) { + return []redis.ClusterSlot{ + {Start: 0, End: 16383, Nodes: []redis.ClusterNode{{Addr: addr}}}, + }, nil + }, + }) + t.Cleanup(func() { _ = c.Close() }) + + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + require.NoError(t, c.Ping(pingCtx).Err(), "ping valkey cluster") + + return c +} diff --git a/pkg/valkeyutil/integration_test.go b/pkg/valkeyutil/integration_test.go new file 
mode 100644 index 000000000..84bb5531a --- /dev/null +++ b/pkg/valkeyutil/integration_test.go @@ -0,0 +1,57 @@ +//go:build integration + +package valkeyutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/hmchangw/chat/pkg/testutil" +) + +// setupClusterClient starts a cluster-mode Valkey container via the shared +// testutil helper and returns a Client backed by clusterClient. ConnectCluster +// itself cannot be used here because its auto-discovery follows CLUSTER SLOTS, +// which returns the container-internal 127.0.0.1:6379 — unreachable from the +// host. testutil.StartValkeyCluster applies the ClusterSlots override; we then +// wrap the resulting *redis.ClusterClient directly (same-package access). +// ConnectCluster's error-wrapping path is covered by TestConnectCluster_ErrorPath. +func setupClusterClient(t *testing.T) Client { + t.Helper() + return &clusterClient{c: testutil.StartValkeyCluster(t)} +} + +func TestClusterRedisClient_Integration_GetSetDel(t *testing.T) { + client := setupClusterClient(t) + ctx := context.Background() + + require.NoError(t, client.Set(ctx, "k1", "hello", time.Hour)) + + val, err := client.Get(ctx, "k1") + require.NoError(t, err) + assert.Equal(t, "hello", val) + + require.NoError(t, client.Del(ctx, "k1")) + + _, err = client.Get(ctx, "k1") + assert.ErrorIs(t, err, ErrCacheMiss) +} + +func TestClusterRedisClient_Integration_CacheMiss(t *testing.T) { + client := setupClusterClient(t) + ctx := context.Background() + + _, err := client.Get(ctx, "no-such-key") + assert.ErrorIs(t, err, ErrCacheMiss) +} + +func TestClusterRedisClient_Integration_DelEmpty(t *testing.T) { + client := setupClusterClient(t) + ctx := context.Background() + + require.NoError(t, client.Del(ctx)) +} diff --git a/pkg/valkeyutil/valkey.go b/pkg/valkeyutil/valkey.go index 0830b3039..00526ac57 100644 --- a/pkg/valkeyutil/valkey.go +++ b/pkg/valkeyutil/valkey.go @@ -18,8 
+18,8 @@ import ( "github.com/redis/go-redis/v9" ) -// Client is the interface exposed by Connect. Tests can substitute their -// own implementation without depending on go-redis directly. +// Client is the interface exposed by ConnectCluster. Tests can substitute +// their own implementation without depending on go-redis directly. type Client interface { Get(ctx context.Context, key string) (string, error) Set(ctx context.Context, key, value string, ttl time.Duration) error @@ -30,31 +30,36 @@ type Client interface { // ErrCacheMiss is returned by Get and GetJSON when the key does not exist. var ErrCacheMiss = errors.New("valkey: cache miss") -type redisClient struct { - c *redis.Client +type clusterClient struct { + c *redis.ClusterClient } -// Connect dials Valkey/Redis, verifies connectivity with PING, and returns -// a Client. Follows the same `fmt.Errorf("… connect: %w", err)` wrapping -// style used by pkg/mongoutil so upstream logs are consistent. -func Connect(ctx context.Context, addr, password string) (Client, error) { - c := redis.NewClient(&redis.Options{ - Addr: addr, +// ConnectCluster dials a Valkey cluster via the provided seed addresses, +// verifies connectivity with PING, and returns a Client. +func ConnectCluster(ctx context.Context, addrs []string, password string) (Client, error) { + c := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, Password: password, }) pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := c.Ping(pingCtx).Err(); err != nil { // Close the half-constructed client on the ping-failure path so - // repeated connect failures (e.g. startup retry loops against - // an unreachable addr) don't leak internal go-redis pool state. + // unreachable addrs don't leak internal go-redis pool state. 
if closeErr := c.Close(); closeErr != nil { - slog.Warn("valkey close after failed connect", "error", closeErr) + slog.Warn("valkey cluster close after failed connect", "error", closeErr) } - return nil, fmt.Errorf("valkey connect: %w", err) + return nil, fmt.Errorf("valkey cluster connect: %w", err) } - slog.Info("connected to Valkey", "addr", addr) - return &redisClient{c: c}, nil + slog.Info("connected to Valkey cluster", "addrs", addrs) + return &clusterClient{c: c}, nil +} + +// WrapClusterClient wraps a pre-built *redis.ClusterClient as a Client. +// Intended for tests that need to inject a client configured with a +// ClusterSlots override (testcontainer port-mapping workaround). +func WrapClusterClient(c *redis.ClusterClient) Client { + return &clusterClient{c: c} } // Disconnect closes the client and logs any failure at ERROR. @@ -67,7 +72,7 @@ func Disconnect(client Client) { } } -func (r *redisClient) Get(ctx context.Context, key string) (string, error) { +func (r *clusterClient) Get(ctx context.Context, key string) (string, error) { val, err := r.c.Get(ctx, key).Result() if errors.Is(err, redis.Nil) { return "", ErrCacheMiss @@ -78,14 +83,14 @@ func (r *redisClient) Get(ctx context.Context, key string) (string, error) { return val, nil } -func (r *redisClient) Set(ctx context.Context, key, value string, ttl time.Duration) error { +func (r *clusterClient) Set(ctx context.Context, key, value string, ttl time.Duration) error { if err := r.c.Set(ctx, key, value, ttl).Err(); err != nil { return fmt.Errorf("valkey set: %w", err) } return nil } -func (r *redisClient) Del(ctx context.Context, keys ...string) error { +func (r *clusterClient) Del(ctx context.Context, keys ...string) error { if len(keys) == 0 { return nil } @@ -95,7 +100,7 @@ func (r *redisClient) Del(ctx context.Context, keys ...string) error { return nil } -func (r *redisClient) Close() error { +func (r *clusterClient) Close() error { return r.c.Close() } diff --git 
a/pkg/valkeyutil/valkey_test.go b/pkg/valkeyutil/valkey_test.go index 6837f4bad..b0dba8e2f 100644 --- a/pkg/valkeyutil/valkey_test.go +++ b/pkg/valkeyutil/valkey_test.go @@ -128,11 +128,8 @@ func TestDisconnect(t *testing.T) { }) } -func TestConnect_ErrorPath(t *testing.T) { - // Point at a port that refuses connections (port 1 is well-known to - // reject without a listener). The internal Ping must fail fast and - // Connect must return a wrapped error — no real Valkey needed. - _, err := valkeyutil.Connect(context.Background(), "127.0.0.1:1", "") +func TestConnectCluster_ErrorPath(t *testing.T) { + _, err := valkeyutil.ConnectCluster(context.Background(), []string{"127.0.0.1:1"}, "") require.Error(t, err) - assert.Contains(t, err.Error(), "valkey connect") + assert.Contains(t, err.Error(), "valkey cluster connect") } diff --git a/room-service/deploy/docker-compose.yml b/room-service/deploy/docker-compose.yml index 4a23fe33a..f6ca2ba39 100644 --- a/room-service/deploy/docker-compose.yml +++ b/room-service/deploy/docker-compose.yml @@ -13,7 +13,7 @@ services: - MONGO_DB=chat - MAX_ROOM_SIZE=1000 - MAX_BATCH_SIZE=500 - - VALKEY_ADDR=valkey:6379 + - VALKEY_ADDRS=valkey:6379 - VALKEY_KEY_GRACE_PERIOD=24h - CASSANDRA_HOSTS=cassandra - CASSANDRA_KEYSPACE=chat diff --git a/room-service/handler.go b/room-service/handler.go index 6d5818f6c..c904d6e37 100644 --- a/room-service/handler.go +++ b/room-service/handler.go @@ -29,7 +29,7 @@ import ( type Handler struct { store RoomStore - // keyStore is set when VALKEY_ADDR is configured (always in production; tests may pass nil). + // keyStore is set when VALKEY_ADDRS is configured (always in production; tests may pass nil). 
keyStore RoomKeyStore memberListClient MemberListClient msgReader MessageReader @@ -95,6 +95,9 @@ func (h *Handler) RegisterCRUD(nc *otelnats.Conn) error { if _, err := nc.QueueSubscribe(subject.OrgMembersWildcard(), queue, h.natsListOrgMembers); err != nil { return fmt.Errorf("subscribe org members: %w", err) } + if _, err := nc.QueueSubscribe(subject.RoomKeyEnsure(h.siteID), queue, h.NatsHandleEnsureRoomKey); err != nil { + return fmt.Errorf("subscribe room key ensure: %w", err) + } return nil } @@ -1159,3 +1162,58 @@ func (h *Handler) handleMessageReadReceipt(ctx context.Context, subj string, dat return json.Marshal(model.ReadReceiptResponse{Readers: entries}) } + +// NatsHandleEnsureRoomKey handles server-to-server requests to ensure a room +// has an encryption key pair in Valkey. Generates and stores a new pair if +// missing. The reply confirms the room and version but does not return key +// bytes — encryption/decryption is performed by broadcast-worker and clients, +// which read keys from Valkey directly. 
+func (h *Handler) NatsHandleEnsureRoomKey(m otelnats.Msg) { + ctx := wrappedCtx(m) + resp, err := h.handleEnsureRoomKey(ctx, m.Msg.Data) + if err != nil { + slog.Error("ensure room key failed", "error", err) + natsutil.ReplyError(m.Msg, sanitizeError(err)) + return + } + if err := m.Msg.Respond(resp); err != nil { + slog.Error("failed to respond to ensure room key", "error", err) + } +} + +func (h *Handler) handleEnsureRoomKey(ctx context.Context, data []byte) ([]byte, error) { + if h.keyStore == nil { + return nil, fmt.Errorf("ensure room key: key store not configured") + } + var req model.RoomKeyEnsureRequest + if err := json.Unmarshal(data, &req); err != nil { + return nil, fmt.Errorf("ensure room key: decode request: %w", err) + } + if req.RoomID == "" { + return nil, fmt.Errorf("ensure room key: roomId is required") + } + + existing, err := h.keyStore.Get(ctx, req.RoomID) + if err != nil { + return nil, fmt.Errorf("ensure room key: get: %w", err) + } + if existing != nil { + return json.Marshal(model.RoomKeyEnsureResponse{ + RoomID: req.RoomID, + Version: existing.Version, + }) + } + + newPair, err := roomkeystore.GenerateKeyPair() + if err != nil { + return nil, fmt.Errorf("ensure room key: generate key pair: %w", err) + } + ver, err := h.keyStore.Set(ctx, req.RoomID, newPair) + if err != nil { + return nil, fmt.Errorf("ensure room key: set: %w", err) + } + return json.Marshal(model.RoomKeyEnsureResponse{ + RoomID: req.RoomID, + Version: ver, + }) +} diff --git a/room-service/handler_test.go b/room-service/handler_test.go index 6d1d12747..9df2b9281 100644 --- a/room-service/handler_test.go +++ b/room-service/handler_test.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "encoding/base64" "encoding/json" @@ -3087,3 +3088,116 @@ func TestHandler_RemoveMember_StampsBaseKeyVersion(t *testing.T) { require.NoError(t, err) assert.Equal(t, 4, captured.BaseKeyVersion, "BaseKeyVersion must be stamped from the current Valkey version") } + +func 
TestHandler_EnsureRoomKey_KeyExists(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + + existing := &roomkeystore.VersionedKeyPair{ + Version: 7, + KeyPair: roomkeystore.RoomKeyPair{ + PublicKey: bytes.Repeat([]byte{0xAB}, 65), + PrivateKey: bytes.Repeat([]byte{0xCD}, 32), + }, + } + keyStore.EXPECT().Get(gomock.Any(), "room-abc").Return(existing, nil) + + h := &Handler{keyStore: keyStore, siteID: "site-local"} + req := model.RoomKeyEnsureRequest{RoomID: "room-abc"} + data, _ := json.Marshal(req) + + resp, err := h.handleEnsureRoomKey(context.Background(), data) + require.NoError(t, err) + + var result model.RoomKeyEnsureResponse + require.NoError(t, json.Unmarshal(resp, &result)) + assert.Equal(t, "room-abc", result.RoomID) + assert.Equal(t, 7, result.Version) + + assert.NotContains(t, string(resp), "publicKey", "response must not include public key bytes") + assert.NotContains(t, string(resp), "privateKey", "response must not include private key bytes") +} + +func TestHandler_EnsureRoomKey_KeyNotFound_SetsNew(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + + keyStore.EXPECT().Get(gomock.Any(), "room-new").Return(nil, nil) + + var capturedPair roomkeystore.RoomKeyPair + keyStore.EXPECT(). + Set(gomock.Any(), "room-new", gomock.Any()). 
+ DoAndReturn(func(_ context.Context, _ string, p roomkeystore.RoomKeyPair) (int, error) { + capturedPair = p + return 0, nil + }) + + h := &Handler{keyStore: keyStore, siteID: "site-local"} + req := model.RoomKeyEnsureRequest{RoomID: "room-new"} + data, _ := json.Marshal(req) + + resp, err := h.handleEnsureRoomKey(context.Background(), data) + require.NoError(t, err) + + var result model.RoomKeyEnsureResponse + require.NoError(t, json.Unmarshal(resp, &result)) + assert.Equal(t, "room-new", result.RoomID) + assert.Equal(t, 0, result.Version) + + assert.Len(t, capturedPair.PublicKey, 65, "P-256 public key must be 65 bytes (uncompressed) — stored in Valkey") + assert.Len(t, capturedPair.PrivateKey, 32, "P-256 private key must be 32 bytes — stored in Valkey") + assert.NotContains(t, string(resp), "publicKey", "response must not include public key bytes") + assert.NotContains(t, string(resp), "privateKey", "response must not include private key bytes") +} + +func TestHandler_EnsureRoomKey_MalformedRequest(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + h := &Handler{keyStore: keyStore, siteID: "site-local"} + + _, err := h.handleEnsureRoomKey(context.Background(), []byte("{not json")) + require.Error(t, err) +} + +func TestHandler_EnsureRoomKey_MissingRoomID(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + h := &Handler{keyStore: keyStore, siteID: "site-local"} + + data, _ := json.Marshal(model.RoomKeyEnsureRequest{RoomID: ""}) + _, err := h.handleEnsureRoomKey(context.Background(), data) + require.Error(t, err) +} + +func TestHandler_EnsureRoomKey_GetError(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + keyStore.EXPECT().Get(gomock.Any(), "room-err").Return(nil, errors.New("valkey down")) + + h := &Handler{keyStore: keyStore, siteID: "site-local"} + data, _ := json.Marshal(model.RoomKeyEnsureRequest{RoomID: "room-err"}) + + _, err := 
h.handleEnsureRoomKey(context.Background(), data) + require.Error(t, err) +} + +func TestHandler_EnsureRoomKey_SetError(t *testing.T) { + ctrl := gomock.NewController(t) + keyStore := NewMockRoomKeyStore(ctrl) + keyStore.EXPECT().Get(gomock.Any(), "room-setfail").Return(nil, nil) + keyStore.EXPECT().Set(gomock.Any(), "room-setfail", gomock.Any()).Return(0, errors.New("write failed")) + + h := &Handler{keyStore: keyStore, siteID: "site-local"} + data, _ := json.Marshal(model.RoomKeyEnsureRequest{RoomID: "room-setfail"}) + + _, err := h.handleEnsureRoomKey(context.Background(), data) + require.Error(t, err) +} + +func TestHandler_EnsureRoomKey_NilKeyStore(t *testing.T) { + h := &Handler{keyStore: nil, siteID: "site-local"} + data, _ := json.Marshal(model.RoomKeyEnsureRequest{RoomID: "room-abc"}) + + _, err := h.handleEnsureRoomKey(context.Background(), data) + require.Error(t, err) +} diff --git a/room-service/integration_test.go b/room-service/integration_test.go index cc2c9cfbc..810483556 100644 --- a/room-service/integration_test.go +++ b/room-service/integration_test.go @@ -17,9 +17,7 @@ import ( "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" natsmod "github.com/testcontainers/testcontainers-go/modules/nats" - "github.com/testcontainers/testcontainers-go/wait" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" @@ -36,27 +34,9 @@ func setupMongo(t *testing.T) *mongo.Database { return testutil.MongoDB(t, "room_service_test") } -func setupValkey(t *testing.T) *roomkeystore.Config { +func setupValkey(t *testing.T) roomkeystore.RoomKeyStore { t.Helper() - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: testimages.Valkey, - ExposedPorts: []string{"6379/tcp"}, - WaitingFor: wait.ForLog("Ready to accept 
connections"), - }, - Started: true, - }) - require.NoError(t, err) - t.Cleanup(func() { _ = container.Terminate(ctx) }) - host, err := container.Host(ctx) - require.NoError(t, err) - port, err := container.MappedPort(ctx, "6379") - require.NoError(t, err) - return &roomkeystore.Config{ - Addr: fmt.Sprintf("%s:%s", host, port.Port()), - GracePeriod: time.Hour, - } + return roomkeystore.NewValkeyClusterStoreFromClient(testutil.StartValkeyCluster(t), time.Hour) } func setupCassandra(t *testing.T) *gocql.Session { @@ -856,10 +836,7 @@ func TestAddMembers_SameSiteChannel_RoomMembersPath(t *testing.T) { } db := setupMongo(t) - valCfg := setupValkey(t) - - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) store := NewMongoStore(db) ctx := context.Background() @@ -869,7 +846,7 @@ func TestAddMembers_SameSiteChannel_RoomMembersPath(t *testing.T) { // Source channel on site-a: seed room_members explicitly so ListRoomMembers takes the room_members // branch (not the subscriptions fallback); also seed users so ResolveAccounts can find them. 
require.NoError(t, store.CreateRoom(ctx, &model.Room{ID: "source", Type: model.RoomTypeChannel, SiteID: "site-a"})) - _, err = db.Collection("users").InsertMany(ctx, []interface{}{ + _, err := db.Collection("users").InsertMany(ctx, []interface{}{ model.User{ID: "u1", Account: "bob", SiteID: "site-a"}, model.User{ID: "u2", Account: "carol", SiteID: "site-a"}, model.User{ID: "u3", Account: "dave", SiteID: "site-a", SectID: "eng-org"}, @@ -932,10 +909,7 @@ func TestAddMembers_SameSiteChannel_SubscriptionsFallback(t *testing.T) { } db := setupMongo(t) - valCfg := setupValkey(t) - - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) store := NewMongoStore(db) ctx := context.Background() @@ -943,7 +917,7 @@ func TestAddMembers_SameSiteChannel_SubscriptionsFallback(t *testing.T) { require.NoError(t, store.CreateRoom(ctx, &model.Room{ID: "target", Type: model.RoomTypeChannel, SiteID: "site-a"})) require.NoError(t, store.CreateRoom(ctx, &model.Room{ID: "source", Type: model.RoomTypeChannel, SiteID: "site-a"})) // Seed users so ResolveAccounts can find them. 
- _, err = db.Collection("users").InsertMany(ctx, []interface{}{ + _, err := db.Collection("users").InsertMany(ctx, []interface{}{ model.User{ID: "u1", Account: "bob", SiteID: "site-a"}, model.User{ID: "u2", Account: "carol", SiteID: "site-a"}, model.User{ID: "u3", Account: "dave", SiteID: "site-a"}, @@ -996,10 +970,7 @@ func TestAddMembers_RequesterNotSubscribed_Rejected(t *testing.T) { } db := setupMongo(t) - valCfg := setupValkey(t) - - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) store := NewMongoStore(db) ctx := context.Background() @@ -1032,10 +1003,7 @@ func TestAddMembers_TwoSiteEndToEnd(t *testing.T) { dbA := testutil.MongoDB(t, "room_service_test_a") dbB := testutil.MongoDB(t, "room_service_test_b") natsURLb := setupNATS(t) - valCfg := setupValkey(t) - - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) storeA := NewMongoStore(dbA) storeB := NewMongoStore(dbB) @@ -1121,10 +1089,7 @@ func TestAddMembers_CrossSiteTimeout(t *testing.T) { db := setupMongo(t) natsURL := setupNATS(t) - valCfg := setupValkey(t) - - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) store := NewMongoStore(db) otelNC, err := otelnats.Connect(natsURL) require.NoError(t, err) @@ -1171,12 +1136,9 @@ func TestAddMembers_CrossSiteTimeout(t *testing.T) { func TestRoomsInfoBatchRPC(t *testing.T) { db := setupMongo(t) - valCfg := setupValkey(t) + keyStore := setupValkey(t) natsURL := setupNATS(t) - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) - store := NewMongoStore(db) ctx := context.Background() @@ -1194,7 +1156,7 @@ func TestRoomsInfoBatchRPC(t *testing.T) { pubKey := bytes.Repeat([]byte{0xAB}, 65) privKey1 := bytes.Repeat([]byte{0x01}, 32) privKey2 := bytes.Repeat([]byte{0x02}, 32) - _, err = keyStore.Set(ctx, "r1", roomkeystore.RoomKeyPair{PublicKey: pubKey, PrivateKey: 
privKey1}) + _, err := keyStore.Set(ctx, "r1", roomkeystore.RoomKeyPair{PublicKey: pubKey, PrivateKey: privKey1}) require.NoError(t, err) _, err = keyStore.Set(ctx, "r2", roomkeystore.RoomKeyPair{PublicKey: pubKey, PrivateKey: privKey2}) require.NoError(t, err) @@ -1263,9 +1225,7 @@ func TestIntegration_CreateRoom_PersistsKeyInValkey(t *testing.T) { store := NewMongoStore(db) require.NoError(t, store.EnsureIndexes(ctx)) - valCfg := setupValkey(t) - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) mustInsertUser(t, db, &model.User{ ID: "u_alice", Account: "alice", SiteID: "site-A", @@ -1350,9 +1310,7 @@ func TestCreateRoomChannelEndToEnd(t *testing.T) { store := NewMongoStore(db) require.NoError(t, store.EnsureIndexes(ctx)) - valCfg := setupValkey(t) - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) mustInsertUser(t, db, &model.User{ ID: "u_alice", Account: "alice", SiteID: "site-A", @@ -1401,9 +1359,7 @@ func TestCreateRoomDMAlreadyExists(t *testing.T) { store := NewMongoStore(db) require.NoError(t, store.EnsureIndexes(ctx)) - valCfg := setupValkey(t) - keyStore, err := roomkeystore.NewValkeyStore(*valCfg) - require.NoError(t, err) + keyStore := setupValkey(t) mustInsertUser(t, db, &model.User{ID: "u_alice", Account: "alice", EngName: "Alice", ChineseName: "爱丽丝", SiteID: "site-A"}) diff --git a/room-service/main.go b/room-service/main.go index 36c4b4d1b..8bbb37cb3 100644 --- a/room-service/main.go +++ b/room-service/main.go @@ -30,7 +30,7 @@ type config struct { MaxRoomSize int `env:"MAX_ROOM_SIZE" envDefault:"1000"` MaxBatchSize int `env:"MAX_BATCH_SIZE" envDefault:"1000"` MemberListTimeout time.Duration `env:"MEMBER_LIST_TIMEOUT" envDefault:"5s"` - ValkeyAddr string `env:"VALKEY_ADDR,required"` + ValkeyAddrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` ValkeyGracePeriod 
time.Duration `env:"VALKEY_KEY_GRACE_PERIOD,required"` CassandraHosts string `env:"CASSANDRA_HOSTS,required"` @@ -79,8 +79,8 @@ func main() { } db := mongoClient.Database(cfg.MongoDB) - keyStore, err := roomkeystore.NewValkeyStore(roomkeystore.Config{ - Addr: cfg.ValkeyAddr, + keyStore, err := roomkeystore.NewValkeyClusterStore(roomkeystore.ClusterConfig{ + Addrs: cfg.ValkeyAddrs, Password: cfg.ValkeyPassword, GracePeriod: cfg.ValkeyGracePeriod, }) diff --git a/room-worker/deploy/docker-compose.yml b/room-worker/deploy/docker-compose.yml index 580917820..b549cfeb6 100644 --- a/room-worker/deploy/docker-compose.yml +++ b/room-worker/deploy/docker-compose.yml @@ -11,9 +11,9 @@ services: - SITE_ID=site-local - MONGO_URI=mongodb://mongodb:27017 - MONGO_DB=chat - # Valkey is required (room-worker refuses to start without VALKEY_ADDR). + # Valkey cluster is required (room-worker refuses to start without VALKEY_ADDRS). # Provided by docker-local/compose.deps.yaml; production deploys must supply it externally. 
- - VALKEY_ADDR=valkey:6379 + - VALKEY_ADDRS=valkey:6379 - VALKEY_KEY_GRACE_PERIOD=24h - BOOTSTRAP_STREAMS=true volumes: diff --git a/room-worker/integration_test.go b/room-worker/integration_test.go index 18488bc31..8e1f60383 100644 --- a/room-worker/integration_test.go +++ b/room-worker/integration_test.go @@ -5,7 +5,6 @@ package main import ( "context" "encoding/json" - "fmt" "slices" "strings" "sync" @@ -16,8 +15,6 @@ import ( "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -29,7 +26,6 @@ import ( "github.com/hmchangw/chat/pkg/roomkeystore" "github.com/hmchangw/chat/pkg/subject" "github.com/hmchangw/chat/pkg/testutil" - "github.com/hmchangw/chat/pkg/testutil/testimages" ) // capturedPublish records a single publish call for later assertion. @@ -1166,34 +1162,9 @@ func TestSyncCreateDM_CrossSite_OutboxPayloadConverges(t *testing.T) { "replay must produce identical Nats-Msg-Id so broker dedup blocks duplicate cross-site events") } -// setupValkey starts a Valkey testcontainer and returns a connected full key store. -// The returned store satisfies both roomkeystore.RoomKeyStore (for seeding) and the -// local RoomKeyStore interface accepted by NewHandler (Get-only subset). 
func setupValkey(t *testing.T) roomkeystore.RoomKeyStore { t.Helper() - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: testimages.Valkey, - ExposedPorts: []string{"6379/tcp"}, - WaitingFor: wait.ForLog("Ready to accept connections"), - }, - Started: true, - }) - require.NoError(t, err) - t.Cleanup(func() { _ = container.Terminate(ctx) }) - host, err := container.Host(ctx) - require.NoError(t, err) - port, err := container.MappedPort(ctx, "6379") - require.NoError(t, err) - cfg := roomkeystore.Config{ - Addr: fmt.Sprintf("%s:%s", host, port.Port()), - GracePeriod: time.Hour, - } - ks, err := roomkeystore.NewValkeyStore(cfg) - require.NoError(t, err) - t.Cleanup(func() { _ = ks.Close() }) - return ks + return roomkeystore.NewValkeyClusterStoreFromClient(testutil.StartValkeyCluster(t), time.Hour) } // startEmbeddedNATS starts an in-process NATS server and returns a connected client. diff --git a/room-worker/main.go b/room-worker/main.go index a5427835a..037dd0ffb 100644 --- a/room-worker/main.go +++ b/room-worker/main.go @@ -36,8 +36,8 @@ type config struct { Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` // Required: room-worker reads/rotates the room key on every create/add/remove path. - ValkeyAddr string `env:"VALKEY_ADDR,required"` - ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` + ValkeyAddrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` // TTL on the :prev key slot after a rotation. 
ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD" envDefault:"24h"` } @@ -93,8 +93,8 @@ func main() { os.Exit(1) } - keyStore, err := roomkeystore.NewValkeyStore(roomkeystore.Config{ - Addr: cfg.ValkeyAddr, + keyStore, err := roomkeystore.NewValkeyClusterStore(roomkeystore.ClusterConfig{ + Addrs: cfg.ValkeyAddrs, Password: cfg.ValkeyPassword, GracePeriod: cfg.ValkeyKeyGracePeriod, }) diff --git a/room-worker/mock_publisher_test.go b/room-worker/mock_publisher_test.go index 1eb39921d..e95d13b9b 100644 --- a/room-worker/mock_publisher_test.go +++ b/room-worker/mock_publisher_test.go @@ -31,7 +31,7 @@ func (p *mockPublisher) publishCount() int { } // stubRoomKeyStore is a zero-config RoomKeyStore for tests that don't exercise -// key behavior (production now requires Valkey via the VALKEY_ADDR=required +// key behavior (production now requires Valkey via the VALKEY_ADDRS=required // gate, so the Handler can no longer be constructed with a nil keyStore). // Tests that DO exercise key behavior should build their own MockRoomKeyStore // with explicit EXPECTations rather than using this stub. 
diff --git a/search-service/deploy/docker-compose.yml b/search-service/deploy/docker-compose.yml index 2850f06e9..1f154ff52 100644 --- a/search-service/deploy/docker-compose.yml +++ b/search-service/deploy/docker-compose.yml @@ -11,7 +11,7 @@ services: - SITE_ID=site-local - SEARCH_URL=http://elasticsearch:9200 - SEARCH_BACKEND=elasticsearch - - VALKEY_ADDR=valkey:6379 + - VALKEY_ADDRS=valkey:6379 - VALKEY_PASSWORD= - MONGO_URI=mongodb://mongodb:27017 - MONGO_DB=chat diff --git a/search-service/integration_test.go b/search-service/integration_test.go index 14dcd5780..9432f04c1 100644 --- a/search-service/integration_test.go +++ b/search-service/integration_test.go @@ -103,11 +103,9 @@ func setupCCSFixture(t *testing.T) *ccsFixture { require.NoError(t, err, "build searchengine for remote") t.Logf("CCS fixture: starting valkey") - valkeyAddr := startValkey(t) - valkeyClient, err := valkeyutil.Connect(ctx, valkeyAddr, "") - require.NoError(t, err, "connect valkey") + valkeyClient := valkeyutil.WrapClusterClient(testutil.StartValkeyCluster(t)) t.Cleanup(func() { valkeyutil.Disconnect(valkeyClient) }) - t.Logf("CCS fixture: valkey at %s", valkeyAddr) + t.Logf("CCS fixture: valkey started") t.Logf("CCS fixture: starting NATS") natsURL := startNATS(t) @@ -197,28 +195,6 @@ func startESForCCS(t *testing.T, nw *testcontainers.DockerNetwork, alias, cluste return fmt.Sprintf("http://%s:%s", host, port.Port()) } -func startValkey(t *testing.T) string { - t.Helper() - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: testimages.Valkey, - ExposedPorts: []string{"6379/tcp"}, - Cmd: []string{"valkey-server", "--save", "", "--appendonly", "no"}, - WaitingFor: wait.ForLog("Ready to accept connections").WithStartupTimeout(30 * time.Second), - }, - Started: true, - }) - require.NoError(t, err, "start valkey") - t.Cleanup(func() { _ = 
container.Terminate(ctx) }) - - host, err := container.Host(ctx) - require.NoError(t, err) - port, err := container.MappedPort(ctx, "6379") - require.NoError(t, err) - return fmt.Sprintf("%s:%s", host, port.Port()) -} - func startNATS(t *testing.T) string { t.Helper() ctx := context.Background() @@ -968,15 +944,9 @@ func setupRoomsFixture(t *testing.T) *roomsFixture { return &roomsFixture{clientNATS: clientNC, esURL: esURL} } -// newSubsValkeyClient starts a Valkey testcontainer and returns a connected -// client for use by the subs fixture. Reuses the existing startValkey helper. func newSubsValkeyClient(t *testing.T) valkeyutil.Client { t.Helper() - addr := startValkey(t) - client, err := valkeyutil.Connect(context.Background(), addr, "") - require.NoError(t, err, "connect valkey for subs fixture") - t.Cleanup(func() { valkeyutil.Disconnect(client) }) - return client + return valkeyutil.WrapClusterClient(testutil.StartValkeyCluster(t)) } // putTestSpotlightIndex creates a minimal spotlight index in ES with the diff --git a/search-service/main.go b/search-service/main.go index 5d80a928f..308b2efb2 100644 --- a/search-service/main.go +++ b/search-service/main.go @@ -34,8 +34,8 @@ type ESConfig struct { } type ValkeyConfig struct { - Addr string `env:"ADDR,required"` - Password string `env:"PASSWORD" envDefault:""` + Addrs []string `env:"ADDRS,required" envSeparator:","` + Password string `env:"PASSWORD" envDefault:""` } type NATSConfig struct { @@ -61,7 +61,7 @@ type UsersAPIConfig struct { // SearchConfig groups the request-shape knobs — size caps, cache TTL, and // the recent-window filter bound. All optional with sane defaults so a -// minimal environment only needs URL + NATS_URL + VALKEY_ADDR. +// minimal environment only needs URL + NATS_URL + VALKEY_ADDRS. 
type SearchConfig struct { DocCounts int `env:"DOC_COUNTS" envDefault:"25"` MaxDocCounts int `env:"MAX_DOC_COUNTS" envDefault:"100"` @@ -125,7 +125,7 @@ func main() { os.Exit(1) } - valkey, err := valkeyutil.Connect(ctx, cfg.Valkey.Addr, cfg.Valkey.Password) + valkey, err := valkeyutil.ConnectCluster(ctx, cfg.Valkey.Addrs, cfg.Valkey.Password) if err != nil { slog.Error("valkey connect failed", "error", err) os.Exit(1) @@ -202,7 +202,7 @@ func main() { slog.Info("search-service running", "site", cfg.SiteID, "backend", cfg.ES.Backend, - "valkey", cfg.Valkey.Addr, + "valkey", cfg.Valkey.Addrs, ) shutdown.Wait(ctx, 25*time.Second, diff --git a/tools/loadgen/main.go b/tools/loadgen/main.go index aee88a3af..5702cc466 100644 --- a/tools/loadgen/main.go +++ b/tools/loadgen/main.go @@ -30,18 +30,18 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL,required"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"site-local"` - MongoURI string `env:"MONGO_URI,required"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MetricsAddr string `env:"METRICS_ADDR" envDefault:":9099"` - MaxInFlight int `env:"MAX_IN_FLIGHT" envDefault:"200"` - PProfAddr string `env:"PPROF_ADDR" envDefault:""` - ValkeyAddr string `env:"VALKEY_ADDR,required"` - ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` + NatsURL string `env:"NATS_URL,required"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"site-local"` + MongoURI string `env:"MONGO_URI,required"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MetricsAddr string `env:"METRICS_ADDR" envDefault:":9099"` + MaxInFlight int `env:"MAX_IN_FLIGHT" envDefault:"200"` + 
PProfAddr string `env:"PPROF_ADDR" envDefault:""` + ValkeyAddrs []string `env:"VALKEY_ADDRS,required" envSeparator:","` + ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` } func main() { @@ -179,8 +179,8 @@ func connectStores(ctx context.Context, cfg *config) (*mongo.Database, roomkeyst } func connectKeyStore(cfg *config) (roomkeystore.RoomKeyStore, error) { - return roomkeystore.NewValkeyStore(roomkeystore.Config{ - Addr: cfg.ValkeyAddr, + return roomkeystore.NewValkeyClusterStore(roomkeystore.ClusterConfig{ + Addrs: cfg.ValkeyAddrs, Password: cfg.ValkeyPassword, GracePeriod: time.Hour, })