Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions tools/loadgen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,29 @@ make -C tools/loadgen/deploy reset-members PRESET=members-medium
| preset | rooms | baseline | candidate pool | use case |
|--------------------|-------|----------|----------------|-----------------------------------------|
| `members-small` | 5 | 10 | 50 | smoke / dev |
| `members-medium` | 100 | 100 | 500 | sustained-throughput default |
| `members-medium` | 100 | 100 | 900 | sustained-throughput default |
| `members-heavy` | 700 | 10 | 990 | high-rate sustained (≈1000 req/s) |
| `members-capacity` | 5 | 1 | 990 | capacity-growth, fills up to ~MAX_ROOM_SIZE |

A candidate is single-use — once added it's a room member and can't be
re-added, and `baseline + candidate pool` is capped at `MAX_ROOM_SIZE` (1000).
So a sustained run can make at most `rooms × ⌊candidate pool ÷ users-per-add⌋`
add-member publishes total. `members-medium` (100 × ⌊900÷10⌋ = 9000 ops)
sustains the default `RATE=100 DURATION=60s` (6000 ops) with margin;
`members-small` is a smoke preset and cannot sustain that load.

For higher rates, add rooms rather than pool (pool is capped per room). To
sustain **1000 req/s for 60s** (60,000 ops) at the default `users-per-add=10`,
use `members-heavy` (700 × ⌊990÷10⌋ = 69,300 ops — ≈69s of runway at 1000 req/s, i.e. ≈9s of headroom over the 60s run):

```
make -C tools/loadgen/deploy seed-members PRESET=members-heavy
make -C tools/loadgen/deploy run-sustained PRESET=members-heavy RATE=1000 DURATION=60s
```

If instead each request need only add one member, `members-medium` at
`USERS_PER_ADD=1` already supplies 90,000 ops — no heavy preset required.

### Subcommands

- `loadgen seed --workload=members --preset=<name>` — populate Mongo
Expand All @@ -143,14 +163,19 @@ for the rationale and the v2 plan.

