diff --git a/tools/loadgen/README.md b/tools/loadgen/README.md index a86c2b277..98d3f299e 100644 --- a/tools/loadgen/README.md +++ b/tools/loadgen/README.md @@ -114,9 +114,29 @@ make -C tools/loadgen/deploy reset-members PRESET=members-medium | preset | rooms | baseline | candidate pool | use case | |--------------------|-------|----------|----------------|-----------------------------------------| | `members-small` | 5 | 10 | 50 | smoke / dev | -| `members-medium` | 100 | 100 | 500 | sustained-throughput default | +| `members-medium` | 100 | 100 | 900 | sustained-throughput default | +| `members-heavy` | 700 | 10 | 990 | high-rate sustained (≈1000 req/s) | | `members-capacity` | 5 | 1 | 990 | capacity-growth, fills up to ~MAX_ROOM_SIZE | +A candidate is single-use — once added it's a room member and can't be +re-added, and `baseline + candidate pool` is capped at `MAX_ROOM_SIZE` (1000). +So a sustained run can make at most `rooms × ⌊candidate pool ÷ users-per-add⌋` +add-member publishes total. `members-medium` (100 × ⌊900÷10⌋ = 9000 ops) +sustains the default `RATE=100 DURATION=60s` (6000 ops) with margin; +`members-small` is a smoke preset and cannot sustain that load. + +For higher rates, add rooms rather than pool (pool is capped per room). To +sustain **1000 req/s for 60s** (60,000 ops) at the default `users-per-add=10`, +use `members-heavy` (700 × ⌊990÷10⌋ = 69,300 ops, ≈69s of headroom): + +``` +make -C tools/loadgen/deploy seed-members PRESET=members-heavy +make -C tools/loadgen/deploy run-sustained PRESET=members-heavy RATE=1000 DURATION=60s +``` + +If instead each request need only add one member, `members-medium` at +`USERS_PER_ADD=1` already supplies 90,000 ops — no heavy preset required. + ### Subcommands - `loadgen seed --workload=members --preset=` — populate Mongo @@ -143,14 +163,19 @@ for the rationale and the v2 plan. - **Sustained mode**: `final_pending == 0` on room-worker + zero errors → pipeline is sustaining the target rate. Climbing `final_pending` or - non-zero errors → over capacity. If you see `aborted early — pools - exhausted` in the logs, your `rate × duration × users-per-add` exceeded - the preset's `CandidatePool` budget; pick a bigger preset or shorter - duration. + non-zero errors → over capacity. If `rate × duration` would exceed the + preset's pool budget (see the preset table above), the command now + **refuses to start** and prints the achievable max `--rate`/`--duration` + for the preset — lower one of them or pick a bigger preset. (The old + behaviour ran for ~50s and then logged `aborted early — pools exhausted`.) - **Capacity mode**: the size-bucket table shows latency at four size ranges; the `final sizes` block confirms each room hit `--target-size`. A row with `count > 0` whose `e2_p99` is much larger - than smaller-size buckets indicates a per-room-size degradation. + than smaller-size buckets indicates a per-room-size degradation. Like + sustained mode, capacity mode **refuses to start** if `--target-size` + is unreachable from the preset's per-room pool (`baseline + + ⌊pool ÷ users-per-add⌋ × users-per-add`); it prints the reachable + ceiling — lower `--target-size` or pick a larger preset. ## History workload (LoadHistory / GetThreadMessages benchmark) diff --git a/tools/loadgen/main.go b/tools/loadgen/main.go index 9204b9c0b..20a1ce32f 100644 --- a/tools/loadgen/main.go +++ b/tools/loadgen/main.go @@ -318,6 +318,16 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int { return 2 } + // Preflight: a candidate is single-use, so rate*duration cannot exceed + // the preset's pool budget. Build the deterministic fixtures up front and + // fail fast with an actionable message before any NATS/store work rather + // than aborting mid-run on exhaustion. + fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) + if err := ValidateSustainedCapacity(p.Name, pools, *rate, *duration, *usersPerAdd); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + return 2 + } + nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile) if err != nil { slog.Error("nats connect", "error", err) @@ -341,7 +351,6 @@ func runMembersSustained(ctx context.Context, cfg *config, args []string) int { } }() - fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) owners := OwnersByRoom(&fixtures) collector := NewMemberCollector(metrics, p.Name, injectMode) @@ -521,6 +530,15 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int { return 2 } + // Preflight: capacity mode grows each room to target-size from its single-use + // pool. Build the deterministic fixtures up front and reject an unreachable + // target before any NATS/store work instead of silently under-filling rooms. + fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) + if err := ValidateCapacityTarget(p.Name, pools, p.BaselineSize, *targetSize, *usersPerAdd); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + return 2 + } + nc, err := natsutil.Connect(cfg.NatsURL, cfg.NatsCredsFile) if err != nil { slog.Error("nats connect", "error", err) @@ -544,7 +562,6 @@ func runMembersCapacity(ctx context.Context, cfg *config, args []string) int { } }() - fixtures, pools := BuildMembersFixtures(&p, *seed, cfg.SiteID) owners := OwnersByRoom(&fixtures) collector := NewMemberCollector(metrics, p.Name, injectMode) diff --git a/tools/loadgen/members.go b/tools/loadgen/members.go index dbb8ca7bd..67406d59f 100644 --- a/tools/loadgen/members.go +++ b/tools/loadgen/members.go @@ -47,12 +47,16 @@ var builtinMembersPresets = map[string]MembersPreset{ }, "members-medium": { Name: "members-medium", Users: 5000, Rooms: 100, - BaselineSize: 100, CandidatePool: 500, + BaselineSize: 100, CandidatePool: 900, // baseline+pool = 1000 = MAX_ROOM_SIZE }, "members-capacity": { Name: "members-capacity", Users: 12000, Rooms: 5, BaselineSize: 1, CandidatePool: 990, // fits under MAX_ROOM_SIZE=1000 }, + "members-heavy": { + Name: "members-heavy", Users: 5000, Rooms: 700, + BaselineSize: 10, CandidatePool: 990, // 700 × ⌊990/10⌋ = 69,300 ops ≈ 69s at 1000/s + }, } // BuiltinMembersPreset looks up a preset by name. diff --git a/tools/loadgen/members_capacity_test.go b/tools/loadgen/members_capacity_test.go new file mode 100644 index 000000000..bb1649b60 --- /dev/null +++ b/tools/loadgen/members_capacity_test.go @@ -0,0 +1,206 @@ +package main + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSustainableOps(t *testing.T) { + tests := []struct { + name string + pools CandidatePools + usersPerAdd int + want int + }{ + {"nil pools", nil, 10, 0}, + {"empty pools", CandidatePools{}, 10, 0}, + {"zero usersPerAdd", CandidatePools{"r1": make([]string, 50)}, 0, 0}, + {"negative usersPerAdd", CandidatePools{"r1": make([]string, 50)}, -1, 0}, + {"single room exact", CandidatePools{"r1": make([]string, 50)}, 10, 5}, + {"single room floor", CandidatePools{"r1": make([]string, 55)}, 10, 5}, + {"multi room", CandidatePools{"r1": make([]string, 50), "r2": make([]string, 30)}, 10, 8}, + {"room below threshold contributes zero", CandidatePools{"r1": make([]string, 9)}, 10, 0}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, SustainableOps(tc.pools, tc.usersPerAdd)) + }) + } +} + +func TestValidateSustainedCapacity_WithinBudget(t *testing.T) { + // capacity = 2 rooms * floor(50/10) = 10 ops; demand = 5/s * 1s = 5 ops. + pools := CandidatePools{"r1": make([]string, 50), "r2": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 5, time.Second, 10) + assert.NoError(t, err) +} + +func TestValidateSustainedCapacity_AtBudgetBoundary(t *testing.T) { + // capacity = 10 ops; demand = 10/s * 1s = 10 ops — exactly equal must pass. + pools := CandidatePools{"r1": make([]string, 50), "r2": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 10, time.Second, 10) + assert.NoError(t, err) +} + +func TestValidateSustainedCapacity_Oversubscribed(t *testing.T) { + // capacity = floor(60000/10) = 6000 ops; demand = 100/s * 120s = 12000 ops. + // Achievable bounds are non-degenerate: maxRate=50, maxDuration=60s. + pools := CandidatePools{"r1": make([]string, 60000)} + err := ValidateSustainedCapacity("members-x", pools, 100, 120*time.Second, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool), + "oversubscribed validation must wrap ErrInsufficientPool") + // The message must be actionable: name the preset and the achievable bounds. + msg := err.Error() + assert.Contains(t, msg, "members-x") + assert.Contains(t, msg, "--rate 50") + assert.Contains(t, msg, "--duration 1m0s") +} + +func TestValidateSustainedCapacity_TooSmallForWorkload(t *testing.T) { + // capacity = 25 ops; at rate=100/60s the achievable bounds round to zero, + // so the message must steer to a larger preset rather than "--rate 0". + pools := CandidatePools{"r1": make([]string, 250)} + err := ValidateSustainedCapacity("members-small", pools, 100, 60*time.Second, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) + msg := err.Error() + assert.Contains(t, msg, "members-small") + assert.Contains(t, msg, "larger preset") + assert.NotContains(t, msg, "--rate 0", "must not suggest a nonsensical zero rate") + assert.NotContains(t, msg, "--duration 0s", "must not suggest a nonsensical zero duration") +} + +func TestValidateSustainedCapacity_ZeroUsersPerAdd(t *testing.T) { + pools := CandidatePools{"r1": make([]string, 50)} + err := ValidateSustainedCapacity("members-x", pools, 10, time.Second, 0) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) +} + +// The default members-sustained invocation (rate=100, duration=60s, +// usersPerAdd=10) must be satisfiable by the members-medium preset so the +// documented out-of-box command completes instead of aborting on exhaustion. +func TestMembersMedium_SustainsDefaultInvocation(t *testing.T) { + p, ok := BuiltinMembersPreset("members-medium") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + const ( + defaultRate = 100 + defaultDuration = 60 * time.Second + defaultUsersPerAdd = 10 + ) + require.NoError(t, + ValidateSustainedCapacity(p.Name, pools, defaultRate, defaultDuration, defaultUsersPerAdd), + "members-medium must sustain the default 100rps/60s/10-per-add invocation") + + // Stays under MAX_ROOM_SIZE (1000): baseline members + full pool per room. + for roomID, pool := range pools { + assert.LessOrEqual(t, p.BaselineSize+len(pool), 1000, + "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) + } +} + +// members-heavy is the high-rate preset: it must sustain rate=1000 for 60s at +// the default users-per-add=10 (60,000 ops) so a 1000 req/s steady-state run +// clears the preflight guard instead of being capped to a short burst. +func TestMembersHeavy_SustainsThousandRPS(t *testing.T) { + p, ok := BuiltinMembersPreset("members-heavy") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + const ( + rate = 1000 + duration = 60 * time.Second + usersPerAdd = 10 + ) + require.NoError(t, + ValidateSustainedCapacity(p.Name, pools, rate, duration, usersPerAdd), + "members-heavy must sustain rate=1000/60s at users-per-add=10") + + for roomID, pool := range pools { + assert.LessOrEqual(t, p.BaselineSize+len(pool), 1000, + "room %s baseline+pool must fit under MAX_ROOM_SIZE", roomID) + } +} + +func TestValidateCapacityTarget(t *testing.T) { + // Each room grows by full users-per-add batches, so a room of pool P reaches + // baseline + ⌊P/usersPerAdd⌋*usersPerAdd at most. + tests := []struct { + name string + pools CandidatePools + baseline int + targetSize int + usersPerAdd int + wantErr bool + wantContains []string + }{ + { + name: "target below baseline is a no-op", + pools: CandidatePools{"r1": make([]string, 0)}, + baseline: 10, targetSize: 5, usersPerAdd: 10, + wantErr: false, + }, + { + name: "zero usersPerAdd guarded by caller", + pools: CandidatePools{"r1": make([]string, 50)}, + baseline: 10, targetSize: 100, usersPerAdd: 0, + wantErr: false, + }, + { + name: "every room reaches target", + pools: CandidatePools{"r1": make([]string, 100), "r2": make([]string, 100)}, + baseline: 10, targetSize: 100, usersPerAdd: 10, // need 9 batches, have 10 + wantErr: false, + }, + { + name: "a room falls short", + pools: CandidatePools{"r1": make([]string, 100), "r2": make([]string, 50)}, + baseline: 10, targetSize: 100, usersPerAdd: 10, // r2 has 5 batches < 9 needed + wantErr: true, + wantContains: []string{"members-x", "target-size=100", "--target-size to 60"}, // 10 + 5*10 + }, + { + name: "pool below one batch reaches only baseline", + pools: CandidatePools{"r1": make([]string, 9)}, + baseline: 10, targetSize: 11, usersPerAdd: 10, // need 1 batch, have 0 + wantErr: true, + wantContains: []string{"--target-size to 10"}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := ValidateCapacityTarget("members-x", tc.pools, tc.baseline, tc.targetSize, tc.usersPerAdd) + if !tc.wantErr { + assert.NoError(t, err) + return + } + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) + for _, sub := range tc.wantContains { + assert.Contains(t, err.Error(), sub) + } + }) + } +} + +// members-capacity must clear its preflight at the documented growth target. +func TestMembersCapacity_ReachesDocumentedTarget(t *testing.T) { + p, ok := BuiltinMembersPreset("members-capacity") + require.True(t, ok) + _, pools := BuildMembersFixtures(&p, 42, "site-A") + + require.NoError(t, + ValidateCapacityTarget(p.Name, pools, p.BaselineSize, 500, 10), + "members-capacity must reach target-size=500") + // Asking past baseline+pool must be rejected with the reachable ceiling. + err := ValidateCapacityTarget(p.Name, pools, p.BaselineSize, p.BaselineSize+p.CandidatePool+1, 10) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrInsufficientPool)) +} diff --git a/tools/loadgen/members_generator.go b/tools/loadgen/members_generator.go index 709e3ca59..7fb1db906 100644 --- a/tools/loadgen/members_generator.go +++ b/tools/loadgen/members_generator.go @@ -49,6 +49,99 @@ type SustainedMembersGenerator struct { // fewer than UsersPerAdd accounts remaining. var ErrPoolsExhausted = errors.New("candidate pool exhausted on every room: rate * duration * usersPerAdd exceeded preset CandidatePool") +// ErrInsufficientPool is returned by ValidateSustainedCapacity (and wrapped +// with the specifics) when the requested rate/duration would consume more +// candidates than the preset's pools can supply. It exists so callers can +// match the preflight failure with errors.Is. +var ErrInsufficientPool = errors.New("insufficient candidate pool for requested workload") + +// SustainableOps returns the maximum number of add-member publishes the given +// candidate pools can supply at usersPerAdd accounts each. A candidate is +// single-use (once added, the account is a room member and cannot be re-added), +// so this is a hard ceiling on the publishes a sustained run can ever make. +func SustainableOps(pools CandidatePools, usersPerAdd int) int { + if usersPerAdd <= 0 { + return 0 + } + total := 0 + for _, pool := range pools { + total += len(pool) / usersPerAdd + } + return total +} + +// ValidateSustainedCapacity is the preflight guard for members-sustained. It +// fails fast (before any NATS/store work) when rate*duration would exhaust the +// preset's candidate pools, returning an actionable error wrapping +// ErrInsufficientPool that names the achievable max --rate and --duration. +func ValidateSustainedCapacity(presetName string, pools CandidatePools, rate int, duration time.Duration, usersPerAdd int) error { + capacity := SustainableOps(pools, usersPerAdd) + durSecs := duration.Seconds() + demand := int(float64(rate) * durSecs) + if demand <= capacity { + return nil + } + maxRate := 0 + if durSecs > 0 { + maxRate = int(float64(capacity) / durSecs) + } + maxDuration := time.Duration(0) + if rate > 0 { + maxDuration = (time.Duration(capacity) * time.Second) / time.Duration(rate) + } + // When the achievable bounds round to zero the preset simply cannot sustain + // this workload (a smoke preset asked to run a real load). Steer to a bigger + // preset / smaller --users-per-add instead of suggesting "--rate 0". + var advice string + if maxRate >= 1 && maxDuration >= time.Second { + advice = fmt.Sprintf("lower to --rate %d (at this duration) or --duration %s (at this rate), or pick a larger preset", + maxRate, maxDuration.Round(time.Second)) + } else { + advice = fmt.Sprintf("too small for this workload: at --rate %d it sustains only ~%s — pick a larger preset or lower --users-per-add (and --duration)", + rate, maxDuration.Round(time.Millisecond)) + } + return fmt.Errorf( + "preset %q candidate pools supply at most %d add-member ops at users-per-add=%d, "+ + "but rate=%d/s × duration=%s needs ~%d ops; %s: %w", + presetName, capacity, usersPerAdd, rate, duration, demand, advice, ErrInsufficientPool, + ) +} + +// ValidateCapacityTarget is the preflight guard for members-capacity. Capacity +// mode grows each room to targetSize by adding usersPerAdd members per batch and +// never sends a final partial batch, so a room with pool P reaches at most +// baseline + ⌊P/usersPerAdd⌋·usersPerAdd. The binding constraint is per-room +// pool depth (not aggregate throughput), so this fails fast when any room can't +// reach targetSize, naming how many rooms fall short and the reachable ceiling. +func ValidateCapacityTarget(presetName string, pools CandidatePools, baselineSize, targetSize, usersPerAdd int) error { + if usersPerAdd <= 0 || targetSize <= baselineSize { + return nil // nothing to grow, or usersPerAdd validated by the caller + } + batchesNeeded := (targetSize - baselineSize + usersPerAdd - 1) / usersPerAdd + short := 0 + minBatches := -1 + for _, pool := range pools { + batches := len(pool) / usersPerAdd + if batches < batchesNeeded { + short++ + if minBatches < 0 || batches < minBatches { + minBatches = batches + } + } + } + if short == 0 { + return nil + } + maxTarget := baselineSize + minBatches*usersPerAdd + return fmt.Errorf( + "preset %q cannot grow %d/%d rooms to target-size=%d at users-per-add=%d: "+ + "reaching it needs %d add batches/room but the thinnest pool supplies %d; "+ + "lower --target-size to %d or pick a larger preset: %w", + presetName, short, len(pools), targetSize, usersPerAdd, + batchesNeeded, minBatches, maxTarget, ErrInsufficientPool, + ) +} + // NewSustainedMembersGenerator clones the candidate pools so the input is // not mutated. func NewSustainedMembersGenerator(cfg *SustainedMembersConfig, seed int64) *SustainedMembersGenerator { diff --git a/tools/loadgen/members_publisher.go b/tools/loadgen/members_publisher.go index fca2c7df4..48c85db47 100644 --- a/tools/loadgen/members_publisher.go +++ b/tools/loadgen/members_publisher.go @@ -10,6 +10,7 @@ import ( "github.com/nats-io/nats.go/jetstream" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/subject" ) @@ -30,12 +31,19 @@ func newCanonicalMemberPublisher(js jetstream.JetStream, siteID string) *canonic } func (p *canonicalMemberPublisher) Publish(ctx context.Context, _ string, _ string, - req *model.AddMembersRequest, _ string) error { + req *model.AddMembersRequest, corrID string) error { data, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshal add-members canonical event: %w", err) } - if _, err := p.js.Publish(ctx, subject.RoomCanonical(p.siteID, "member.add"), data); err != nil { + // room-worker rejects canonical events without an X-Request-ID header as a + // permanent error; stamp corrID so the event carries one and traces end-to-end. + msg := &nats.Msg{ + Subject: subject.RoomCanonical(p.siteID, "member.add"), + Data: data, + Header: nats.Header{natsutil.RequestIDHeader: []string{corrID}}, + } + if _, err := p.js.PublishMsg(ctx, msg); err != nil { return fmt.Errorf("jetstream publish: %w", err) } return nil @@ -89,7 +97,15 @@ func (p *frontdoorMemberPublisher) Publish(_ context.Context, requesterAccount, } subj := subject.MemberAdd(requesterAccount, roomID, p.siteID) reply := p.inboxPrefix + "." + corrID - if err := p.nc.PublishRequest(subj, reply, data); err != nil { + // room-service requires an X-Request-ID header on the request; stamp corrID + // so the request ID and reply-correlation token are the same across the pipeline. + msg := &nats.Msg{ + Subject: subj, + Reply: reply, + Data: data, + Header: nats.Header{natsutil.RequestIDHeader: []string{corrID}}, + } + if err := p.nc.PublishMsg(msg); err != nil { return fmt.Errorf("nats publish request: %w", err) } return nil diff --git a/tools/loadgen/members_publisher_test.go b/tools/loadgen/members_publisher_test.go index 0a9698cea..1df5c3260 100644 --- a/tools/loadgen/members_publisher_test.go +++ b/tools/loadgen/members_publisher_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/hmchangw/chat/pkg/model" + "github.com/hmchangw/chat/pkg/natsutil" "github.com/hmchangw/chat/pkg/stream" "github.com/hmchangw/chat/pkg/subject" ) @@ -76,6 +77,72 @@ func TestCanonicalMemberPublisher_PublishesToRoomCanonical(t *testing.T) { } } +func TestCanonicalMemberPublisher_SetsRequestIDHeader(t *testing.T) { + nc, js := startEmbeddedJetStream(t) + + siteID := "site-A" + _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{ + Name: stream.Rooms(siteID).Name, + Subjects: stream.Rooms(siteID).Subjects, + }) + require.NoError(t, err) + + captured := make(chan *nats.Msg, 1) + sub, err := nc.Subscribe(subject.RoomCanonicalWildcard(siteID), func(m *nats.Msg) { + captured <- m + }) + require.NoError(t, err) + defer func() { _ = sub.Unsubscribe() }() + + p := newCanonicalMemberPublisher(js, siteID) + + req := &model.AddMembersRequest{ + RoomID: "room-1", + Users: []string{"u1", "u2"}, + RequesterAccount: "owner-1", + Timestamp: time.Now().UTC().UnixMilli(), + } + const corrID = "01970a4f-8c2d-7c9a-abcd-e0123456789f" + require.NoError(t, p.Publish(context.Background(), "owner-1", "room-1", req, corrID)) + + select { + case m := <-captured: + assert.Equal(t, corrID, m.Header.Get(natsutil.RequestIDHeader), + "canonical publish must carry the corrID as the X-Request-ID header") + case <-time.After(2 * time.Second): + t.Fatal("did not receive canonical publish within 2s") + } +} + +func TestFrontdoorMemberPublisher_SetsRequestIDHeader(t *testing.T) { + nc, _ := startEmbeddedJetStream(t) + siteID := "site-A" + + captured := make(chan *nats.Msg, 1) + sub, err := nc.Subscribe(subject.MemberAddWildcard(siteID), func(m *nats.Msg) { + captured <- m + _ = m.Respond([]byte(`{"status":"accepted"}`)) + }) + require.NoError(t, err) + defer func() { _ = sub.Unsubscribe() }() + + p, err := newFrontdoorMemberPublisher(nc, siteID, func(string, []byte, time.Time) {}) + require.NoError(t, err) + defer p.Close() + + req := &model.AddMembersRequest{RoomID: "room-X", Users: []string{"u1"}} + const corrID = "01970a4f-8c2d-7c9a-abcd-e0123456789f" + require.NoError(t, p.Publish(context.Background(), "owner-9", "room-X", req, corrID)) + + select { + case m := <-captured: + assert.Equal(t, corrID, m.Header.Get(natsutil.RequestIDHeader), + "frontdoor publish must carry the corrID as the X-Request-ID header") + case <-time.After(2 * time.Second): + t.Fatal("never received request") + } +} + func TestFrontdoorMemberPublisher_RequestReply(t *testing.T) { nc, _ := startEmbeddedJetStream(t) siteID := "site-A" diff --git a/tools/loadgen/members_test.go b/tools/loadgen/members_test.go index 08ed51010..74c04ebc3 100644 --- a/tools/loadgen/members_test.go +++ b/tools/loadgen/members_test.go @@ -66,7 +66,7 @@ func TestValidateInjectShape(t *testing.T) { } func TestBuiltinMembersPreset(t *testing.T) { - cases := []string{"members-small", "members-medium", "members-capacity"} + cases := []string{"members-small", "members-medium", "members-capacity", "members-heavy"} for _, name := range cases { t.Run(name, func(t *testing.T) { p, ok := BuiltinMembersPreset(name)