feat(loadgen): add read-receipt workload to max-rps#264
Conversation
Moves the production natsReadReceiptRequester impl out of the interface file (which caused unused-lint failures) to align with the history pattern where the concrete NATS requester lives in the main wiring file. https://claude.ai/code/session_019TuJE9Y5nbsTMnXdycLiWo
Wires the readReceiptWorkload into the max-rps ramp engine: restores the production natsReadReceiptRequester, adds the read-receipt case to runMaxRPS, and introduces buildReadReceiptInputs/runReadReceiptFor. https://claude.ai/code/session_019TuJE9Y5nbsTMnXdycLiWo
Implements selectReaders (ceil-based uniform random selection) and latestTopLevelByRoom helpers, then composes them in SeedReadReceiptState to stamp lastSeenAt on a configurable fraction of subscribers per room. Wires a new "read-receipt" workload into the seed subcommand via runSeedReadReceipt, which seeds the full history fixture set (Mongo + Cassandra) before stamping reader state. https://claude.ai/code/session_019TuJE9Y5nbsTMnXdycLiWo
📝 WalkthroughWalkthroughThis pull request implements a complete ChangesRead-receipt max-rps workload implementation
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes The PR combines heterogeneous new components (collector, requester, generator, seeding helpers, workload adapter) across multiple files with logic density in the generator concurrency bounding and seeding selection. However, each component is self-contained with unit tests, and the architecture mirrors the existing history workload pattern. The integration points are straightforward wiring into existing CLI dispatch tables and the Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/superpowers/plans/2026-06-02-loadgen-read-receipt-maxrps.md`:
- Around line 700-713: The RunStep method in readReceiptWorkload uses time.Sleep
to "drain" in-flight replies; replace this with explicit synchronization by
having the generator created by w.newGenerator provide a deterministic
completion signal (for example a done channel or sync.WaitGroup) and waiting on
that signal instead of time.Sleep. Concretely: update
w.newGenerator/runReadReceiptFor contract so runReadReceiptFor returns (or
exposes) a completion indicator tied to the generator instance (e.g., a done
chan struct{} or *sync.WaitGroup) that is closed/marked when all reply
goroutines are finished; then in RunStep (the function shown) remove
time.Sleep(2 * time.Second) and wait on that completion signal before calling
buildReadReceiptInputs with the collector; ensure NewReadReceiptCollector and
collector usage remain unchanged but are only read after the generator signals
completion.
In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md`:
- Around line 125-128: The RunStep description currently tells implementers to
"sleep briefly to drain trailing in-flight replies" — replace that with explicit
synchronization: when mirroring historyWorkload.RunStep (functions/methods
referenced: RunStep, buildReadReceiptInputs, warmup, hold, collector) require
the warmup and hold generators to run as distinct goroutines that signal
completion via channels or a sync.WaitGroup and ensure the measured collector
drains by waiting for a done signal from the generator/collector rather than
sleeping; document that the throwaway collector used for warmup discards samples
and does not block the synchronization, and specify the exact signal (e.g.,
generator done channel or WaitGroup) that callers must wait on before calling
buildReadReceiptInputs(targetRPS, hold, collector).
- Around line 93-95: Update the spec to reflect the actual CLI contract: replace
references to the non-existent `seed-read-receipt` subcommand with the real
invocation `loadgen seed --workload read-receipt` and show the same parameters
(`--preset`, `--seed`, `--read-ratio`) passed to `loadgen seed`; specifically
edit the sections that describe the command (currently mentioning
`seed-read-receipt`) so they present the correct command shape (e.g., `loadgen
seed --workload read-receipt --preset ... --seed ... --read-ratio ...`) and any
examples or docs that repeat `seed-read-receipt` to avoid operator confusion.
- Line 77: Remove the dangling Reset() references from the collector spec:
update the descriptions for the readreceipt_collector and history_collector to
omit "Reset()"-able and instead state the collector provides an in-memory
latency tape, timeout/reply-error/bad-reply/saturation counters and is
thread-safe (mutex); ensure any other occurrences (e.g., the second mention
around line 153) are likewise cleaned up and optionally add a short note that
Reset() will be documented when/if a Reset API is actually introduced.
In `@tools/loadgen/history_main.go`:
- Around line 82-84: The validation for the command-line flag readRatio
incorrectly allows NaN because flag.Float64 parses "NaN" and the current check
(readRatio <= 0 || readRatio > 1) doesn't catch it; update the validation in the
same block that references readRatio (before calling SeedReadReceiptState) to
explicitly reject NaN (use math.IsNaN(readRatio)) and treat it as invalid,
printing the same error message "--read-ratio must be in (0, 1]" and returning
2.
In `@tools/loadgen/maxrps_readreceipt.go`:
- Around line 71-76: In the cleanup closure, don't ignore errors from
srv.Shutdown and nc.Drain; capture each return value (e.g., err :=
srv.Shutdown(shutCtx) and err := nc.Drain()) and log a warning or error with
context (for example "server shutdown failed" and "nats drain failed") including
the error value so teardown failures are visible; keep the existing context
cancel call and ensure logging happens before returning from the cleanup func.
- Around line 112-119: The goroutine currently discards the error returned by
gen.Run(genCtx), causing failures to be hidden; change it to capture the result
(e.g., send the error to a buffered channel or store it in a variable
synchronized with the WaitGroup) and after wg.Wait() check that error and return
it instead of the nil from waitOrCancel; ensure you still call cancel() and
wait, and return the gen.Run error if present (whether it finished before or
after waitOrCancel). Reference: gen.Run, genCtx, waitOrCancel, cancel, wg.
In `@tools/loadgen/README.md`:
- Around line 320-325: The README example is misleading because
runTeardownHistory derives room IDs from the preset+seed, so teardown must be
run with the same --seed used to create the fixtures; update the docs to state
this requirement, mention that failing to pass the original --seed will leave
Valkey room keys behind, and change the example for loadgen teardown
--workload=history --preset=history-medium to include a matching --seed (e.g.,
--seed=1234) and a short note referencing runTeardownHistory so readers know why
the seed must match.
In `@tools/loadgen/readreceipt_integration_test.go`:
- Around line 78-84: Replace the fixed time.Sleep by waiting on a deterministic
signal from the generator/responder: modify the test to create a done channel
(or use a sync.WaitGroup) that the stub responder closes or signals when it has
delivered all trailing replies, then replace time.Sleep(500*time.Millisecond)
with a select that waits on that done channel (or WaitGroup completion) or the
runCtx.Done() for timeout; update the stub responder (or generator) to signal
the done channel when it finishes emitting replies so the test uses
gen.Run(runCtx) plus a synchronous wait on that signal before asserting on
collector.Samples() and collector.Failed().
In `@tools/loadgen/readreceipt_seed.go`:
- Around line 69-77: byRoom is a map so iterating over it uses a
nondeterministic order and consumes the shared RNG (rng) unpredictably; to
restore deterministic behavior for SeedReadReceiptState, collect the room IDs
from byRoom, sort them (e.g., lexicographically), and iterate over the sorted
slice when generating lastSeen and calling selectReaders so the same rng
sequence is consumed in a stable order for every run.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d0270cc3-2bf2-4650-a477-4f67c93bdacb
📒 Files selected for processing (16)
docs/superpowers/plans/2026-06-02-loadgen-read-receipt-maxrps.mddocs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.mdtools/loadgen/README.mdtools/loadgen/history_main.gotools/loadgen/main.gotools/loadgen/maxrps.gotools/loadgen/maxrps_readreceipt.gotools/loadgen/maxrps_readreceipt_test.gotools/loadgen/readreceipt_collector.gotools/loadgen/readreceipt_collector_test.gotools/loadgen/readreceipt_generator.gotools/loadgen/readreceipt_generator_test.gotools/loadgen/readreceipt_integration_test.gotools/loadgen/readreceipt_requester.gotools/loadgen/readreceipt_seed.gotools/loadgen/readreceipt_seed_test.go
| func (w *readReceiptWorkload) RunStep(ctx context.Context, targetRPS int, warmup, hold time.Duration) (rpsStepInputs, error) { | ||
| if warmup > 0 { | ||
| warmCollector := NewReadReceiptCollector() | ||
| if err := runReadReceiptFor(ctx, w.newGenerator(warmCollector, targetRPS), warmup); err != nil { | ||
| return rpsStepInputs{}, err | ||
| } | ||
| } | ||
| collector := NewReadReceiptCollector() | ||
| if err := runReadReceiptFor(ctx, w.newGenerator(collector, targetRPS), hold); err != nil { | ||
| return rpsStepInputs{}, err | ||
| } | ||
| time.Sleep(2 * time.Second) // drain trailing in-flight replies into the collector | ||
| return buildReadReceiptInputs(targetRPS, hold, collector), nil | ||
| } |
There was a problem hiding this comment.
Replace sleep-based draining with explicit synchronization in the prescribed RunStep flow.
Line 711 currently instructs time.Sleep(2 * time.Second) to drain replies. That pattern is brittle and violates the project’s concurrency rule; make the plan require deterministic completion signaling (e.g., wait on generator-owned sync primitives) instead.
As per coding guidelines: "Never use time.Sleep for goroutine synchronization — use proper sync primitives (channels, sync.WaitGroup, sync.Mutex)".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/plans/2026-06-02-loadgen-read-receipt-maxrps.md` around
lines 700 - 713, The RunStep method in readReceiptWorkload uses time.Sleep to
"drain" in-flight replies; replace this with explicit synchronization by having
the generator created by w.newGenerator provide a deterministic completion
signal (for example a done channel or sync.WaitGroup) and waiting on that signal
instead of time.Sleep. Concretely: update w.newGenerator/runReadReceiptFor
contract so runReadReceiptFor returns (or exposes) a completion indicator tied
to the generator instance (e.g., a done chan struct{} or *sync.WaitGroup) that
is closed/marked when all reply goroutines are finished; then in RunStep (the
function shown) remove time.Sleep(2 * time.Second) and wait on that completion
signal before calling buildReadReceiptInputs with the collector; ensure
NewReadReceiptCollector and collector usage remain unchanged but are only read
after the generator signals completion.
| |------|---------|----------------| | ||
| | `tools/loadgen/maxrps_readreceipt.go` | `maxrps_history.go` | `readReceiptWorkload` implementing `rpsWorkload`. `newReadReceiptWorkload` wires NATS, the metrics HTTP server, the requester, and derives targets from `BuildHistoryFixtures`. `RunStep` runs warmup (discarded) then hold (measured) and returns `rpsStepInputs`. `Label()` returns `"read-receipt"`. | | ||
| | `tools/loadgen/readreceipt_generator.go` | `history_generator.go` | `ReadReceiptGenerator` with a `Rate` ticker and a `MaxInFlight` semaphore. Each tick picks a random target, issues the request via the requester, and records the result in the collector. Saturation (pool full on tick) is recorded, not dropped silently. | | ||
| | `tools/loadgen/readreceipt_collector.go` | `history_collector.go` | In-memory latency tape plus `timeout`, `reply-error`, `bad-reply`, and `saturation` counters. Thread-safe (mutex), `Reset()`-able. | |
There was a problem hiding this comment.
Remove Reset() references from collector behavior unless that API is actually added.
Line 77 and Line 153 describe a Reset()-able collector, but the implemented collector contract does not include Reset(). Align the spec to the real API to avoid misleading test/usage expectations.
Also applies to: 153-153
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md` at
line 77, Remove the dangling Reset() references from the collector spec: update
the descriptions for the readreceipt_collector and history_collector to omit
"Reset()"-able and instead state the collector provides an in-memory latency
tape, timeout/reply-error/bad-reply/saturation counters and is thread-safe
(mutex); ensure any other occurrences (e.g., the second mention around line 153)
are likewise cleaned up and optionally add a short note that Reset() will be
documented when/if a Reset API is actually introduced.
| - `main.go`: add a `seed-read-receipt` subcommand that runs the full history | ||
| seed (`Seed`, `SeedRoomKeys`, `SeedThreadRooms`, `SeedHistoryCassandra`) then | ||
| `SeedReadReceiptState`, parameterized by `--preset`, `--seed`, `--read-ratio`. |
There was a problem hiding this comment.
Fix the seed command contract: this spec documents a non-existent CLI shape.
Lines 93–95 and Line 101 describe seed-read-receipt, but this PR’s contract is loadgen seed --workload read-receipt .... Keeping the wrong command here will cause immediate operator error.
Also applies to: 101-101
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md`
around lines 93 - 95, Update the spec to reflect the actual CLI contract:
replace references to the non-existent `seed-read-receipt` subcommand with the
real invocation `loadgen seed --workload read-receipt` and show the same
parameters (`--preset`, `--seed`, `--read-ratio`) passed to `loadgen seed`;
specifically edit the sections that describe the command (currently mentioning
`seed-read-receipt`) so they present the correct command shape (e.g., `loadgen
seed --workload read-receipt --preset ... --seed ... --read-ratio ...`) and any
examples or docs that repeat `seed-read-receipt` to avoid operator confusion.
| Mirrors `historyWorkload.RunStep`: run a fresh generator for `warmup` against a | ||
| throwaway collector (samples discarded), then run a fresh generator for `hold` | ||
| against the measured collector, sleep briefly to drain trailing in-flight | ||
| replies, and return `buildReadReceiptInputs(targetRPS, hold, collector)`. |
There was a problem hiding this comment.
Avoid prescribing sleep-based draining in RunStep behavior.
The “sleep briefly to drain trailing replies” guidance should be replaced with explicit synchronization semantics; sleep-based coordination is nondeterministic and policy-inconsistent.
As per coding guidelines: "Never use time.Sleep for goroutine synchronization — use proper sync primitives (channels, sync.WaitGroup, sync.Mutex)".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md`
around lines 125 - 128, The RunStep description currently tells implementers to
"sleep briefly to drain trailing in-flight replies" — replace that with explicit
synchronization: when mirroring historyWorkload.RunStep (functions/methods
referenced: RunStep, buildReadReceiptInputs, warmup, hold, collector) require
the warmup and hold generators to run as distinct goroutines that signal
completion via channels or a sync.WaitGroup and ensure the measured collector
drains by waiting for a done signal from the generator/collector rather than
sleeping; document that the throwaway collector used for warmup discards samples
and does not block the synchronization, and specify the exact signal (e.g.,
generator done channel or WaitGroup) that callers must wait on before calling
buildReadReceiptInputs(targetRPS, hold, collector).
| if readRatio <= 0 || readRatio > 1 { | ||
| fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]") | ||
| return 2 |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Does Go's flag package accept "NaN" for a float64 flag, and do comparisons like x <= 0 || x > 1 evaluate to false when x is NaN?
💡 Result:
Yes. The standard library flag package’s float64 flag ultimately uses strconv.ParseFloat to parse the flag value, and strconv.ParseFloat recognizes the string "NaN" (case-insensitive) as a NaN float64. [1][2] For comparisons: in Go, any comparison operation (<, <=, >, >=, ==,!=) involving a NaN produces results that follow IEEE 754 semantics: x <= 0 is false when x is NaN, and x > 1 is also false when x is NaN; therefore the boolean expression (x <= 0 || x > 1) evaluates to false when x is NaN. This is explicitly tested by Go’s float comparison test suite (nan <= f, nan > f, and nan <= nan are all false). [3] Note:!= is the only “true” comparison when comparing NaN to anything (including itself), consistent with IEEE 754. [3]
Citations:
- 1: https://github.com/golang/go/blob/996b9850/src/flag/flag.go
- 2: https://pkg.go.dev/strconv
- 3: https://go.googlesource.com/go/+/2af974777d8e771e5c58f172876ac4c6d9fb7256/test/floatcmp.go
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n "readRatio|read-ratio|SeedReadReceiptState|runSeedReadReceipt" tools/loadgen/history_main.go tools/loadgen/main.go || true
echo "---- history_main.go (around guard) ----"
sed -n '60,110p' tools/loadgen/history_main.go
echo "---- history_main.go (around runSeedReadReceipt) ----"
sed -n '110,190p' tools/loadgen/history_main.go
echo "---- main.go (flag wiring) ----"
sed -n '1,220p' tools/loadgen/main.goRepository: hmchangw/chat
Length of output: 12596
Reject NaN for --read-ratio.
flag.Float64 accepts "NaN", and readRatio <= 0 || readRatio > 1 evaluates to false when readRatio is NaN, so an invalid value can reach SeedReadReceiptState.
Suggested fix
- if readRatio <= 0 || readRatio > 1 {
+ if !(readRatio > 0 && readRatio <= 1) {
fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]")
return 2
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if readRatio <= 0 || readRatio > 1 { | |
| fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]") | |
| return 2 | |
| if !(readRatio > 0 && readRatio <= 1) { | |
| fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]") | |
| return 2 | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/history_main.go` around lines 82 - 84, The validation for the
command-line flag readRatio incorrectly allows NaN because flag.Float64 parses
"NaN" and the current check (readRatio <= 0 || readRatio > 1) doesn't catch it;
update the validation in the same block that references readRatio (before
calling SeedReadReceiptState) to explicitly reject NaN (use
math.IsNaN(readRatio)) and treat it as invalid, printing the same error message
"--read-ratio must be in (0, 1]" and returning 2.
| cleanup := func() { | ||
| shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| _ = srv.Shutdown(shutCtx) | ||
| cancel() | ||
| _ = nc.Drain() | ||
| } |
There was a problem hiding this comment.
Log teardown failures instead of discarding them.
Lines 73 and 75 silently drop Shutdown/Drain errors, which hides failed listener teardown or NATS drain problems during loadgen cleanup. At least warn with context so the run output reflects partial shutdowns.
💡 Suggested fix
cleanup := func() {
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- _ = srv.Shutdown(shutCtx)
+ if err := srv.Shutdown(shutCtx); err != nil {
+ slog.Warn("shutdown metrics server", "error", err)
+ }
cancel()
- _ = nc.Drain()
+ if err := nc.Drain(); err != nil {
+ slog.Warn("drain nats connection", "error", err)
+ }
}As per coding guidelines: "Never ignore errors silently — comment if intentionally discarded".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| cleanup := func() { | |
| shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| _ = srv.Shutdown(shutCtx) | |
| cancel() | |
| _ = nc.Drain() | |
| } | |
| cleanup := func() { | |
| shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| if err := srv.Shutdown(shutCtx); err != nil { | |
| slog.Warn("shutdown metrics server", "error", err) | |
| } | |
| cancel() | |
| if err := nc.Drain(); err != nil { | |
| slog.Warn("drain nats connection", "error", err) | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/maxrps_readreceipt.go` around lines 71 - 76, In the cleanup
closure, don't ignore errors from srv.Shutdown and nc.Drain; capture each return
value (e.g., err := srv.Shutdown(shutCtx) and err := nc.Drain()) and log a
warning or error with context (for example "server shutdown failed" and "nats
drain failed") including the error value so teardown failures are visible; keep
the existing context cancel call and ensure logging happens before returning
from the cleanup func.
| go func() { | ||
| defer wg.Done() | ||
| _ = gen.Run(genCtx) | ||
| }() | ||
| err := waitOrCancel(ctx, d) | ||
| cancel() | ||
| wg.Wait() | ||
| return err |
There was a problem hiding this comment.
Propagate gen.Run failures instead of dropping them.
Line 114 discards the generator error, so an early requester/setup failure turns into a full-duration wait and a misleadingly successful step result. Capture the goroutine result and return it when gen.Run exits before or after cancellation.
💡 Suggested fix
func runReadReceiptFor(ctx context.Context, gen *ReadReceiptGenerator, d time.Duration) error {
genCtx, cancel := context.WithCancel(ctx)
- var wg sync.WaitGroup
- wg.Add(1)
+ defer cancel()
+ errCh := make(chan error, 1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
defer wg.Done()
- _ = gen.Run(genCtx)
+ errCh <- gen.Run(genCtx)
}()
- err := waitOrCancel(ctx, d)
- cancel()
- wg.Wait()
- return err
+ timer := time.NewTimer(d)
+ defer timer.Stop()
+
+ select {
+ case err := <-errCh:
+ cancel()
+ wg.Wait()
+ return err
+ case <-ctx.Done():
+ cancel()
+ wg.Wait()
+ return ctx.Err()
+ case <-timer.C:
+ cancel()
+ wg.Wait()
+ return <-errCh
+ }
}As per coding guidelines: "Never ignore errors silently — comment if intentionally discarded".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/maxrps_readreceipt.go` around lines 112 - 119, The goroutine
currently discards the error returned by gen.Run(genCtx), causing failures to be
hidden; change it to capture the result (e.g., send the error to a buffered
channel or store it in a variable synchronized with the WaitGroup) and after
wg.Wait() check that error and return it instead of the nil from waitOrCancel;
ensure you still call cancel() and wait, and return the gen.Run error if present
(whether it finished before or after waitOrCancel). Reference: gen.Run, genCtx,
waitOrCancel, cancel, wg.
| To tear down, use the history teardown — read-receipt seeds the identical | ||
| history fixtures, so `loadgen teardown --workload=history --preset=<name>` drops | ||
| everything (dropping `subscriptions` removes the stamped `lastSeenAt` too): | ||
|
|
||
| ```bash | ||
| loadgen teardown --workload=history --preset=history-medium |
There was a problem hiding this comment.
Mention that teardown must reuse the seed.
This example implies any seed works, but runTeardownHistory derives room IDs from the preset+seed before deleting Valkey room keys. If someone seeded read-receipt data with a non-default seed, this command will drop Mongo/Cassandra state but leave the corresponding room keys behind unless the same --seed is passed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/README.md` around lines 320 - 325, The README example is
misleading because runTeardownHistory derives room IDs from the preset+seed, so
teardown must be run with the same --seed used to create the fixtures; update
the docs to state this requirement, mention that failing to pass the original
--seed will leave Valkey room keys behind, and change the example for loadgen
teardown --workload=history --preset=history-medium to include a matching --seed
(e.g., --seed=1234) and a short note referencing runTeardownHistory so readers
know why the seed must match.
| runCtx, cancel := context.WithTimeout(ctx, 2*time.Second) | ||
| defer cancel() | ||
| require.NoError(t, gen.Run(runCtx)) | ||
| time.Sleep(500 * time.Millisecond) // drain trailing replies | ||
|
|
||
| assert.NotEmpty(t, collector.Samples(), "generator produced zero samples") | ||
| assert.Equal(t, 0, collector.Failed(), "stub responder should yield zero failures") |
There was a problem hiding this comment.
Replace the fixed sleep with a real synchronization signal.
time.Sleep(500 * time.Millisecond) makes this test timing-dependent: slower CI can still observe the collector too early, while faster runs just pay an unnecessary delay. Wait on a concrete condition from the responder/generator instead of sleeping for an arbitrary interval. As per coding guidelines, "Never use time.Sleep for goroutine synchronization — use proper sync primitives (channels, sync.WaitGroup, sync.Mutex)."
💡 One way to make the assertion deterministic
runCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
require.NoError(t, gen.Run(runCtx))
- time.Sleep(500 * time.Millisecond) // drain trailing replies
+ require.Eventually(t, func() bool {
+ return len(collector.Samples()) > 0 || collector.Failed() > 0
+ }, time.Second, 10*time.Millisecond)
assert.NotEmpty(t, collector.Samples(), "generator produced zero samples")
assert.Equal(t, 0, collector.Failed(), "stub responder should yield zero failures")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| runCtx, cancel := context.WithTimeout(ctx, 2*time.Second) | |
| defer cancel() | |
| require.NoError(t, gen.Run(runCtx)) | |
| time.Sleep(500 * time.Millisecond) // drain trailing replies | |
| assert.NotEmpty(t, collector.Samples(), "generator produced zero samples") | |
| assert.Equal(t, 0, collector.Failed(), "stub responder should yield zero failures") | |
| runCtx, cancel := context.WithTimeout(ctx, 2*time.Second) | |
| defer cancel() | |
| require.NoError(t, gen.Run(runCtx)) | |
| require.Eventually(t, func() bool { | |
| return len(collector.Samples()) > 0 || collector.Failed() > 0 | |
| }, time.Second, 10*time.Millisecond) | |
| assert.NotEmpty(t, collector.Samples(), "generator produced zero samples") | |
| assert.Equal(t, 0, collector.Failed(), "stub responder should yield zero failures") |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/readreceipt_integration_test.go` around lines 78 - 84, Replace
the fixed time.Sleep by waiting on a deterministic signal from the
generator/responder: modify the test to create a done channel (or use a
sync.WaitGroup) that the stub responder closes or signals when it has delivered
all trailing replies, then replace time.Sleep(500*time.Millisecond) with a
select that waits on that done channel (or WaitGroup completion) or the
runCtx.Done() for timeout; update the stub responder (or generator) to signal
the done channel when it finishes emitting replies so the test uses
gen.Run(runCtx) plus a synchronous wait on that signal before asserting on
collector.Samples() and collector.Failed().
| rng := rand.New(rand.NewSource(seed)) | ||
| coll := db.Collection("subscriptions") | ||
| for roomID, roomSubs := range byRoom { | ||
| floor, ok := latest[roomID] | ||
| if !ok { | ||
| continue // room has no top-level messages — nothing to read | ||
| } | ||
| lastSeen := floor.Add(time.Millisecond).UTC() | ||
| chosen := selectReaders(roomSubs, readRatio, rng) |
There was a problem hiding this comment.
Sort rooms before consuming the seeded RNG.
byRoom is a Go map, so this loop runs in nondeterministic order. Because every room pulls from the same rng, the chosen readers change across runs even with the same seed, which breaks the deterministic seeding behavior promised by SeedReadReceiptState.
💡 Proposed fix
import (
"context"
"fmt"
"math"
"math/rand"
+ "sort"
"time"
@@
rng := rand.New(rand.NewSource(seed))
coll := db.Collection("subscriptions")
- for roomID, roomSubs := range byRoom {
+ roomIDs := make([]string, 0, len(byRoom))
+ for roomID := range byRoom {
+ roomIDs = append(roomIDs, roomID)
+ }
+ sort.Strings(roomIDs)
+ for _, roomID := range roomIDs {
+ roomSubs := byRoom[roomID]
floor, ok := latest[roomID]
if !ok {
continue // room has no top-level messages — nothing to read
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| rng := rand.New(rand.NewSource(seed)) | |
| coll := db.Collection("subscriptions") | |
| for roomID, roomSubs := range byRoom { | |
| floor, ok := latest[roomID] | |
| if !ok { | |
| continue // room has no top-level messages — nothing to read | |
| } | |
| lastSeen := floor.Add(time.Millisecond).UTC() | |
| chosen := selectReaders(roomSubs, readRatio, rng) | |
| rng := rand.New(rand.NewSource(seed)) | |
| coll := db.Collection("subscriptions") | |
| roomIDs := make([]string, 0, len(byRoom)) | |
| for roomID := range byRoom { | |
| roomIDs = append(roomIDs, roomID) | |
| } | |
| sort.Strings(roomIDs) | |
| for _, roomID := range roomIDs { | |
| roomSubs := byRoom[roomID] | |
| floor, ok := latest[roomID] | |
| if !ok { | |
| continue // room has no top-level messages — nothing to read | |
| } | |
| lastSeen := floor.Add(time.Millisecond).UTC() | |
| chosen := selectReaders(roomSubs, readRatio, rng) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tools/loadgen/readreceipt_seed.go` around lines 69 - 77, byRoom is a map so
iterating over it uses a nondeterministic order and consumes the shared RNG
(rng) unpredictably; to restore deterministic behavior for SeedReadReceiptState,
collect the room IDs from byRoom, sort them (e.g., lexicographically), and
iterate over the sorted slice when generating lastSeen and calling selectReaders
so the same rng sequence is consumed in a stable order for every run.
Resolve conflicts between the read-receipt max-rps workload and the room-read workload + bottleneck-attribution work from main: - main.go / maxrps.go: keep both read-receipt and room-read workloads in the seed and max-rps dispatch, flag help, and default-steps list. - README.md: keep both the read-receipt section and main's bottleneck-attribution + daily-IM guide additions. - Adapt read-receipt seed/workload to main's refactored HistoryFixtures API: SeedThreadRooms/SeedHistoryCassandra now take *HistoryFixtures (SeedHistoryCassandra returns msgCount), and the in-memory plan is materialized via res.FullPlan() instead of the removed res.Plan field.
Resolve README.md conflict: keep main's new INCONCLUSIVE reasons (emit-underrun vs saturation columns + rate-pacing note) ahead of the read-receipt workload subsection. main.go auto-merged cleanly with both the read-receipt and room-read workloads present.
Summary
Adds a
read-receiptworkload to the loadgenmax-rpsramp command. It drives the room-service read-receipt RPC (chat.user.{account}.request.room.{roomID}.{siteID}.message.read-receipt) — a synchronous request/reply read ("who has read message X") — at increasing RPS steps to find the maximum sustainable rate under the latency/error SLOs.historyworkload (synchronous read, no JetStream consumer): plugs into therpsWorkloadinterface and reuses the ramp engine, verdict logic, and report rendering unchanged.msgSender == requesterAccountguard.seed --workload=read-receipt --read-ratio=0.7step stampslastSeenAton a configurable fraction of each room's subscribers soListReadReceiptsexercises its real$match/$lookup/$unwindpath instead of short-circuiting on an empty match.--slo-pending-growthis ignored (no consumer);--request-timeoutsets the per-request timeout.New files:
readreceipt_collector.go,readreceipt_requester.go,readreceipt_generator.go,maxrps_readreceipt.go,readreceipt_seed.go(+ unit tests and an integration test). Wiring inmaxrps.go,main.go,history_main.go. Docs intools/loadgen/README.md. Design + plan underdocs/superpowers/.Test Plan
make test SERVICE=tools/loadgen— unit suite passes (race detector)make lint— 0 issuesmake fmt— no changesmake test-integration SERVICE=tools/loadgen—TestReadReceiptWorkload_EndToEnd(compiles +go vet -tags integrationclean; runs in CI — Docker unavailable in the dev environment)loadgen seed --workload=read-receipt --preset=history-medium --read-ratio=0.7thenloadgen max-rps --workload=read-receipt --preset=history-medium --steps=200,500,1k,2k,5khttps://claude.ai/code/session_019TuJE9Y5nbsTMnXdycLiWo
Generated by Claude Code
Summary by CodeRabbit
New Features
read-receiptworkload option toloadgen max-rpsfor capacity testing--read-ratioflag to configure the fraction of subscribers marked as readers during workload preparationread-receiptseeding support vialoadgen seedcommandDocumentation