- **Sustained mode**: `final_pending == 0` on room-worker + zero errors →
pipeline is sustaining the target rate. Climbing `final_pending` or
non-zero errors → over capacity. If you see `aborted early — pools
exhausted` in the logs, your `rate × duration × users-per-add` exceeded
the preset's `CandidatePool` budget; pick a bigger preset or shorter
duration.
non-zero errors → over capacity. If `rate × duration` would exceed the
preset's pool budget (see the preset table above), the command now
**refuses to start** and prints the achievable max `--rate`/`--duration`
for the preset — lower one of them or pick a bigger preset. (The old
behaviour ran for ~50s and then logged `aborted early — pools exhausted`.)
- **Capacity mode**: the size-bucket table shows latency at four
size ranges; the `final sizes` block confirms each room hit
`--target-size`. A row with `count > 0` whose `e2_p99` is much larger
than smaller-size buckets indicates a per-room-size degradation.
than smaller-size buckets indicates a per-room-size degradation. Like
sustained mode, capacity mode **refuses to start** if `--target-size`
is unreachable from the preset's per-room pool (`baseline +
⌊pool ÷ users-per-add⌋ × users-per-add`); it prints the reachable
ceiling — lower `--target-size` or pick a larger preset.

## History workload (LoadHistory / GetThreadMessages benchmark)

Expand Down
21 changes: 19 additions & 2 deletions tools/loadgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,16 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int {
return 2
}

// Preflight: a candidate is single-use, so rate*duration cannot exceed
// the preset's pool budget. Build the deterministic fixtures up front and
// fail fast with an actionable message before any NATS/store work rather
// than aborting mid-run on exhaustion.
fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID)
if err := ValidateSustainedCapacity(p.Name, pools, *rate, *duration, *usersPerAdd); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return 2
}

nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
slog.Error("nats connect", "error", err)
Expand All @@ -341,7 +351,6 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int {
}
}()

fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID)
owners := OwnersByRoom(&fixtures)
collector := NewMemberCollector(metrics, p.Name, injectMode)

Expand Down Expand Up @@ -521,6 +530,15 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int {
return 2
}

// Preflight: capacity mode grows each room to target-size from its single-use
// pool. Build the deterministic fixtures up front and reject an unreachable
// target before any NATS/store work instead of silently under-filling rooms.
fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID)
if err := ValidateCapacityTarget(p.Name, pools, p.BaselineSize, *targetSize, *usersPerAdd); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return 2
}

nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile)
if err != nil {
slog.Error("nats connect", "error", err)
Expand All @@ -544,7 +562,6 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int {
}
}()

fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID)
owners := OwnersByRoom(&fixtures)
collector := NewMemberCollector(metrics, p.Name, injectMode)

Expand Down
6 changes: 5 additions & 1 deletion tools/loadgen/members.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ var builtinMembersPresets = map[string]MembersPreset{
},
"members-medium": {
Name: "members-medium", Users: 5000, Rooms: 100,
BaselineSize: 100, CandidatePool: 500,
BaselineSize: 100, CandidatePool: 900, // baseline+pool = 1000 = MAX_ROOM_SIZE
},
"members-capacity": {
Name: "members-capacity", Users: 12000, Rooms: 5,
BaselineSize: 1, CandidatePool: 990, // fits under MAX_ROOM_SIZE=1000
},
"members-heavy": {
Name: "members-heavy", Users: 5000, Rooms: 700,
BaselineSize: 10, CandidatePool: 990, // 700 × ⌊990/10⌋ = 69,300 ops ≈ 69s at 1000/s
},
}

// BuiltinMembersPreset looks up a preset by name.
Expand Down
206 changes: 206 additions & 0 deletions tools/loadgen/members_capacity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package main

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestSustainableOps table-tests SustainableOps against pools of known sizes.
// The table expects 0 for nil/empty pools and for a non-positive usersPerAdd,
// and expects a room to count only its whole users-per-add batches.
func TestSustainableOps(t *testing.T) {
	// room builds a pool of n zero-value candidates; the cases below differ
	// only in pool sizes and usersPerAdd.
	room := func(n int) []string { return make([]string, n) }

	type testCase struct {
		name        string
		pools       CandidatePools
		usersPerAdd int
		want        int
	}
	cases := []testCase{
		{name: "nil pools", pools: nil, usersPerAdd: 10, want: 0},
		{name: "empty pools", pools: CandidatePools{}, usersPerAdd: 10, want: 0},
		{name: "zero usersPerAdd", pools: CandidatePools{"r1": room(50)}, usersPerAdd: 0, want: 0},
		{name: "negative usersPerAdd", pools: CandidatePools{"r1": room(50)}, usersPerAdd: -1, want: 0},
		{name: "single room exact", pools: CandidatePools{"r1": room(50)}, usersPerAdd: 10, want: 5},
		{name: "single room floor", pools: CandidatePools{"r1": room(55)}, usersPerAdd: 10, want: 5},
		{name: "multi room", pools: CandidatePools{"r1": room(50), "r2": room(30)}, usersPerAdd: 10, want: 8},
		{name: "room below threshold contributes zero", pools: CandidatePools{"r1": room(9)}, usersPerAdd: 10, want: 0},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			got := SustainableOps(c.pools, c.usersPerAdd)
			assert.Equal(t, c.want, got)
		})
	}
}

// TestValidateSustainedCapacity_WithinBudget expects no error when the
// requested load is below the pool budget.
func TestValidateSustainedCapacity_WithinBudget(t *testing.T) {
	// Budget: 2 rooms * floor(50/10) = 10 ops. Demand: 5/s * 1s = 5 ops.
	twoRooms := CandidatePools{
		"r1": make([]string, 50),
		"r2": make([]string, 50),
	}
	assert.NoError(t, ValidateSustainedCapacity("members-x", twoRooms, 5, time.Second, 10))
}

// TestValidateSustainedCapacity_AtBudgetBoundary expects no error when the
// requested load equals the pool budget exactly.
func TestValidateSustainedCapacity_AtBudgetBoundary(t *testing.T) {
	// Budget: 2 rooms * floor(50/10) = 10 ops. Demand: 10/s * 1s = 10 ops,
	// so demand == budget and the validation must still pass.
	twoRooms := CandidatePools{
		"r1": make([]string, 50),
		"r2": make([]string, 50),
	}
	assert.NoError(t, ValidateSustainedCapacity("members-x", twoRooms, 10, time.Second, 10))
}

// TestValidateSustainedCapacity_Oversubscribed expects an ErrInsufficientPool
// error whose message names the preset and the achievable rate/duration when
// the demand is twice the pool budget.
func TestValidateSustainedCapacity_Oversubscribed(t *testing.T) {
	// Budget: floor(60000/10) = 6000 ops. Demand: 100/s * 120s = 12000 ops.
	// Both suggested bounds are non-zero: 6000/120s = 50/s and 6000/100 = 60s.
	oneRoom := CandidatePools{"r1": make([]string, 60000)}

	err := ValidateSustainedCapacity("members-x", oneRoom, 100, 120*time.Second, 10)
	require.Error(t, err)
	assert.True(t, errors.Is(err, ErrInsufficientPool),
		"oversubscribed validation must wrap ErrInsufficientPool")

	// An actionable message carries the preset name plus both achievable bounds.
	text := err.Error()
	for _, fragment := range []string{"members-x", "--rate 50", "--duration 1m0s"} {
		assert.Contains(t, text, fragment)
	}
}

// TestValidateSustainedCapacity_TooSmallForWorkload covers a pool so small that
// the suggested bounds would degenerate to zero; the error must point at a
// larger preset instead of printing a zero rate or duration.
func TestValidateSustainedCapacity_TooSmallForWorkload(t *testing.T) {
	// Budget: floor(250/10) = 25 ops against a demand of 100/s for 60s.
	tinyPool := CandidatePools{"r1": make([]string, 250)}

	err := ValidateSustainedCapacity("members-small", tinyPool, 100, 60*time.Second, 10)
	require.Error(t, err)
	assert.True(t, errors.Is(err, ErrInsufficientPool))

	text := err.Error()
	// Required guidance.
	assert.Contains(t, text, "members-small")
	assert.Contains(t, text, "larger preset")
	// Forbidden degenerate suggestions.
	assert.NotContains(t, text, "--rate 0", "must not suggest a nonsensical zero rate")
	assert.NotContains(t, text, "--duration 0s", "must not suggest a nonsensical zero duration")
}

// TestValidateSustainedCapacity_ZeroUsersPerAdd expects a usersPerAdd of zero
// to be rejected with ErrInsufficientPool.
func TestValidateSustainedCapacity_ZeroUsersPerAdd(t *testing.T) {
	oneRoom := CandidatePools{"r1": make([]string, 50)}

	err := ValidateSustainedCapacity("members-x", oneRoom, 10, time.Second, 0)

	require.Error(t, err)
	assert.True(t, errors.Is(err, ErrInsufficientPool))
}

// TestMembersMedium_SustainsDefaultInvocation pins the members-medium preset to
// the default members-sustained invocation (rate=100, duration=60s,
// usersPerAdd=10): the preflight validation must accept it, and no room's
// baseline plus candidate pool may exceed 1000 (MAX_ROOM_SIZE).
func TestMembersMedium_SustainsDefaultInvocation(t *testing.T) {
	const (
		rate        = 100
		window      = 60 * time.Second
		batch       = 10
		maxRoomSize = 1000
	)

	preset, found := BuiltinMembersPreset("members-medium")
	require.True(t, found)
	_, pools := BuildMembersFixtures(&preset, 42, "site-A")

	err := ValidateSustainedCapacity(preset.Name, pools, rate, window, batch)
	require.NoError(t, err,
		"members-medium must sustain the default 100rps/60s/10-per-add invocation")

	// Every room must fit within the room-size cap once its whole pool is added.
	for id, candidates := range pools {
		assert.LessOrEqual(t, preset.BaselineSize+len(candidates), maxRoomSize,
			"room %s baseline+pool must fit under MAX_ROOM_SIZE", id)
	}
}

// TestMembersHeavy_SustainsThousandRPS pins the members-heavy preset to a
// rate=1000, duration=60s, usersPerAdd=10 run (60,000 ops): the preflight
// validation must accept it, and no room's baseline plus candidate pool may
// exceed 1000 (MAX_ROOM_SIZE).
func TestMembersHeavy_SustainsThousandRPS(t *testing.T) {
	const (
		targetRate  = 1000
		window      = 60 * time.Second
		batch       = 10
		maxRoomSize = 1000
	)

	preset, found := BuiltinMembersPreset("members-heavy")
	require.True(t, found)
	_, pools := BuildMembersFixtures(&preset, 42, "site-A")

	err := ValidateSustainedCapacity(preset.Name, pools, targetRate, window, batch)
	require.NoError(t, err,
		"members-heavy must sustain rate=1000/60s at users-per-add=10")

	// Every room must fit within the room-size cap once its whole pool is added.
	for id, candidates := range pools {
		assert.LessOrEqual(t, preset.BaselineSize+len(candidates), maxRoomSize,
			"room %s baseline+pool must fit under MAX_ROOM_SIZE", id)
	}
}

// TestValidateCapacityTarget table-tests ValidateCapacityTarget. The cases
// assume a room grows only by whole users-per-add batches, so a room with pool
// P tops out at baseline + ⌊P/usersPerAdd⌋*usersPerAdd. Failing cases must wrap
// ErrInsufficientPool and mention every substring in wantContains.
func TestValidateCapacityTarget(t *testing.T) {
	// room builds a pool of n zero-value candidates; the cases below differ
	// only in pool sizes and the numeric arguments.
	room := func(n int) []string { return make([]string, n) }

	type testCase struct {
		name         string
		pools        CandidatePools
		baseline     int
		targetSize   int
		usersPerAdd  int
		wantErr      bool
		wantContains []string
	}
	cases := []testCase{
		{
			name:        "target below baseline is a no-op",
			pools:       CandidatePools{"r1": room(0)},
			baseline:    10,
			targetSize:  5,
			usersPerAdd: 10,
			wantErr:     false,
		},
		{
			name:        "zero usersPerAdd guarded by caller",
			pools:       CandidatePools{"r1": room(50)},
			baseline:    10,
			targetSize:  100,
			usersPerAdd: 0,
			wantErr:     false,
		},
		{
			// 9 batches needed, 10 available in each room.
			name:        "every room reaches target",
			pools:       CandidatePools{"r1": room(100), "r2": room(100)},
			baseline:    10,
			targetSize:  100,
			usersPerAdd: 10,
			wantErr:     false,
		},
		{
			// r2 holds 5 batches but 9 are needed; its ceiling is 10 + 5*10 = 60.
			name:         "a room falls short",
			pools:        CandidatePools{"r1": room(100), "r2": room(50)},
			baseline:     10,
			targetSize:   100,
			usersPerAdd:  10,
			wantErr:      true,
			wantContains: []string{"members-x", "target-size=100", "--target-size to 60"},
		},
		{
			// 1 batch needed, 0 available: the room stays at its baseline.
			name:         "pool below one batch reaches only baseline",
			pools:        CandidatePools{"r1": room(9)},
			baseline:     10,
			targetSize:   11,
			usersPerAdd:  10,
			wantErr:      true,
			wantContains: []string{"--target-size to 10"},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			err := ValidateCapacityTarget("members-x", c.pools, c.baseline, c.targetSize, c.usersPerAdd)
			if c.wantErr {
				require.Error(t, err)
				assert.True(t, errors.Is(err, ErrInsufficientPool))
				text := err.Error()
				for _, fragment := range c.wantContains {
					assert.Contains(t, text, fragment)
				}
				return
			}
			assert.NoError(t, err)
		})
	}
}

// TestMembersCapacity_ReachesDocumentedTarget checks the members-capacity
// preset on both sides of its preflight: target-size=500 must be accepted, and
// a target one past baseline+CandidatePool must be rejected with
// ErrInsufficientPool.
func TestMembersCapacity_ReachesDocumentedTarget(t *testing.T) {
	const batch = 10

	preset, found := BuiltinMembersPreset("members-capacity")
	require.True(t, found)
	_, pools := BuildMembersFixtures(&preset, 42, "site-A")

	// Reachable target.
	reachable := ValidateCapacityTarget(preset.Name, pools, preset.BaselineSize, 500, batch)
	require.NoError(t, reachable, "members-capacity must reach target-size=500")

	// One member more than baseline plus the whole pool can never be reached.
	beyondPool := preset.BaselineSize + preset.CandidatePool + 1
	err := ValidateCapacityTarget(preset.Name, pools, preset.BaselineSize, beyondPool, batch)
	require.Error(t, err)
	assert.True(t, errors.Is(err, ErrInsufficientPool))
}
Loading
Loading