diff --git a/broadcast-worker/consumer_config_test.go b/broadcast-worker/consumer_config_test.go new file mode 100644 index 000000000..cf4411769 --- /dev/null +++ b/broadcast-worker/consumer_config_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestBuildConsumerConfig(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }) + + assert.Equal(t, "broadcast-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 250, + }) + + assert.Equal(t, "broadcast-worker", cc.Durable) + assert.Equal(t, 250, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/broadcast-worker/main.go b/broadcast-worker/main.go index c4bf37291..37ac754c4 100644 --- a/broadcast-worker/main.go +++ b/broadcast-worker/main.go @@ -28,21 +28,22 @@ type encryptionConfig struct { } type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"default"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` - UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` - ValkeyAddr string `env:"VALKEY_ADDR"` - ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` - ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD" envDefault:"24h"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` - Encryption encryptionConfig `envPrefix:"ENCRYPTION_"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"default"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + UserCacheSize int `env:"USER_CACHE_SIZE" envDefault:"10000"` + UserCacheTTL time.Duration `env:"USER_CACHE_TTL" envDefault:"5m"` + ValkeyAddr string `env:"VALKEY_ADDR"` + ValkeyPassword string `env:"VALKEY_PASSWORD" envDefault:""` + ValkeyKeyGracePeriod time.Duration `env:"VALKEY_KEY_GRACE_PERIOD" envDefault:"24h"` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + Encryption encryptionConfig `envPrefix:"ENCRYPTION_"` } func main() { @@ -115,10 +116,7 @@ func main() { canonicalCfg := stream.MessagesCanonical(cfg.SiteID) - cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ - Durable: "broadcast-worker", - AckPolicy: jetstream.AckExplicitPolicy, - }) + cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig(cfg.Consumer)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -203,3 +201,11 @@ func (p *natsPublisher) Publish(ctx context.Context, subject string, data []byte } return nil } + +// buildConsumerConfig returns the durable consumer config for +// broadcast-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig(s stream.ConsumerSettings) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "broadcast-worker" + return cc +} diff --git a/docs/superpowers/plans/2026-05-08-jetstream-consumer-defaults.md b/docs/superpowers/plans/2026-05-08-jetstream-consumer-defaults.md new file mode 100644 index 000000000..c678e8b33 --- /dev/null +++ b/docs/superpowers/plans/2026-05-08-jetstream-consumer-defaults.md @@ -0,0 +1,917 @@ +# JetStream Consumer Defaults Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Standardize JetStream durable consumer configuration across all worker services with a shared `pkg/stream` helper and per-service `MaxAckPending` recommendations. + +**Architecture:** Add `DurableConsumerDefaults()` to `pkg/stream` returning a baseline `jetstream.ConsumerConfig` with project-wide defaults. Each worker service extracts an unexported `buildConsumerConfig` helper that overlays the service's own `Durable`, `MaxAckPending`, and any service-specific overrides on top of the defaults. Helpers are unit-tested; `main.go` uses them at consumer-creation sites. + +**Tech Stack:** Go 1.25, `github.com/nats-io/nats.go/jetstream`, `testify` for assertions. + +**Spec:** `docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md` + +**Branch:** `claude/jetstream-consumer-config-JTIKh` (already checked out) + +--- + +## File Structure + +| File | Action | Responsibility | +|---|---|---| +| `pkg/stream/consumer.go` | Create | `DurableConsumerDefaults()` + exported constants | +| `pkg/stream/consumer_test.go` | Create | Unit tests for the defaults helper | +| `message-gatekeeper/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `message-gatekeeper/consumer_config_test.go` | Create | Unit test for service helper | +| `broadcast-worker/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `broadcast-worker/consumer_config_test.go` | Create | Unit test for service helper | +| `message-worker/main.go` | Modify | Replace inline `ConsumerConfig`, drop `MaxRedeliver` | +| `message-worker/consumer_config_test.go` | Create | Unit test for service helper | +| `notification-worker/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `notification-worker/consumer_config_test.go` | Create | Unit test for service helper | +| `room-worker/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `room-worker/consumer_config_test.go` | Create | Unit test for service helper | +| `inbox-worker/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `inbox-worker/consumer_config_test.go` | Create | Unit test for service helper | +| `search-sync-worker/main.go` | Modify | Replace inline `ConsumerConfig` with helper | +| `search-sync-worker/consumer_config_test.go` | Create | Unit test for service helper | + +Each service places its `buildConsumerConfig` helper in the existing `main.go` (small enough to keep there; matches the project's flat layout). Tests live in a sibling `consumer_config_test.go` to keep the helper test isolated from the larger `bootstrap_test.go` / `handler_test.go` files. + +--- + +## Task 1: Add `DurableConsumerDefaults` helper to `pkg/stream` + +**Files:** +- Create: `pkg/stream/consumer.go` +- Create: `pkg/stream/consumer_test.go` + +- [ ] **Step 1.1: Write the failing test** + +Create `pkg/stream/consumer_test.go`: + +```go +package stream_test + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestDurableConsumerDefaults(t *testing.T) { + cc := stream.DurableConsumerDefaults() + + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy, "AckPolicy") + assert.Equal(t, 30*time.Second, cc.AckWait, "AckWait") + assert.Equal(t, 5, cc.MaxDeliver, "MaxDeliver") + assert.Equal(t, 512, cc.MaxWaiting, "MaxWaiting") + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy, "DeliverPolicy") + + // Caller-owned fields are intentionally zero. + assert.Empty(t, cc.Durable, "Durable must be set by caller") + assert.Zero(t, cc.MaxAckPending, "MaxAckPending must be set by caller") + assert.Empty(t, cc.FilterSubjects, "FilterSubjects must be set by caller if needed") +} + +func TestDurableConsumerDefaultsConstants(t *testing.T) { + assert.Equal(t, 30*time.Second, stream.DefaultAckWait) + assert.Equal(t, 5, stream.DefaultMaxDeliver) + assert.Equal(t, 512, stream.DefaultMaxWaiting) +} +``` + +- [ ] **Step 1.2: Run test to verify it fails** + +Run: `make test SERVICE=pkg/stream` +Expected: FAIL — `undefined: stream.DurableConsumerDefaults`, `undefined: stream.DefaultAckWait`, etc. + +- [ ] **Step 1.3: Write the helper** + +Create `pkg/stream/consumer.go`: + +```go +package stream + +import ( + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// Project-wide defaults for durable JetStream consumers. Exported so +// individual services can reference them in tests and documentation. +const ( + DefaultAckWait = 30 * time.Second + DefaultMaxDeliver = 5 + DefaultMaxWaiting = 512 // NATS 2.10 default +) + +// DurableConsumerDefaults returns a ConsumerConfig populated with the +// project-wide standard knobs for durable JetStream consumers. +// +// Callers MUST set Durable. Callers SHOULD set MaxAckPending sized for +// their service's pull concurrency, and FilterSubjects if they need to +// scope the consumer to a subset of the stream's subjects. +// +// DeliverPolicy is honored only at consumer creation. Updating an +// existing durable via js.CreateOrUpdateConsumer does not reset its +// cursor position. +func DurableConsumerDefaults() jetstream.ConsumerConfig { + return jetstream.ConsumerConfig{ + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: DefaultAckWait, + MaxDeliver: DefaultMaxDeliver, + MaxWaiting: DefaultMaxWaiting, + DeliverPolicy: jetstream.DeliverNewPolicy, + } +} +``` + +- [ ] **Step 1.4: Run test to verify it passes** + +Run: `make test SERVICE=pkg/stream` +Expected: PASS for both `TestDurableConsumerDefaults` and `TestDurableConsumerDefaultsConstants`. + +- [ ] **Step 1.5: Lint** + +Run: `make lint` +Expected: clean. + +- [ ] **Step 1.6: Commit** + +```bash +git add pkg/stream/consumer.go pkg/stream/consumer_test.go +git commit -m "feat(pkg/stream): add DurableConsumerDefaults helper" +``` + +--- + +## Task 2: Apply defaults in `message-gatekeeper` + +**Files:** +- Modify: `message-gatekeeper/main.go:95-103` (consumer creation) +- Create: `message-gatekeeper/consumer_config_test.go` + +- [ ] **Step 2.1: Write the failing test** + +Create `message-gatekeeper/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestBuildConsumerConfig(t *testing.T) { + cc := buildConsumerConfig() + + assert.Equal(t, "message-gatekeeper", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 2.2: Run test to verify it fails** + +Run: `make test SERVICE=message-gatekeeper` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 2.3: Add the helper and rewire `main.go`** + +Open `message-gatekeeper/main.go`. At line 95-103, the current code is: + +```go +messagesCfg := stream.Messages(cfg.SiteID) +cons, err := js.CreateOrUpdateConsumer(ctx, messagesCfg.Name, jetstream.ConsumerConfig{ + Durable: "message-gatekeeper", + AckPolicy: jetstream.AckExplicitPolicy, +}) +``` + +Replace with: + +```go +messagesCfg := stream.Messages(cfg.SiteID) +cons, err := js.CreateOrUpdateConsumer(ctx, messagesCfg.Name, buildConsumerConfig()) +``` + +Add this helper to the bottom of `message-gatekeeper/main.go` (after `main()`): + +```go +// buildConsumerConfig returns the durable consumer config for +// message-gatekeeper. Centralized so it is unit-testable without NATS. +func buildConsumerConfig() jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "message-gatekeeper" + cc.MaxAckPending = 1000 + return cc +} +``` + +- [ ] **Step 2.4: Run test to verify it passes** + +Run: `make test SERVICE=message-gatekeeper` +Expected: PASS for `TestBuildConsumerConfig`. All other unit tests in the package continue to pass. + +- [ ] **Step 2.5: Verify the service still builds** + +Run: `make build SERVICE=message-gatekeeper` +Expected: clean build, no errors. + +- [ ] **Step 2.6: Lint** + +Run: `make lint` +Expected: clean. + +- [ ] **Step 2.7: Commit** + +```bash +git add message-gatekeeper/main.go message-gatekeeper/consumer_config_test.go +git commit -m "feat(message-gatekeeper): apply standard durable consumer defaults" +``` + +--- + +## Task 3: Apply defaults in `broadcast-worker` + +**Files:** +- Modify: `broadcast-worker/main.go:116-121` (consumer creation) +- Create: `broadcast-worker/consumer_config_test.go` + +- [ ] **Step 3.1: Write the failing test** + +Create `broadcast-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestBuildConsumerConfig(t *testing.T) { + cc := buildConsumerConfig() + + assert.Equal(t, "broadcast-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 3.2: Run test to verify it fails** + +Run: `make test SERVICE=broadcast-worker` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 3.3: Add the helper and rewire `main.go`** + +In `broadcast-worker/main.go` at line 116-121, the current code is: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ + Durable: "broadcast-worker", + AckPolicy: jetstream.AckExplicitPolicy, +}) +``` + +Replace with: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig()) +``` + +Add this helper to the bottom of `broadcast-worker/main.go`: + +```go +// buildConsumerConfig returns the durable consumer config for +// broadcast-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig() jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "broadcast-worker" + cc.MaxAckPending = 1000 + return cc +} +``` + +- [ ] **Step 3.4: Run test to verify it passes** + +Run: `make test SERVICE=broadcast-worker` +Expected: PASS. + +- [ ] **Step 3.5: Build and lint** + +Run: `make build SERVICE=broadcast-worker && make lint` +Expected: clean. + +- [ ] **Step 3.6: Commit** + +```bash +git add broadcast-worker/main.go broadcast-worker/consumer_config_test.go +git commit -m "feat(broadcast-worker): apply standard durable consumer defaults" +``` + +--- + +## Task 4: Apply defaults in `message-worker` and remove `MaxRedeliver` + +**Context:** `message-worker` has a custom `MaxRedeliver` config field (default `5`) referenced only in `main.go` lines 34 and 118. No deploy manifests set `MAX_REDELIVER`. Verified by `grep -rn "MAX_REDELIVER" /home/user/chat/` — only the spec doc and the field/usage. The prior code computed `MaxDeliver = MaxRedeliver + 1 = 6` (1 initial + 5 retries). The unified default `MaxDeliver = 5` (1 initial + 4 retries) is a deliberate 1-attempt reduction in retry budget — accepted as part of unifying the project standard. + +**Files:** +- Modify: `message-worker/main.go:34` (remove `MaxRedeliver` config field) +- Modify: `message-worker/main.go:113-119` (consumer creation) +- Create: `message-worker/consumer_config_test.go` + +- [ ] **Step 4.1: Write the failing test** + +Create `message-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestBuildConsumerConfig(t *testing.T) { + cc := buildConsumerConfig() + + assert.Equal(t, "message-worker", cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 4.2: Run test to verify it fails** + +Run: `make test SERVICE=message-worker` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 4.3: Remove the `MaxRedeliver` config field** + +In `message-worker/main.go` at line 34, remove this line entirely: + +```go + MaxRedeliver int `env:"MAX_REDELIVER" envDefault:"5"` +``` + +- [ ] **Step 4.4: Add the helper and rewire `main.go`** + +In `message-worker/main.go` at line 113-119, the current code is: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ + Durable: "message-worker", + AckPolicy: jetstream.AckExplicitPolicy, + MaxDeliver: cfg.MaxRedeliver + 1, // initial delivery + MaxRedeliver retries +}) +``` + +Replace with: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig()) +``` + +Add this helper to the bottom of `message-worker/main.go`: + +```go +// buildConsumerConfig returns the durable consumer config for +// message-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig() jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "message-worker" + cc.MaxAckPending = 500 + return cc +} +``` + +- [ ] **Step 4.5: Run tests to verify** + +Run: `make test SERVICE=message-worker` +Expected: PASS for `TestBuildConsumerConfig`. All other tests continue to pass — confirm `MaxRedeliver` is not referenced in any test file. + +If a test references `MaxRedeliver` (search with `grep -n MaxRedeliver message-worker/`), update it to drop the reference. The expected pre-edit grep output is empty outside `main.go`. + +- [ ] **Step 4.6: Build and lint** + +Run: `make build SERVICE=message-worker && make lint` +Expected: clean. + +- [ ] **Step 4.7: Commit** + +```bash +git add message-worker/main.go message-worker/consumer_config_test.go +git commit -m "feat(message-worker): apply standard consumer defaults, drop MaxRedeliver" +``` + +--- + +## Task 5: Apply defaults in `notification-worker` + +**Files:** +- Modify: `notification-worker/main.go:100-105` (consumer creation) +- Create: `notification-worker/consumer_config_test.go` + +- [ ] **Step 5.1: Write the failing test** + +Create `notification-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestBuildConsumerConfig(t *testing.T) { + cc := buildConsumerConfig() + + assert.Equal(t, "notification-worker", cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 5.2: Run test to verify it fails** + +Run: `make test SERVICE=notification-worker` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 5.3: Add the helper and rewire `main.go`** + +In `notification-worker/main.go` at line 100-105, the current code is: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ + Durable: "notification-worker", + AckPolicy: jetstream.AckExplicitPolicy, +}) +``` + +Replace with: + +```go +canonicalCfg := stream.MessagesCanonical(cfg.SiteID) + +cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig()) +``` + +Add this helper to the bottom of `notification-worker/main.go`: + +```go +// buildConsumerConfig returns the durable consumer config for +// notification-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig() jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "notification-worker" + cc.MaxAckPending = 500 + return cc +} +``` + +- [ ] **Step 5.4: Run test, build, lint** + +Run: `make test SERVICE=notification-worker && make build SERVICE=notification-worker && make lint` +Expected: PASS, clean build, clean lint. + +- [ ] **Step 5.5: Commit** + +```bash +git add notification-worker/main.go notification-worker/consumer_config_test.go +git commit -m "feat(notification-worker): apply standard durable consumer defaults" +``` + +--- + +## Task 6: Apply defaults in `room-worker` + +**Files:** +- Modify: `room-worker/main.go:99-101` (consumer creation) +- Create: `room-worker/consumer_config_test.go` + +- [ ] **Step 6.1: Write the failing test** + +Create `room-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestBuildConsumerConfig(t *testing.T) { + cc := buildConsumerConfig() + + assert.Equal(t, "room-worker", cc.Durable) + assert.Equal(t, 200, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 6.2: Run test to verify it fails** + +Run: `make test SERVICE=room-worker` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 6.3: Add the helper and rewire `main.go`** + +In `room-worker/main.go` at line 99-101, the current code is: + +```go +cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, jetstream.ConsumerConfig{ + Durable: "room-worker", AckPolicy: jetstream.AckExplicitPolicy, +}) +``` + +Replace with: + +```go +cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, buildConsumerConfig()) +``` + +Add this helper to the bottom of `room-worker/main.go`: + +```go +// buildConsumerConfig returns the durable consumer config for +// room-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig() jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "room-worker" + cc.MaxAckPending = 200 + return cc +} +``` + +- [ ] **Step 6.4: Run test, build, lint** + +Run: `make test SERVICE=room-worker && make build SERVICE=room-worker && make lint` +Expected: PASS, clean build, clean lint. + +- [ ] **Step 6.5: Commit** + +```bash +git add room-worker/main.go room-worker/consumer_config_test.go +git commit -m "feat(room-worker): apply standard durable consumer defaults" +``` + +--- + +## Task 7: Apply defaults in `inbox-worker` + +**Files:** +- Modify: `inbox-worker/main.go:228-235` (consumer creation, preserving `FilterSubjects`) +- Create: `inbox-worker/consumer_config_test.go` + +- [ ] **Step 7.1: Write the failing test** + +Create `inbox-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/subject" +) + +func TestBuildConsumerConfig(t *testing.T) { + siteID := "site-a" + cc := buildConsumerConfig(siteID) + + assert.Equal(t, "inbox-worker", cc.Durable) + assert.Equal(t, 100, cc.MaxAckPending) + assert.Equal(t, []string{subject.InboxAggregateAll(siteID)}, cc.FilterSubjects) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) +} +``` + +- [ ] **Step 7.2: Run test to verify it fails** + +Run: `make test SERVICE=inbox-worker` +Expected: FAIL — `undefined: buildConsumerConfig`. + +- [ ] **Step 7.3: Add the helper and rewire `main.go`** + +In `inbox-worker/main.go` at line 228-235, the current code is: + +```go +inboxCfg := stream.Inbox(cfg.SiteID) + +// Local lane is reserved for search-sync-worker; scope to aggregate.> only. +cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{ + Durable: "inbox-worker", + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubjects: []string{subject.InboxAggregateAll(cfg.SiteID)}, +}) +``` + +Replace with: + +```go +inboxCfg := stream.Inbox(cfg.SiteID) + +// Local lane is reserved for search-sync-worker; scope to aggregate.> only. +cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, buildConsumerConfig(cfg.SiteID)) +``` + +Add this helper to the bottom of `inbox-worker/main.go`: + +```go +// buildConsumerConfig returns the durable consumer config for +// inbox-worker. The site-scoped FilterSubjects keeps inbox-worker on the +// federated `aggregate.>` lane only; same-site direct publishes are +// reserved for search-sync-worker. +func buildConsumerConfig(siteID string) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = "inbox-worker" + cc.MaxAckPending = 100 + cc.FilterSubjects = []string{subject.InboxAggregateAll(siteID)} + return cc +} +``` + +- [ ] **Step 7.4: Run test, build, lint** + +Run: `make test SERVICE=inbox-worker && make build SERVICE=inbox-worker && make lint` +Expected: PASS, clean build, clean lint. + +- [ ] **Step 7.5: Commit** + +```bash +git add inbox-worker/main.go inbox-worker/consumer_config_test.go +git commit -m "feat(inbox-worker): apply standard durable consumer defaults" +``` + +--- + +## Task 8: Apply defaults in `search-sync-worker` (3 consumers) + +**Context:** `search-sync-worker` creates one consumer per collection inside a loop. The helper takes the collection and `siteID` and preserves the existing `BackOff: [1s, 5s, 30s]` and per-collection `FilterSubjects`. With `MaxDeliver = 5` (from defaults) and 3 `BackOff` entries, the 4th and 5th retries reuse the last entry (30s) — this is intentional NATS behavior. + +**Files:** +- Modify: `search-sync-worker/main.go:194-202` (consumer config construction inside the loop) +- Create: `search-sync-worker/consumer_config_test.go` + +**Reference:** `Collection` is defined in `search-sync-worker/collection.go:14`. The relevant methods on it are `ConsumerName() string` and `FilterSubjects(siteID string) []string`. The package already has fake/stub collections in `handler_test.go` (`stubCollection`, `fanOutCollection`); we'll use a similar local fake in our new test file. + +- [ ] **Step 8.1: Write the failing test** + +Create `search-sync-worker/consumer_config_test.go`: + +```go +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +type fakeCollection struct { + name string + filters []string +} + +func (f fakeCollection) ConsumerName() string { return f.name } +func (f fakeCollection) FilterSubjects(_ string) []string { return f.filters } + +func TestBuildConsumerConfig(t *testing.T) { + tests := []struct { + name string + coll fakeCollection + siteID string + wantFilters []string + }{ + { + name: "with filters", + coll: fakeCollection{name: "message-sync", filters: []string{"chat.msg.canonical.site-a.created"}}, + siteID: "site-a", + wantFilters: []string{"chat.msg.canonical.site-a.created"}, + }, + { + name: "without filters", + coll: fakeCollection{name: "spotlight-sync", filters: nil}, + siteID: "site-a", + wantFilters: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cc := buildConsumerConfig(tt.coll, tt.siteID) + + assert.Equal(t, tt.coll.name, cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, tt.wantFilters, cc.FilterSubjects) + assert.Equal(t, []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, cc.BackOff) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + } +} +``` + +`fakeCollection` satisfies enough of the `Collection` interface for `buildConsumerConfig` (which only calls `ConsumerName()` and `FilterSubjects()`). `buildConsumerConfig` accepts `Collection` as its parameter type, so the fake must implement the full `Collection` interface — but since Go interfaces are structural and `buildConsumerConfig` only invokes those two methods, define the helper to take a narrower interface: + +```go +type consumerSource interface { + ConsumerName() string + FilterSubjects(siteID string) []string +} +``` + +Use `consumerSource` as the helper's parameter type. This keeps the helper testable without implementing the full `Collection` interface in tests, and the call site in `main.go` still passes a `Collection` value, which automatically satisfies `consumerSource`. + +- [ ] **Step 8.2: Run test to verify it fails** + +Run: `make test SERVICE=search-sync-worker` +Expected: FAIL — `undefined: buildConsumerConfig`, `undefined: consumerSource`. + +- [ ] **Step 8.3: Add the helper and rewire the loop in `main.go`** + +In `search-sync-worker/main.go` at line 194-202, the current code is: + +```go +consumerCfg := jetstream.ConsumerConfig{ + Durable: coll.ConsumerName(), + AckPolicy: jetstream.AckExplicitPolicy, + BackOff: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, +} +if filters := coll.FilterSubjects(cfg.SiteID); len(filters) > 0 { + consumerCfg.FilterSubjects = filters +} +cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, consumerCfg) +``` + +Replace with: + +```go +cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, buildConsumerConfig(coll, cfg.SiteID)) +``` + +Add this helper plus the narrow `consumerSource` interface to the bottom of `search-sync-worker/main.go`: + +```go +// consumerSource is the subset of Collection that buildConsumerConfig +// needs. Narrowing keeps the helper unit-testable with a small fake. +type consumerSource interface { + ConsumerName() string + FilterSubjects(siteID string) []string +} + +// buildConsumerConfig returns the durable consumer config for one +// search-sync-worker collection. Custom BackOff is intentional: ES +// indexing benefits from progressive retries on transient failures. +// With MaxDeliver=5 from defaults and 3 BackOff entries, NATS reuses +// the last entry (30s) for the 4th and 5th retries — do not extend +// BackOff to length 5 to "fix" this; the reuse is the intended pattern. +func buildConsumerConfig(coll consumerSource, siteID string) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults() + cc.Durable = coll.ConsumerName() + cc.MaxAckPending = 500 + cc.BackOff = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second} + if filters := coll.FilterSubjects(siteID); len(filters) > 0 { + cc.FilterSubjects = filters + } + return cc +} +``` + +- [ ] **Step 8.4: Run test, build, lint** + +Run: `make test SERVICE=search-sync-worker && make build SERVICE=search-sync-worker && make lint` +Expected: PASS, clean build, clean lint. + +- [ ] **Step 8.5: Commit** + +```bash +git add search-sync-worker/main.go search-sync-worker/consumer_config_test.go +git commit -m "feat(search-sync-worker): apply standard consumer defaults" +``` + +--- + +## Task 9: Final verification + +**Files:** none modified — verification only. + +- [ ] **Step 9.1: Run the full unit test suite with race detector** + +Run: `make test` +Expected: PASS for all packages. + +- [ ] **Step 9.2: Run linter on the full repo** + +Run: `make lint` +Expected: clean. + +- [ ] **Step 9.3: Run integration tests for affected services** + +Run integration tests for the services we modified. The Makefile supports per-service runs: + +```bash +make test-integration SERVICE=message-gatekeeper +make test-integration SERVICE=broadcast-worker +make test-integration SERVICE=message-worker +make test-integration SERVICE=notification-worker +make test-integration SERVICE=room-worker +make test-integration SERVICE=inbox-worker +make test-integration SERVICE=search-sync-worker +``` + +Expected: PASS for each. Integration tests already exercise consumer creation with testcontainers; they should pass unchanged because: +- `AckPolicy` is unchanged. +- `AckWait = 30s` matches the prior NATS default. +- `MaxDeliver = 5` is permissive enough for any test. +- `MaxAckPending` is set well above each service's in-flight ceiling. +- `DeliverPolicy = DeliverNewPolicy` is honored at create-time only; testcontainer-spawned consumers are always fresh. + +If any integration test fails, investigate the specific failure before proceeding — do NOT relax the defaults to make tests pass. + +- [ ] **Step 9.4: Push the branch** + +```bash +git push -u origin claude/jetstream-consumer-config-JTIKh +``` + +Expected: push succeeds; remote prints PR URL. Do NOT open a PR — the user will do that manually after reviewing the branch. + +--- + +## Notes for the implementer + +- **Order matters**: do Task 1 first; every other task imports `stream.DurableConsumerDefaults()`. Tasks 2-8 are independent of each other and may be done in any order, but follow the order listed for clean commit history. +- **Each service commit should be self-contained**: tests, helper, and call-site update in the same commit so each commit leaves the repo in a working state. +- **No deploy manifest changes needed**: `MAX_REDELIVER` was the only env var being removed and verification confirmed nothing else sets it. +- **No `docs/client-api.md` updates**: this change does not touch any client-facing handler. +- **`make generate` is NOT required**: no store interfaces change. diff --git a/docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md b/docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md new file mode 100644 index 000000000..16ee8667d --- /dev/null +++ b/docs/superpowers/specs/2026-05-08-jetstream-consumer-defaults-design.md @@ -0,0 +1,217 @@ +# JetStream Durable Consumer Defaults — Design + +**Date:** 2026-05-08 +**Branch:** `claude/jetstream-consumer-config-JTIKh` +**Status:** Draft, awaiting user review + +## Summary + +Standardize JetStream durable consumer configuration across all worker +services by introducing a shared default builder in `pkg/stream` and +applying per-service `MaxAckPending` recommendations. Today, services set +only `AckPolicy: Explicit` and rely on NATS defaults for everything else; +this design makes the relevant knobs explicit, uniform, and discoverable. + +## Goals + +1. A single source of truth for project-wide consumer defaults + (`AckPolicy`, `AckWait`, `MaxDeliver`, `MaxWaiting`, `DeliverPolicy`). +2. Per-service `MaxAckPending` values sized to the service's pull + concurrency and per-message cost. +3. No runtime impact on existing durable consumers — NATS honors + `DeliverPolicy` only at consumer creation, so cursor positions are not + reset. +4. Preserve existing per-service customizations that already serve a + purpose (`search-sync-worker`'s progressive `BackOff`, `inbox-worker`'s + `FilterSubjects`). + +## Non-Goals + +- Changing stream configurations or the existing `bootstrap.go` opt-in + pattern. +- Touching non-JetStream NATS subscriptions (`nc.QueueSubscribe`, + request/reply handlers). +- Making `MaxAckPending` env-driven; values stay as code constants per + service to keep tuning visible in version control. +- Migrating or resetting any currently-running durable consumer. + +## Standard Defaults + +Every durable consumer in the repo gets the following baseline: + +| Field | Value | Rationale | +|-----------------|--------------------------------|---------------------------------------------------------------------------| +| `AckPolicy` | `AckExplicitPolicy` | Already the project convention; required for at-least-once semantics. | +| `AckWait` | `30 * time.Second` | Matches NATS default; long enough for Cassandra/Mongo writes + ES index. | +| `MaxDeliver` | `5` | Bounded retries before terminal failure; pairs with DLQ/log-and-drop. | +| `MaxWaiting` | `512` | NATS 2.10 default for max in-flight pull requests. | +| `DeliverPolicy` | `DeliverNewPolicy` | New consumers start at the stream head; existing consumers unaffected. | + +## Per-Service `MaxAckPending` + +Sizing rule: high-throughput services pull `2 × MAX_WORKERS` (default +`200`) into the iterator and process up to `MAX_WORKERS` (`100`) +concurrently. `MaxAckPending` must be `≥ 200` to avoid throttling the +iterator. Final values are tuned for per-message cost. + +| Service | Pattern | `MaxAckPending` | Rationale | +|----------------------------|--------------------------|-----------------|--------------------------------------------------------------------------------------------| +| `message-gatekeeper` | High-throughput pull | `1000` | Lightest per-msg work (validate + republish); allow large bursts on inbound. | +| `broadcast-worker` | High-throughput pull | `1000` | Fan-out to in-memory subscribers; fast, bursty. | +| `message-worker` | High-throughput pull | `500` | Cassandra writes are I/O bound; smaller cap limits unbounded backlog if Cassandra slows. | +| `notification-worker` | High-throughput pull | `500` | May call external push providers; bound exposure to provider latency. | +| `room-worker` | High-throughput pull | `200` | Low-volume admin/membership stream; matches in-flight ceiling exactly. | +| `inbox-worker` | Sequential `Consume()` | `100` | One-at-a-time callback; cap prefetch to avoid stale federated events. | +| `search-sync-worker` (×3) | Batch `Fetch()` | `500` each | ES indexing is batched; supports existing batch flush thresholds with headroom. | + +## Architecture & Code Layout + +### New file: `pkg/stream/consumer.go` + +```go +package stream + +import ( + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +const ( + DefaultAckWait = 30 * time.Second + DefaultMaxDeliver = 5 + DefaultMaxWaiting = 512 // NATS 2.10 default +) + +// DurableConsumerDefaults returns the project-wide standard ConsumerConfig +// for durable JetStream consumers. Callers must set Durable, and should set +// MaxAckPending and FilterSubjects as appropriate for the service. +// +// DeliverPolicy is honored only at consumer creation; updating an existing +// durable consumer does not reset its cursor. +func DurableConsumerDefaults() jetstream.ConsumerConfig { + return jetstream.ConsumerConfig{ + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: DefaultAckWait, + MaxDeliver: DefaultMaxDeliver, + MaxWaiting: DefaultMaxWaiting, + DeliverPolicy: jetstream.DeliverNewPolicy, + } +} +``` + +### Per-service `main.go` updates + +Each consumer creation site changes from: + +```go +cons, err := js.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{ + Durable: "broadcast-worker", + AckPolicy: jetstream.AckExplicitPolicy, +}) +``` + +to: + +```go +cc := stream.DurableConsumerDefaults() +cc.Durable = "broadcast-worker" +cc.MaxAckPending = 1000 +cons, err := js.CreateOrUpdateConsumer(ctx, streamName, cc) +``` + +### Service-specific overrides retained + +- **`message-worker`**: drop the existing `MaxDeliver: cfg.MaxRedeliver+1` + override; the unified `MaxDeliver = 5` from defaults applies. The + `MaxRedeliver` config field can be removed if it is not referenced + elsewhere. Note: the prior code computed `MaxDeliver = MaxRedeliver + 1` + with `MaxRedeliver` defaulting to `5`, yielding `MaxDeliver = 6` (1 + initial + 5 retries). The new project-wide default of `5` total + deliveries (1 initial + 4 retries) is a deliberate 1-attempt + reduction in `message-worker`'s retry budget — accepted as part of + unifying the project standard. +- **`inbox-worker`**: keep `FilterSubjects: + ["chat.inbox.{siteID}.aggregate.>"]`. +- **`search-sync-worker`** (all three consumers): keep `BackOff: [1s, 5s, + 30s]` and per-collection `FilterSubjects`. With `MaxDeliver = 5` and 3 + `BackOff` entries, the 4th and 5th retry intervals reuse the last entry + (`30s`), which is the documented NATS behavior. + +## Safety: Existing Consumers + +`js.CreateOrUpdateConsumer` updates mutable fields on an existing durable +but does not reset its cursor. `DeliverPolicy` is a creation-only field; +NATS ignores changes to it on update. Therefore: + +- Currently-running consumers retain their cursor positions. +- Pending/redelivered messages already queued for those consumers are not + dropped. +- New consumers (e.g., a new `siteID` deployment) start from the stream + head per `DeliverNewPolicy`. + +No migration step or operator action is required. + +## Testing + +### Unit tests in `pkg/stream/consumer_test.go` + +Table-driven assertions on `DurableConsumerDefaults()`: + +- `AckPolicy == AckExplicitPolicy` +- `AckWait == 30 * time.Second` +- `MaxDeliver == 5` +- `MaxWaiting == 512` +- `DeliverPolicy == DeliverNewPolicy` +- Returned struct does not set `Durable`, `MaxAckPending`, or + `FilterSubjects` (callers own these). + +### Per-service tests + +Extend each worker service's existing `*_test.go` to assert the constructed +`ConsumerConfig` carries the expected `MaxAckPending` for that service. +Where the consumer is built inline in `main.go`, extract the +config-construction into a small unexported helper (`buildConsumerConfig() +jetstream.ConsumerConfig`) so it can be unit-tested without standing up +NATS. This is a localized refactor consistent with the project's +testability conventions. + +### Integration tests + +No new integration tests required. Existing integration tests already +exercise the consumer end-to-end; they continue to pass with the new +defaults because: + +- `AckPolicy` is unchanged. +- `AckWait = 30s` matches the prior NATS default. +- `MaxDeliver = 5` is permissive enough for any test that previously + relied on default unlimited redeliveries (none exist in the codebase). +- `MaxAckPending` is set well above each service's in-flight ceiling. +- `DeliverPolicy = DeliverNewPolicy` only affects fresh consumer + creation, which testcontainer setups already do. + +## Rollout + +1. Land the `pkg/stream/consumer.go` helper with tests. +2. Update services in this order (each in its own commit on the same + branch): `message-gatekeeper`, `broadcast-worker`, `message-worker`, + `notification-worker`, `room-worker`, `inbox-worker`, + `search-sync-worker`. +3. Run `make lint` and `make test` after each commit; run + `make test-integration` once at the end. +4. Open a single PR with the full set of changes. + +## Risks and Open Questions + +- **`message-worker.MaxRedeliver` config field**: if removed, ensure no + deploy manifest still sets `MAX_REDELIVER`. The implementation step + should grep deploy YAML/Helm before removal. +- **`MaxAckPending` ceilings under sustained backlog**: chosen values + assume current load profiles. If a future load test reveals throttling + on `message-worker` at `500`, raise to `1000`. No design change needed — + it's a one-line constant. +- **`search-sync-worker` `BackOff` length vs. `MaxDeliver`**: NATS reuses + the last `BackOff` entry for retries beyond the array length. This is + the desired behavior here, but the implementation plan should add a + comment in `search-sync-worker/main.go` documenting the interaction so + future maintainers do not "fix" it by extending `BackOff` to length 5. diff --git a/inbox-worker/consumer_config_test.go b/inbox-worker/consumer_config_test.go new file mode 100644 index 000000000..633d476a7 --- /dev/null +++ b/inbox-worker/consumer_config_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" + "github.com/hmchangw/chat/pkg/subject" +) + +func TestBuildConsumerConfig(t *testing.T) { + siteID := "site-a" + + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }, siteID) + + assert.Equal(t, "inbox-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, []string{subject.InboxAggregateAll(siteID)}, cc.FilterSubjects) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 100, + }, siteID) + + assert.Equal(t, "inbox-worker", cc.Durable) + assert.Equal(t, 100, cc.MaxAckPending) + assert.Equal(t, []string{subject.InboxAggregateAll(siteID)}, cc.FilterSubjects) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/inbox-worker/main.go b/inbox-worker/main.go index a7c59d86d..f18163624 100644 --- a/inbox-worker/main.go +++ b/inbox-worker/main.go @@ -25,14 +25,15 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"default"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"default"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } // mongoInboxStore implements InboxStore using MongoDB. @@ -228,11 +229,7 @@ func main() { inboxCfg := stream.Inbox(cfg.SiteID) // Local lane is reserved for search-sync-worker; scope to aggregate.> only. - cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{ - Durable: "inbox-worker", - AckPolicy: jetstream.AckExplicitPolicy, - FilterSubjects: []string{subject.InboxAggregateAll(cfg.SiteID)}, - }) + cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, buildConsumerConfig(cfg.Consumer, cfg.SiteID)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -270,3 +267,14 @@ func main() { func(ctx context.Context) error { mongoutil.Disconnect(ctx, mongoClient); return nil }, ) } + +// buildConsumerConfig returns the durable consumer config for +// inbox-worker. The site-scoped FilterSubjects keeps inbox-worker on the +// federated `aggregate.>` lane only; same-site direct publishes are +// reserved for search-sync-worker. +func buildConsumerConfig(s stream.ConsumerSettings, siteID string) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "inbox-worker" + cc.FilterSubjects = []string{subject.InboxAggregateAll(siteID)} + return cc +} diff --git a/message-gatekeeper/consumer_config_test.go b/message-gatekeeper/consumer_config_test.go new file mode 100644 index 000000000..5c5d4f8ad --- /dev/null +++ b/message-gatekeeper/consumer_config_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestBuildConsumerConfig(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }) + + assert.Equal(t, "message-gatekeeper", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 250, + }) + + assert.Equal(t, "message-gatekeeper", cc.Durable) + assert.Equal(t, 250, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/message-gatekeeper/main.go b/message-gatekeeper/main.go index f36128066..b69d84e42 100644 --- a/message-gatekeeper/main.go +++ b/message-gatekeeper/main.go @@ -22,17 +22,18 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL,required"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID,required"` - MongoURI string `env:"MONGO_URI,required"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - LargeRoomThreshold int `env:"LARGE_ROOM_THRESHOLD" envDefault:"500"` - ChatBaseURL string `env:"CHAT_BASE_URL" envDefault:"http://localhost:3000"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL,required"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID,required"` + MongoURI string `env:"MONGO_URI,required"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + LargeRoomThreshold int `env:"LARGE_ROOM_THRESHOLD" envDefault:"500"` + ChatBaseURL string `env:"CHAT_BASE_URL" envDefault:"http://localhost:3000"` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -93,10 +94,7 @@ func main() { } messagesCfg := stream.Messages(cfg.SiteID) - cons, err := js.CreateOrUpdateConsumer(ctx, messagesCfg.Name, jetstream.ConsumerConfig{ - Durable: "message-gatekeeper", - AckPolicy: jetstream.AckExplicitPolicy, - }) + cons, err := js.CreateOrUpdateConsumer(ctx, messagesCfg.Name, buildConsumerConfig(cfg.Consumer)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -152,3 +150,11 @@ func main() { func(ctx context.Context) error { mongoutil.Disconnect(ctx, mongoClient); return nil }, ) } + +// buildConsumerConfig returns the durable consumer config for +// message-gatekeeper. Centralized so it is unit-testable without NATS. +func buildConsumerConfig(s stream.ConsumerSettings) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "message-gatekeeper" + return cc +} diff --git a/message-worker/consumer_config_test.go b/message-worker/consumer_config_test.go new file mode 100644 index 000000000..b78260b79 --- /dev/null +++ b/message-worker/consumer_config_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestBuildConsumerConfig(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }) + + assert.Equal(t, "message-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 500, + }) + + assert.Equal(t, "message-worker", cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/message-worker/main.go b/message-worker/main.go index 31845f591..ca9db7571 100644 --- a/message-worker/main.go +++ b/message-worker/main.go @@ -23,20 +23,20 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL,required"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID,required"` - CassandraHosts string `env:"CASSANDRA_HOSTS" envDefault:"localhost"` - CassandraKeyspace string `env:"CASSANDRA_KEYSPACE" envDefault:"chat"` - CassandraUsername string `env:"CASSANDRA_USERNAME" envDefault:""` - CassandraPassword string `env:"CASSANDRA_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - MaxRedeliver int `env:"MAX_REDELIVER" envDefault:"5"` - MongoURI string `env:"MONGO_URI,required"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL,required"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID,required"` + CassandraHosts string `env:"CASSANDRA_HOSTS" envDefault:"localhost"` + CassandraKeyspace string `env:"CASSANDRA_KEYSPACE" envDefault:"chat"` + CassandraUsername string `env:"CASSANDRA_USERNAME" envDefault:""` + CassandraPassword string `env:"CASSANDRA_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + MongoURI string `env:"MONGO_URI,required"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -112,11 +112,7 @@ func main() { canonicalCfg := stream.MessagesCanonical(cfg.SiteID) - cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ - Durable: "message-worker", - AckPolicy: jetstream.AckExplicitPolicy, - MaxDeliver: cfg.MaxRedeliver + 1, // initial delivery + MaxRedeliver retries - }) + cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig(cfg.Consumer)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -173,3 +169,11 @@ func main() { func(ctx context.Context) error { mongoutil.Disconnect(ctx, mongoClient); return nil }, ) } + +// buildConsumerConfig returns the durable consumer config for +// message-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig(s stream.ConsumerSettings) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "message-worker" + return cc +} diff --git a/notification-worker/consumer_config_test.go b/notification-worker/consumer_config_test.go new file mode 100644 index 000000000..f62860097 --- /dev/null +++ b/notification-worker/consumer_config_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestBuildConsumerConfig(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }) + + assert.Equal(t, "notification-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 500, + }) + + assert.Equal(t, "notification-worker", cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/notification-worker/main.go b/notification-worker/main.go index ace7713ba..bf2dcf723 100644 --- a/notification-worker/main.go +++ b/notification-worker/main.go @@ -24,15 +24,16 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"default"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"default"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } // mongoMemberLookup implements MemberLookup using MongoDB. @@ -99,10 +100,7 @@ func main() { canonicalCfg := stream.MessagesCanonical(cfg.SiteID) - cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, jetstream.ConsumerConfig{ - Durable: "notification-worker", - AckPolicy: jetstream.AckExplicitPolicy, - }) + cons, err := js.CreateOrUpdateConsumer(ctx, canonicalCfg.Name, buildConsumerConfig(cfg.Consumer)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -182,3 +180,11 @@ func (p *natsPublisher) Publish(ctx context.Context, subject string, data []byte } return nil } + +// buildConsumerConfig returns the durable consumer config for +// notification-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig(s stream.ConsumerSettings) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "notification-worker" + return cc +} diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go new file mode 100644 index 000000000..66996e9d4 --- /dev/null +++ b/pkg/stream/consumer.go @@ -0,0 +1,41 @@ +package stream + +import ( + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +// ConsumerSettings holds the env-driven knobs for durable JetStream +// consumers. Embed in each service's Config with envPrefix:"CONSUMER_". +// +// Defaults are set on the struct tags so caarlos0/env supplies them when +// the env vars are unset. Operators tune per-service values via the +// service's deployment env (e.g. CONSUMER_MAX_ACK_PENDING). +type ConsumerSettings struct { + AckWait time.Duration `env:"ACK_WAIT" envDefault:"30s"` + MaxDeliver int `env:"MAX_DELIVER" envDefault:"5"` + MaxWaiting int `env:"MAX_WAITING" envDefault:"512"` + MaxAckPending int `env:"MAX_ACK_PENDING" envDefault:"1000"` +} + +// DurableConsumerDefaults returns a ConsumerConfig populated from the +// supplied ConsumerSettings plus the project-wide architectural +// invariants (AckPolicy=Explicit, DeliverPolicy=New). +// +// Callers MUST set Durable. Callers MAY set FilterSubjects to scope the +// consumer to a subset of the stream's subjects. +// +// DeliverPolicy is honored only at consumer creation. Updating an +// existing durable via js.CreateOrUpdateConsumer does not reset its +// cursor position. +func DurableConsumerDefaults(s ConsumerSettings) jetstream.ConsumerConfig { + return jetstream.ConsumerConfig{ + AckPolicy: jetstream.AckExplicitPolicy, + DeliverPolicy: jetstream.DeliverNewPolicy, + AckWait: s.AckWait, + MaxDeliver: s.MaxDeliver, + MaxWaiting: s.MaxWaiting, + MaxAckPending: s.MaxAckPending, + } +} diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go new file mode 100644 index 000000000..737e5db1f --- /dev/null +++ b/pkg/stream/consumer_test.go @@ -0,0 +1,79 @@ +package stream_test + +import ( + "testing" + "time" + + "github.com/caarlos0/env/v11" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestDurableConsumerDefaults(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + s := stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 750, + } + cc := stream.DurableConsumerDefaults(s) + + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy, "AckPolicy invariant") + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy, "DeliverPolicy invariant") + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + assert.Equal(t, 750, cc.MaxAckPending) + + assert.Empty(t, cc.Durable, "Durable must be set by caller") + assert.Empty(t, cc.FilterSubjects, "FilterSubjects must be set by caller if needed") + }) + + t.Run("zero settings produce zero values", func(t *testing.T) { + cc := stream.DurableConsumerDefaults(stream.ConsumerSettings{}) + + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + assert.Zero(t, cc.AckWait) + assert.Zero(t, cc.MaxDeliver) + assert.Zero(t, cc.MaxWaiting) + assert.Zero(t, cc.MaxAckPending) + }) +} + +func TestConsumerSettingsEnvDefaults(t *testing.T) { + type holder struct { + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + } + + var h holder + require.NoError(t, env.Parse(&h)) + + assert.Equal(t, 30*time.Second, h.Consumer.AckWait) + assert.Equal(t, 5, h.Consumer.MaxDeliver) + assert.Equal(t, 512, h.Consumer.MaxWaiting) + assert.Equal(t, 1000, h.Consumer.MaxAckPending) +} + +func TestConsumerSettingsEnvOverrides(t *testing.T) { + type holder struct { + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + } + + t.Setenv("CONSUMER_ACK_WAIT", "10s") + t.Setenv("CONSUMER_MAX_DELIVER", "7") + t.Setenv("CONSUMER_MAX_WAITING", "1024") + t.Setenv("CONSUMER_MAX_ACK_PENDING", "250") + + var h holder + require.NoError(t, env.Parse(&h)) + + assert.Equal(t, 10*time.Second, h.Consumer.AckWait) + assert.Equal(t, 7, h.Consumer.MaxDeliver) + assert.Equal(t, 1024, h.Consumer.MaxWaiting) + assert.Equal(t, 250, h.Consumer.MaxAckPending) +} diff --git a/room-worker/consumer_config_test.go b/room-worker/consumer_config_test.go new file mode 100644 index 000000000..755f7d0f7 --- /dev/null +++ b/room-worker/consumer_config_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +func TestBuildConsumerConfig(t *testing.T) { + t.Run("propagates settings", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + }) + + assert.Equal(t, "room-worker", cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + + t.Run("overrides flow through", func(t *testing.T) { + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 200, + }) + + assert.Equal(t, "room-worker", cc.Durable) + assert.Equal(t, 200, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + }) +} diff --git a/room-worker/main.go b/room-worker/main.go index 2775fbe20..c384c12b9 100644 --- a/room-worker/main.go +++ b/room-worker/main.go @@ -22,15 +22,16 @@ import ( ) type config struct { - NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` - NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` - SiteID string `env:"SITE_ID" envDefault:"site-local"` - MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` - MongoDB string `env:"MONGO_DB" envDefault:"chat"` - MongoUsername string `env:"MONGO_USERNAME" envDefault:""` - MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` - MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` + NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` + SiteID string `env:"SITE_ID" envDefault:"site-local"` + MongoURI string `env:"MONGO_URI" envDefault:"mongodb://localhost:27017"` + MongoDB string `env:"MONGO_DB" envDefault:"chat"` + MongoUsername string `env:"MONGO_USERNAME" envDefault:""` + MongoPassword string `env:"MONGO_PASSWORD" envDefault:""` + MaxWorkers int `env:"MAX_WORKERS" envDefault:"100"` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -96,9 +97,7 @@ func main() { os.Exit(1) } - cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, jetstream.ConsumerConfig{ - Durable: "room-worker", AckPolicy: jetstream.AckExplicitPolicy, - }) + cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, buildConsumerConfig(cfg.Consumer)) if err != nil { slog.Error("create consumer failed", "error", err) os.Exit(1) @@ -154,3 +153,11 @@ func main() { func(ctx context.Context) error { mongoutil.Disconnect(ctx, mongoClient); return nil }, ) } + +// buildConsumerConfig returns the durable consumer config for +// room-worker. Centralized so it is unit-testable without NATS. +func buildConsumerConfig(s stream.ConsumerSettings) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = "room-worker" + return cc +} diff --git a/search-sync-worker/consumer_config_test.go b/search-sync-worker/consumer_config_test.go new file mode 100644 index 000000000..877ea7402 --- /dev/null +++ b/search-sync-worker/consumer_config_test.go @@ -0,0 +1,84 @@ +package main + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" + + "github.com/hmchangw/chat/pkg/stream" +) + +type fakeCollection struct { + name string + filters []string +} + +func (f fakeCollection) ConsumerName() string { return f.name } +func (f fakeCollection) FilterSubjects(_ string) []string { return f.filters } + +func TestBuildConsumerConfig(t *testing.T) { + defaultSettings := stream.ConsumerSettings{ + AckWait: 30 * time.Second, + MaxDeliver: 5, + MaxWaiting: 512, + MaxAckPending: 1000, + } + + t.Run("propagates settings", func(t *testing.T) { + tests := []struct { + name string + coll fakeCollection + siteID string + wantFilters []string + }{ + { + name: "with filters", + coll: fakeCollection{name: "message-sync", filters: []string{"chat.msg.canonical.site-a.created"}}, + siteID: "site-a", + wantFilters: []string{"chat.msg.canonical.site-a.created"}, + }, + { + name: "without filters", + coll: fakeCollection{name: "spotlight-sync", filters: nil}, + siteID: "site-a", + wantFilters: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cc := buildConsumerConfig(defaultSettings, tt.coll, tt.siteID) + + assert.Equal(t, tt.coll.name, cc.Durable) + assert.Equal(t, 1000, cc.MaxAckPending) + assert.Equal(t, tt.wantFilters, cc.FilterSubjects) + assert.Equal(t, []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, cc.BackOff) + assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy) + assert.Equal(t, 30*time.Second, cc.AckWait) + assert.Equal(t, 5, cc.MaxDeliver) + assert.Equal(t, 512, cc.MaxWaiting) + assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy) + }) + } + }) + + t.Run("overrides flow through", func(t *testing.T) { + coll := fakeCollection{name: "message-sync", filters: []string{"chat.msg.canonical.site-a.created"}} + cc := buildConsumerConfig(stream.ConsumerSettings{ + AckWait: 45 * time.Second, + MaxDeliver: 3, + MaxWaiting: 256, + MaxAckPending: 500, + }, coll, "site-a") + + assert.Equal(t, "message-sync", cc.Durable) + assert.Equal(t, 500, cc.MaxAckPending) + assert.Equal(t, 45*time.Second, cc.AckWait) + assert.Equal(t, 3, cc.MaxDeliver) + assert.Equal(t, 256, cc.MaxWaiting) + // BackOff is hardcoded by buildConsumerConfig, not from settings. + assert.Equal(t, []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, cc.BackOff) + }) +} diff --git a/search-sync-worker/main.go b/search-sync-worker/main.go index 7c78dfe7e..2d98a687b 100644 --- a/search-sync-worker/main.go +++ b/search-sync-worker/main.go @@ -75,7 +75,8 @@ type config struct { // idle / low-traffic periods. BulkFlushInterval int `env:"BULK_FLUSH_INTERVAL" envDefault:"5"` - Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` + Consumer stream.ConsumerSettings `envPrefix:"CONSUMER_"` + Bootstrap bootstrapConfig `envPrefix:"BOOTSTRAP_"` } func main() { @@ -191,14 +192,7 @@ func main() { } } - consumerCfg := jetstream.ConsumerConfig{ - Durable: coll.ConsumerName(), - AckPolicy: jetstream.AckExplicitPolicy, - BackOff: []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}, - } - if filters := coll.FilterSubjects(cfg.SiteID); len(filters) > 0 { - consumerCfg.FilterSubjects = filters - } + consumerCfg := buildConsumerConfig(cfg.Consumer, coll, cfg.SiteID) cons, err := js.CreateOrUpdateConsumer(ctx, streamCfg.Name, consumerCfg) if err != nil { slog.Error("create consumer failed", @@ -357,3 +351,26 @@ type engineAdapter struct { func (a *engineAdapter) Bulk(ctx context.Context, actions []searchengine.BulkAction) ([]searchengine.BulkResult, error) { return a.engine.Bulk(ctx, actions) } + +// consumerSource is the subset of Collection that buildConsumerConfig +// needs. Narrowing keeps the helper unit-testable with a small fake. +type consumerSource interface { + ConsumerName() string + FilterSubjects(siteID string) []string +} + +// buildConsumerConfig returns the durable consumer config for one +// search-sync-worker collection. Custom BackOff is intentional: ES +// indexing benefits from progressive retries on transient failures. +// With MaxDeliver=5 from defaults and 3 BackOff entries, NATS reuses +// the last entry (30s) for the 4th and 5th retries — do not extend +// BackOff to length 5 to "fix" this; the reuse is the intended pattern. +func buildConsumerConfig(s stream.ConsumerSettings, coll consumerSource, siteID string) jetstream.ConsumerConfig { + cc := stream.DurableConsumerDefaults(s) + cc.Durable = coll.ConsumerName() + cc.BackOff = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second} + if filters := coll.FilterSubjects(siteID); len(filters) > 0 { + cc.FilterSubjects = filters + } + return cc +}