From 6732297ef03104a3a2dbf6558cd0a9ff7aca7a5f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 05:55:05 +0000 Subject: [PATCH 1/4] fix(loadgen): stamp X-Request-ID header on members-sustained publishes The members-sustained (and members-capacity) generators mint a corrID via idgen.GenerateRequestID() but the canonical and frontdoor member publishers never put it on the NATS message. Backend services require the header: room-worker rejects canonical events with no X-Request-ID as a permanent error, and room-service requires it on the frontdoor request. Stamp corrID as the X-Request-ID header on both publish paths so requests are accepted and trace end-to-end, matching the daily scenario's pattern. https://claude.ai/code/session_014eeLP8ENEQxFhC4SjaAY27 --- tools/loadgen/members_publisher.go | 22 ++++++-- tools/loadgen/members_publisher_test.go | 67 +++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/tools/loadgen/members_publisher.go b/tools/loadgen/members_publisher.go index fca2c7df4..48c85db47 100644 --- a/tools/loadgen/members_publisher.go +++ b/tools/loadgen/members_publisher.go @@ -10,6 +10,7 @@ import ( "github.com/nats-io/nats.go/jetstream" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/subject" ) @@ -30,12 +31,19 @@ func newCanonicalMemberPublisher(js jetstream.JetStream, siteID string) *canonic } func (p *canonicalMemberPublisher) Publish(ctx context.Context, _ string, _ string, - req *model.AddMembersRequest, _ string) error { + req *model.AddMembersRequest, corrID string) error { data, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshal add-members canonical event: %w", err) } - if _, err := p.js.Publish(ctx, subject.RoomCanonical(p.siteID, "member.add"), data); err != nil { + // room-worker rejects canonical events without an X-Request-ID header as a + // permanent error; stamp corrID so the event carries one and traces 
end-to-end. + msg := &nats.Msg{ + Subject: subject.RoomCanonical(p.siteID, "member.add"), + Data: data, + Header: nats.Header{natsutil.RequestIDHeader: []string{corrID}}, + } + if _, err := p.js.PublishMsg(ctx, msg); err != nil { return fmt.Errorf("jetstream publish: %w", err) } return nil @@ -89,7 +97,15 @@ func (p *frontdoorMemberPublisher) Publish(_ context.Context, requesterAccount, } subj := subject.MemberAdd(requesterAccount, roomID, p.siteID) reply := p.inboxPrefix + "." + corrID - if err := p.nc.PublishRequest(subj, reply, data); err != nil { + // room-service requires an X-Request-ID header on the request; stamp corrID + // so the request ID and reply-correlation token are the same across the pipeline. + msg := &nats.Msg{ + Subject: subj, + Reply: reply, + Data: data, + Header: nats.Header{natsutil.RequestIDHeader: []string{corrID}}, + } + if err := p.nc.PublishMsg(msg); err != nil { return fmt.Errorf("nats publish request: %w", err) } return nil diff --git a/tools/loadgen/members_publisher_test.go b/tools/loadgen/members_publisher_test.go index 0a9698cea..1df5c3260 100644 --- a/tools/loadgen/members_publisher_test.go +++ b/tools/loadgen/members_publisher_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/stream" "github.com/hmchangw/chat/pkg/subject" ) @@ -76,6 +77,72 @@ func TestCanonicalMemberPublisher_PublishesToRoomCanonical(t *testing.T) { } } +func TestCanonicalMemberPublisher_SetsRequestIDHeader(t *testing.T) { + nc, js := startEmbeddedJetStream(t) + + siteID := "site-A" + _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: stream.Rooms(siteID).Name, + Subjects: stream.Rooms(siteID).Subjects, + }) + require.NoError(t, err) + + captured := make(chan *nats.Msg, 1) + sub, err := nc.Subscribe(subject.RoomCanonicalWildcard(siteID), func(m *nats.Msg) { + captured <- m + }) + require.NoError(t, err) 
+ defer func() { _ = sub.Unsubscribe() }() + + p := newCanonicalMemberPublisher(js, siteID) + + req := &model.AddMembersRequest{ + RoomID: "room-1", + Users: []string{"u1", "u2"}, + RequesterAccount: "owner-1", + Timestamp: time.Now().UTC().UnixMilli(), + } + const corrID = "01970a4f-8c2d-7c9a-abcd-e0123456789f" + require.NoError(t, p.Publish(context.Background(), "owner-1", "room-1", req, corrID)) + + select { + case m := <-captured: + assert.Equal(t, corrID, m.Header.Get(natsutil.RequestIDHeader), + "canonical publish must carry the corrID as the X-Request-ID header") + case <-time.After(2 * time.Second): + t.Fatal("did not receive canonical publish within 2s") + } +} + +func TestFrontdoorMemberPublisher_SetsRequestIDHeader(t *testing.T) { + nc, _ := startEmbeddedJetStream(t) + siteID := "site-A" + + captured := make(chan *nats.Msg, 1) + sub, err := nc.Subscribe(subject.MemberAddWildcard(siteID), func(m *nats.Msg) { + captured <- m + _ = m.Respond([]byte(`{"status":"accepted"}`)) + }) + require.NoError(t, err) + defer func() { _ = sub.Unsubscribe() }() + + p, err := newFrontdoorMemberPublisher(nc, siteID, func(string, []byte, time.Time) {}) + require.NoError(t, err) + defer p.Close() + + req := &model.AddMembersRequest{RoomID: "room-X", Users: []string{"u1"}} + const corrID = "01970a4f-8c2d-7c9a-abcd-e0123456789f" + require.NoError(t, p.Publish(context.Background(), "owner-9", "room-X", req, corrID)) + + select { + case m := <-captured: + assert.Equal(t, corrID, m.Header.Get(natsutil.RequestIDHeader), + "frontdoor publish must carry the corrID as the X-Request-ID header") + case <-time.After(2 * time.Second): + t.Fatal("never received request") + } +} + func TestFrontdoorMemberPublisher_RequestReply(t *testing.T) { nc, _ := startEmbeddedJetStream(t) siteID := "site-A" From 02e7a8698ce85804c9e35f2c31bd388854021d66 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 07:21:58 +0000 Subject: [PATCH 2/4] fix(loadgen): prevent members-sustained pool exhaustion 
at default config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Candidate accounts are single-use (once added they're room members and can't be re-added; baseline+pool is capped at MAX_ROOM_SIZE), so a sustained run can make at most rooms × floor(pool/usersPerAdd) publishes. The default invocation (rate=100, duration=60s, users-per-add=10 = 6000 ops) exceeded every preset's budget, so the run always aborted with ErrPoolsExhausted — members-medium ran ~50s before failing, smaller presets exhausted during warmup. - Add a preflight capacity guard (SustainableOps + ValidateSustainedCapacity) that fails fast with exit 2 before any NATS/store work, printing the achievable max --rate/--duration for the preset (or steering to a larger preset when it's too small to sustain any sensible run). - Right-size members-medium's candidate pool 500 -> 900 (baseline 100 + 900 = 1000 = MAX_ROOM_SIZE) so the documented default command completes with margin (9000-op budget vs 6000-op demand). - Update README preset table and the over-capacity guidance. 
https://claude.ai/code/session_014eeLP8ENEQxFhC4SjaAY27 --- tools/loadgen/README.md | 18 +++-- tools/loadgen/main.go | 11 ++- tools/loadgen/members.go | 2 +- tools/loadgen/members_capacity_test.go | 107 +++++++++++++++++++++++++ tools/loadgen/members_generator.go | 58 ++++++++++++++ 5 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 tools/loadgen/members_capacity_test.go diff --git a/tools/loadgen/README.md b/tools/loadgen/README.md index a86c2b277..2a35b1c3a 100644 --- a/tools/loadgen/README.md +++ b/tools/loadgen/README.md @@ -114,9 +114,16 @@ make -C tools/loadgen/deploy reset-members PRESET=members-medium | preset | rooms | baseline | candidate pool | use case | |--------------------|-------|----------|----------------|-----------------------------------------| | `members-small` | 5 | 10 | 50 | smoke / dev | -| `members-medium` | 100 | 100 | 500 | sustained-throughput default | +| `members-medium` | 100 | 100 | 900 | sustained-throughput default | | `members-capacity` | 5 | 1 | 990 | capacity-growth, fills up to ~MAX_ROOM_SIZE | +A candidate is single-use — once added it's a room member and can't be +re-added, and `baseline + candidate pool` is capped at `MAX_ROOM_SIZE` (1000). +So a sustained run can make at most `rooms × ⌊candidate pool ÷ users-per-add⌋` +add-member publishes total. `members-medium` (100 × ⌊900÷10⌋ = 9000 ops) +sustains the default `RATE=100 DURATION=60s` (6000 ops) with margin; +`members-small` is a smoke preset and cannot sustain that load. + ### Subcommands - `loadgen seed --workload=members --preset=` — populate Mongo @@ -143,10 +150,11 @@ for the rationale and the v2 plan. - **Sustained mode**: `final_pending == 0` on room-worker + zero errors → pipeline is sustaining the target rate. Climbing `final_pending` or - non-zero errors → over capacity. 
If you see `aborted early — pools - exhausted` in the logs, your `rate × duration × users-per-add` exceeded - the preset's `CandidatePool` budget; pick a bigger preset or shorter - duration. + non-zero errors → over capacity. If `rate × duration` would exceed the + preset's pool budget (see the preset table above), the command now + **refuses to start** and prints the achievable max `--rate`/`--duration` + for the preset — lower one of them or pick a bigger preset. (The old + behaviour ran for ~50s and then logged `aborted early — pools exhausted`.) - **Capacity mode**: the size-bucket table shows latency at four size ranges; the `final sizes` block confirms each room hit `--target-size`. A row with `count > 0` whose `e2_p99` is much larger diff --git a/tools/loadgen/main.go b/tools/loadgen/main.go index 9204b9c0b..9bb04ecdf 100644 --- a/tools/loadgen/main.go +++ b/tools/loadgen/main.go @@ -318,6 +318,16 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int { return 2 } + // Preflight: a candidate is single-use, so rate*duration cannot exceed + // the preset's pool budget. Build the deterministic fixtures up front and + // fail fast with an actionable message before any NATS/store work rather + // than aborting mid-run on exhaustion. 
+ fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) + if err := ValidateSustainedCapacity(p.Name, pools, *rate, *duration, *usersPerAdd); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + return 2 + } + nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile) if err != nil { slog.Error("nats connect", "error", err) @@ -341,7 +351,6 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int { } }() - fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) owners := OwnersByRoom(&fixtures) collector := NewMemberCollector(metrics, p.Name, injectMode) diff --git a/tools/loadgen/members.go b/tools/loadgen/members.go index dbb8ca7bd..4879234b3 100644 --- a/tools/loadgen/members.go +++ b/tools/loadgen/members.go @@ -47,7 +47,7 @@ var builtinMembersPresets = map[string]MembersPreset{ }, "members-medium": { Name: "members-medium", Users: 5000, Rooms: 100, - BaselineSize: 100, CandidatePool: 500, + BaselineSize: 100, CandidatePool: 900, // baseline+pool = 1000 = MAX_ROOM_SIZE }, "members-capacity": { Name: "members-capacity", Users: 12000, Rooms: 5, diff --git a/tools/loadgen/members_capacity_test.go b/tools/loadgen/members_capacity_test.go new file mode 100644 index 000000000..230e03b5a --- /dev/null +++ b/tools/loadgen/members_capacity_test.go @@ -0,0 +1,107 @@ +package main + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSustainableOps(t *testing.T) { + tests := []struct { + name string + pools CandidatePools + usersPerAdd int + want int + }{ + {"nil pools", nil, 10, 0}, + {"empty pools", CandidatePools{}, 10, 0}, + {"zero usersPerAdd", CandidatePools{"r1": make([]string, 50)}, 0, 0}, + {"negative usersPerAdd", CandidatePools{"r1": make([]string, 50)}, -1, 0}, + {"single room exact", CandidatePools{"r1": make([]string, 50)}, 10, 5}, + {"single room floor", CandidatePools{"r1": make([]string, 55)}, 10, 5}, + {"multi room", 
CandidatePools{"r1": make([]string, 50), "r2": make([]string, 30)}, 10, 8}, + {"room below threshold contributes zero", CandidatePools{"r1": make([]string, 9)}, 10, 0}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, SustainableOps(tc.pools, tc.usersPerAdd)) + }) + } +} + +func TestValidateSustainedCapacity_WithinBudget(t *testing.T) { + // capacity = 2 rooms * floor(50/10) = 10 ops; demand = 5/s * 1s = 5 ops. + pools := CandidatePools{"r1": make([]string, 50), "r2": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 5, time.Second, 10) + assert.NoError(t, err) +} + +func TestValidateSustainedCapacity_AtBudgetBoundary(t *testing.T) { + // capacity = 10 ops; demand = 10/s * 1s = 10 ops — exactly equal must pass. + pools := CandidatePools{"r1": make([]string, 50), "r2": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 10, time.Second, 10) + assert.NoError(t, err) +} + +func TestValidateSustainedCapacity_Oversubscribed(t *testing.T) { + // capacity = floor(60000/10) = 6000 ops; demand = 100/s * 120s = 12000 ops. + // Achievable bounds are non-degenerate: maxRate=50, maxDuration=60s. + pools := CandidatePools{"r1": make([]string, 60000)} + err := ValidateSustainedCapacity("members-x", pools, 100, 120*time.Second, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool), + "oversubscribed validation must wrap ErrInsufficientPool") + // The message must be actionable: name the preset and the achievable bounds. + msg := err.Error() + assert.Contains(t, msg, "members-x") + assert.Contains(t, msg, "--rate 50") + assert.Contains(t, msg, "--duration 1m0s") +} + +func TestValidateSustainedCapacity_TooSmallForWorkload(t *testing.T) { + // capacity = 25 ops; at rate=100/60s the achievable bounds round to zero, + // so the message must steer to a larger preset rather than "--rate 0". 
+ pools := CandidatePools{"r1": make([]string, 250)} + err := ValidateSustainedCapacity("members-small", pools, 100, 60*time.Second, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) + msg := err.Error() + assert.Contains(t, msg, "members-small") + assert.Contains(t, msg, "larger preset") + assert.NotContains(t, msg, "--rate 0", "must not suggest a nonsensical zero rate") + assert.NotContains(t, msg, "--duration 0s", "must not suggest a nonsensical zero duration") +} + +func TestValidateSustainedCapacity_ZeroUsersPerAdd(t *testing.T) { + pools := CandidatePools{"r1": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 10, time.Second, 0) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) +} + +// The default members-sustained invocation (rate=100, duration=60s, +// usersPerAdd=10) must be satisfiable by the members-medium preset so the +// documented out-of-box command completes instead of aborting on exhaustion. +func TestMembersMedium_SustainsDefaultInvocation(t *testing.T) { + p, ok := BuiltinMembersPreset("members-medium") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + const ( + defaultRate = 100 + defaultDuration = 60 * time.Second + defaultUsersPerAdd = 10 + ) + require.NoError(t, + ValidateSustainedCapacity(p.Name, pools, defaultRate, defaultDuration, defaultUsersPerAdd), + "members-medium must sustain the default 100rps/60s/10-per-add invocation") + + // Stays under MAX_ROOM_SIZE (1000): baseline members + full pool per room. 
+ for roomID, pool := range pools { + assert.LessOrEqual(t, p.BaselineSize+len(pool), 1000, + "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) + } +} diff --git a/tools/loadgen/members_generator.go b/tools/loadgen/members_generator.go index 709e3ca59..192e01669 100644 --- a/tools/loadgen/members_generator.go +++ b/tools/loadgen/members_generator.go @@ -49,6 +49,64 @@ type SustainedMembersGenerator struct { // fewer than UsersPerAdd accounts remaining. var ErrPoolsExhausted = errors.New("candidate pool exhausted on every room: rate * duration * usersPerAdd exceeded preset CandidatePool") +// ErrInsufficientPool is returned by ValidateSustainedCapacity (and wrapped +// with the specifics) when the requested rate/duration would consume more +// candidates than the preset's pools can supply. It exists so callers can +// match the preflight failure with errors.Is. +var ErrInsufficientPool = errors.New("insufficient candidate pool for requested rate and duration") + +// SustainableOps returns the maximum number of add-member publishes the given +// candidate pools can supply at usersPerAdd accounts each. A candidate is +// single-use (once added, the account is a room member and cannot be re-added), +// so this is a hard ceiling on the publishes a sustained run can ever make. +func SustainableOps(pools CandidatePools, usersPerAdd int) int { + if usersPerAdd <= 0 { + return 0 + } + total := 0 + for _, pool := range pools { + total += len(pool) / usersPerAdd + } + return total +} + +// ValidateSustainedCapacity is the preflight guard for members-sustained. It +// fails fast (before any NATS/store work) when rate*duration would exhaust the +// preset's candidate pools, returning an actionable error wrapping +// ErrInsufficientPool that names the achievable max --rate and --duration. 
+func ValidateSustainedCapacity(presetName string, pools CandidatePools, rate int, duration time.Duration, usersPerAdd int) error { + capacity := SustainableOps(pools, usersPerAdd) + durSecs := duration.Seconds() + demand := int(float64(rate) * durSecs) + if demand <= capacity { + return nil + } + maxRate := 0 + if durSecs > 0 { + maxRate = int(float64(capacity) / durSecs) + } + maxDuration := time.Duration(0) + if rate > 0 { + maxDuration = (time.Duration(capacity) * time.Second) / time.Duration(rate) + } + // When the achievable bounds round to zero the preset simply cannot sustain + // this workload (a smoke preset asked to run a real load). Steer to a bigger + // preset / smaller --users-per-add instead of suggesting "--rate 0". + var advice string + if maxRate >= 1 && maxDuration >= time.Second { + advice = fmt.Sprintf("lower to --rate %d (at this duration) or --duration %s (at this rate), or pick a larger preset", + maxRate, maxDuration.Round(time.Second)) + } else { + advice = fmt.Sprintf("too small for this workload: at --rate %d it sustains only ~%s — pick a larger preset or lower --users-per-add (and --duration)", + rate, maxDuration.Round(time.Millisecond)) + } + return fmt.Errorf( + "preset %q candidate pools supply at most %d add-member ops at users-per-add=%d, "+ + "but rate=%d/s × duration=%s needs ~%d ops; %s: %w", + presetName, capacity, usersPerAdd, rate, duration, demand, advice, ErrInsufficientPool, + ) +} + // NewSustainedMembersGenerator clones the candidate pools so the input is // not mutated. 
func NewSustainedMembersGenerator(cfg *SustainedMembersConfig, seed int64) *SustainedMembersGenerator { From 7e1a39fdb427c14fb8d5fdbf644bbb4c8b4d8131 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 08:06:56 +0000 Subject: [PATCH 3/4] loadgen: add members-heavy preset for high-rate sustained runs 700 rooms (baseline 10, candidate pool 990) yield 700 x floor(990/10) = 69,300 add-member ops, enough to sustain rate=1000/60s (60,000 ops) at the default users-per-add=10. Pool stays capped at MAX_ROOM_SIZE; capacity scales via room count. https://claude.ai/code/session_014eeLP8ENEQxFhC4SjaAY27 --- tools/loadgen/README.md | 13 +++++++++++++ tools/loadgen/members.go | 4 ++++ tools/loadgen/members_capacity_test.go | 23 +++++++++++++++++++++++ tools/loadgen/members_test.go | 2 +- 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/tools/loadgen/README.md b/tools/loadgen/README.md index 2a35b1c3a..866ee623b 100644 --- a/tools/loadgen/README.md +++ b/tools/loadgen/README.md @@ -115,6 +115,7 @@ make -C tools/loadgen/deploy reset-members PRESET=members-medium |--------------------|-------|----------|----------------|-----------------------------------------| | `members-small` | 5 | 10 | 50 | smoke / dev | | `members-medium` | 100 | 100 | 900 | sustained-throughput default | +| `members-heavy` | 700 | 10 | 990 | high-rate sustained (≈1000 req/s) | | `members-capacity` | 5 | 1 | 990 | capacity-growth, fills up to ~MAX_ROOM_SIZE | A candidate is single-use — once added it's a room member and can't be @@ -124,6 +125,18 @@ add-member publishes total. `members-medium` (100 × ⌊900÷10⌋ = 9000 ops) sustains the default `RATE=100 DURATION=60s` (6000 ops) with margin; `members-small` is a smoke preset and cannot sustain that load. +For higher rates, add rooms rather than pool (pool is capped per room).
To +sustain **1000 req/s for 60s** (60,000 ops) at the default `users-per-add=10`, +use `members-heavy` (700 × ⌊990÷10⌋ = 69,300 ops, ≈9s of headroom): + +``` +make -C tools/loadgen/deploy seed-members PRESET=members-heavy +make -C tools/loadgen/deploy run-sustained PRESET=members-heavy RATE=1000 DURATION=60s +``` + +If instead each request need only add one member, `members-medium` at +`USERS_PER_ADD=1` already supplies 90,000 ops — no heavy preset required. + ### Subcommands - `loadgen seed --workload=members --preset=` — populate Mongo diff --git a/tools/loadgen/members.go b/tools/loadgen/members.go index 4879234b3..67406d59f 100644 --- a/tools/loadgen/members.go +++ b/tools/loadgen/members.go @@ -53,6 +53,10 @@ var builtinMembersPresets = map[string]MembersPreset{ Name: "members-capacity", Users: 12000, Rooms: 5, BaselineSize: 1, CandidatePool: 990, // fits under MAX_ROOM_SIZE=1000 }, + "members-heavy": { + Name: "members-heavy", Users: 5000, Rooms: 700, + BaselineSize: 10, CandidatePool: 990, // 700 × ⌊990/10⌋ = 69,300 ops ≈ 69s at 1000/s + }, } // BuiltinMembersPreset looks up a preset by name. diff --git a/tools/loadgen/members_capacity_test.go b/tools/loadgen/members_capacity_test.go index 230e03b5a..d4b9bf7a9 100644 --- a/tools/loadgen/members_capacity_test.go +++ b/tools/loadgen/members_capacity_test.go @@ -105,3 +105,26 @@ func TestMembersMedium_SustainsDefaultInvocation(t *testing.T) { "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) } } + +// members-heavy is the high-rate preset: it must sustain rate=1000 for 60s at +// the default users-per-add=10 (60,000 ops) so a 1000 req/s steady-state run +// clears the preflight guard instead of being capped to a short burst.
+func TestMembersHeavy_SustainsThousandRPS(t *testing.T) { + p, ok := BuiltinMembersPreset("members-heavy") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + const ( + rate = 1000 + duration = 60 * time.Second + usersPerAdd = 10 + ) + require.NoError(t, + ValidateSustainedCapacity(p.Name, pools, rate, duration, usersPerAdd), + "members-heavy must sustain rate=1000/60s at users-per-add=10") + + for roomID, pool := range pools { + assert.LessOrEqual(t, p.BaselineSize+len(pool), 1000, + "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) + } +} diff --git a/tools/loadgen/members_test.go b/tools/loadgen/members_test.go index 08ed51010..74c04ebc3 100644 --- a/tools/loadgen/members_test.go +++ b/tools/loadgen/members_test.go @@ -66,7 +66,7 @@ func TestValidateInjectShape(t *testing.T) { } func TestBuiltinMembersPreset(t *testing.T) { - cases := []string{"members-small", "members-medium", "members-capacity"} + cases := []string{"members-small", "members-medium", "members-capacity", "members-heavy"} for _, name := range cases { t.Run(name, func(t *testing.T) { p, ok := BuiltinMembersPreset(name) From 785e8bd0ca97e5ad2cd3b57dabccfb0bb2165e55 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 05:12:20 +0000 Subject: [PATCH 4/4] loadgen: preflight guard for members-capacity unreachable target Capacity mode grows each room to --target-size from its single-use pool and never sends a partial batch, so a room with pool P reaches at most baseline + floor(P/users-per-add)*users-per-add. ValidateCapacityTarget rejects an unreachable target before any NATS/store work, naming how many rooms fall short and the reachable ceiling, instead of silently under-filling rooms. 
https://claude.ai/code/session_014eeLP8ENEQxFhC4SjaAY27 --- tools/loadgen/README.md | 6 +- tools/loadgen/main.go | 10 +++- tools/loadgen/members_capacity_test.go | 76 ++++++++++++++++++++++++++ tools/loadgen/members_generator.go | 37 ++++++++++++- 4 files changed, 126 insertions(+), 3 deletions(-) diff --git a/tools/loadgen/README.md b/tools/loadgen/README.md index 866ee623b..98d3f299e 100644 --- a/tools/loadgen/README.md +++ b/tools/loadgen/README.md @@ -171,7 +171,11 @@ for the rationale and the v2 plan. - **Capacity mode**: the size-bucket table shows latency at four size ranges; the `final sizes` block confirms each room hit `--target-size`. A row with `count > 0` whose `e2_p99` is much larger - than smaller-size buckets indicates a per-room-size degradation. + than smaller-size buckets indicates a per-room-size degradation. Like + sustained mode, capacity mode **refuses to start** if `--target-size` + is unreachable from the preset's per-room pool (`baseline + + ⌊pool ÷ users-per-add⌋ × users-per-add`); it prints the reachable + ceiling — lower `--target-size` or pick a larger preset. ## History workload (LoadHistory / GetThreadMessages benchmark) diff --git a/tools/loadgen/main.go b/tools/loadgen/main.go index 9bb04ecdf..20a1ce32f 100644 --- a/tools/loadgen/main.go +++ b/tools/loadgen/main.go @@ -530,6 +530,15 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int { return 2 } + // Preflight: capacity mode grows each room to target-size from its single-use + // pool. Build the deterministic fixtures up front and reject an unreachable + // target before any NATS/store work instead of silently under-filling rooms. 
+ fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) + if err := ValidateCapacityTarget(p.Name, pools, p.BaselineSize, *targetSize, *usersPerAdd); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + return 2 + } + nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile) if err != nil { slog.Error("nats connect", "error", err) @@ -553,7 +562,6 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int { } }() - fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) owners := OwnersByRoom(&fixtures) collector := NewMemberCollector(metrics, p.Name, injectMode) diff --git a/tools/loadgen/members_capacity_test.go b/tools/loadgen/members_capacity_test.go index d4b9bf7a9..bb1649b60 100644 --- a/tools/loadgen/members_capacity_test.go +++ b/tools/loadgen/members_capacity_test.go @@ -128,3 +128,79 @@ func TestMembersHeavy_SustainsThousandRPS(t *testing.T) { "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) } } + +func TestValidateCapacityTarget(t *testing.T) { + // Each room grows by full users-per-add batches, so a room of pool P reaches + // baseline + ⌊P/usersPerAdd⌋*usersPerAdd at most. 
+ tests := []struct { + name string + pools CandidatePools + baseline int + targetSize int + usersPerAdd int + wantErr bool + wantContains []string + }{ + { + name: "target below baseline is a no-op", + pools: CandidatePools{"r1": make([]string, 0)}, + baseline: 10, targetSize: 5, usersPerAdd: 10, + wantErr: false, + }, + { + name: "zero usersPerAdd guarded by caller", + pools: CandidatePools{"r1": make([]string, 50)}, + baseline: 10, targetSize: 100, usersPerAdd: 0, + wantErr: false, + }, + { + name: "every room reaches target", + pools: CandidatePools{"r1": make([]string, 100), "r2": make([]string, 100)}, + baseline: 10, targetSize: 100, usersPerAdd: 10, // need 9 batches, have 10 + wantErr: false, + }, + { + name: "a room falls short", + pools: CandidatePools{"r1": make([]string, 100), "r2": make([]string, 50)}, + baseline: 10, targetSize: 100, usersPerAdd: 10, // r2 has 5 batches < 9 needed + wantErr: true, + wantContains: []string{"members-x", "target-size=100", "--target-size to 60"}, // 10 + 5*10 + }, + { + name: "pool below one batch reaches only baseline", + pools: CandidatePools{"r1": make([]string, 9)}, + baseline: 10, targetSize: 11, usersPerAdd: 10, // need 1 batch, have 0 + wantErr: true, + wantContains: []string{"--target-size to 10"}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := ValidateCapacityTarget("members-x", tc.pools, tc.baseline, tc.targetSize, tc.usersPerAdd) + if !tc.wantErr { + assert.NoError(t, err) + return + } + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) + for _, sub := range tc.wantContains { + assert.Contains(t, err.Error(), sub) + } + }) + } +} + +// members-capacity must clear its preflight at the documented growth target. 
+func TestMembersCapacity_ReachesDocumentedTarget(t *testing.T) { + p, ok := BuiltinMembersPreset("members-capacity") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + require.NoError(t, + ValidateCapacityTarget(p.Name, pools, p.BaselineSize, 500, 10), + "members-capacity must reach target-size=500") + // Asking past baseline+pool must be rejected with the reachable ceiling. + err := ValidateCapacityTarget(p.Name, pools, p.BaselineSize, p.BaselineSize+p.CandidatePool+1, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) +} diff --git a/tools/loadgen/members_generator.go b/tools/loadgen/members_generator.go index 192e01669..7fb1db906 100644 --- a/tools/loadgen/members_generator.go +++ b/tools/loadgen/members_generator.go @@ -53,7 +53,7 @@ var ErrPoolsExhausted = errors.New("candidate pool exhausted on every room: rate // with the specifics) when the requested rate/duration would consume more // candidates than the preset's pools can supply. It exists so callers can // match the preflight failure with errors.Is. -var ErrInsufficientPool = errors.New("insufficient candidate pool for requested rate and duration") +var ErrInsufficientPool = errors.New("insufficient candidate pool for requested workload") // SustainableOps returns the maximum number of add-member publishes the given // candidate pools can supply at usersPerAdd accounts each. A candidate is @@ -107,6 +107,41 @@ func ValidateSustainedCapacity(presetName string, pools CandidatePools, rate int ) } +// ValidateCapacityTarget is the preflight guard for members-capacity. Capacity +// mode grows each room to targetSize by adding usersPerAdd members per batch and +// never sends a final partial batch, so a room with pool P reaches at most +// baseline + ⌊P/usersPerAdd⌋·usersPerAdd. 
The binding constraint is per-room +// pool depth (not aggregate throughput), so this fails fast when any room can't +// reach targetSize, naming how many rooms fall short and the reachable ceiling. +func ValidateCapacityTarget(presetName string, pools CandidatePools, baselineSize, targetSize, usersPerAdd int) error { + if usersPerAdd <= 0 || targetSize <= baselineSize { + return nil // nothing to grow, or usersPerAdd validated by the caller + } + batchesNeeded := (targetSize - baselineSize + usersPerAdd - 1) / usersPerAdd + short := 0 + minBatches := -1 + for _, pool := range pools { + batches := len(pool) / usersPerAdd + if batches < batchesNeeded { + short++ + if minBatches < 0 || batches < minBatches { + minBatches = batches + } + } + } + if short == 0 { + return nil + } + maxTarget := baselineSize + minBatches*usersPerAdd + return fmt.Errorf( + "preset %q cannot grow %d/%d rooms to target-size=%d at users-per-add=%d: "+ + "reaching it needs %d add batches/room but the thinnest pool supplies %d; "+ + "lower --target-size to %d or pick a larger preset: %w", + presetName, short, len(pools), targetSize, usersPerAdd, + batchesNeeded, minBatches, maxTarget, ErrInsufficientPool, + ) +} + // NewSustainedMembersGenerator clones the candidate pools so the input is // not mutated. func NewSustainedMembersGenerator(cfg *SustainedMembersConfig, seed int64) *SustainedMembersGenerator {