1,387 changes: 1,387 additions & 0 deletions docs/superpowers/plans/2026-06-02-loadgen-read-receipt-maxrps.md

Large diffs are not rendered by default.

@@ -0,0 +1,188 @@
# Loadgen `max-rps` read-receipt workload

**Date:** 2026-06-02
**Status:** Approved (brainstorming) — pending implementation plan

## Summary

Add a `read-receipt` workload to the loadgen `max-rps` ramp command. The workload
drives the room-service read-receipt RPC (`chat.user.{account}.request.room.{roomID}.{siteID}.message.read-receipt`)
at increasing RPS steps and reports the maximum sustainable rate under the
configured latency/error SLOs.

Read receipts are a synchronous NATS request/reply read ("who has read message
X"), so the existing `history` workload — also a synchronous request/reply read
with no JetStream consumer — is the template. The new workload plugs into the
existing `rpsWorkload` interface, reusing the ramp engine, verdict logic, and
report rendering unchanged.

## Background

The read-receipt handler (`room-service/handler.go`, `handleMessageReadReceipt`)
enforces the following before returning the reader list:

1. The requester is subscribed to the room (`GetSubscription`).
2. The target message exists (`msgReader.GetMessageRoomAndCreatedAt`, reads
Cassandra `messages_by_id`).
3. The message belongs to the subject's room.
4. **The requester is the message's sender** (`msgSender == requesterAccount`),
else `errNotMessageSender`.
5. `ListReadReceipts(roomID, since=msgCreatedAt, excludeAccount=sender, limit)`
runs a Mongo aggregate: `$match {roomId, lastSeenAt >= since, u.account != sender}`
→ `$lookup` users → `$unwind` → `$replaceWith` → `$limit`.

Therefore a valid load target is a tuple `(senderAccount, roomID, messageID)`
where the message exists in Cassandra and the sender has a Mongo subscription.

**Key realism constraint:** history-seeded subscriptions set no `lastSeenAt`
(it is `*time.Time` with `omitempty`). Without it, the `ListReadReceipts`
`$match` matches zero documents and short-circuits before the `$lookup`/`$unwind`,
making the query artificially cheap. The workload must seed `lastSeenAt` on a
configurable fraction of subscribers to exercise the real query path.

## Decisions (from brainstorming)

- **Integration:** new `--workload read-receipt` adapter for the existing
`max-rps` ramp command. Not a standalone sustained generator.
- **Fixtures:** reuse the history seed. `BuildHistoryFixtures` + `BuiltinHistoryPreset`
produce users/rooms/subscriptions/messages; read-receipt targets are derived
from `plan.Messages`.
- **Reader seeding:** tunable `--read-ratio` (default `0.7`). Stamp `lastSeenAt`
on that fraction of each room's non-sender subscribers so the query returns
realistic fan-out.
- **Targets:** top-level messages only (`ThreadParentID == ""`).
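
The target rule above can be illustrated with a minimal sketch. `readReceiptTarget` and its fields follow the spec; `plannedMessage` and `deriveTargets` are hypothetical stand-ins, since the real shape of `plan.Messages` is not shown here.

```go
package main

// plannedMessage is a hypothetical stand-in for one entry of plan.Messages;
// only the fields the derivation needs are shown.
type plannedMessage struct {
	ID             string
	RoomID         string
	Sender         string // account that sent the message
	ThreadParentID string // empty for top-level messages
}

// readReceiptTarget is one valid load target: the sender asks who has read
// their own message.
type readReceiptTarget struct {
	Account   string
	RoomID    string
	MessageID string
}

// deriveTargets keeps top-level messages only and skips thread replies.
func deriveTargets(msgs []plannedMessage) []readReceiptTarget {
	targets := make([]readReceiptTarget, 0, len(msgs))
	for _, m := range msgs {
		if m.ThreadParentID != "" {
			continue // thread reply: out of scope for this workload
		}
		targets = append(targets, readReceiptTarget{
			Account:   m.Sender,
			RoomID:    m.RoomID,
			MessageID: m.ID,
		})
	}
	return targets
}
```

Using the sender as `Account` reflects the handler's `msgSender == requesterAccount` check described in the Background section.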

## Architecture

The `max-rps` command already owns a pluggable workload seam:

```go
type rpsWorkload interface {
	RunStep(ctx context.Context, targetRPS int, warmup, hold time.Duration) (rpsStepInputs, error)
	Label() string
}
```

The ramp engine (`ramp.go`), verdict evaluation (`verdict.go`), normalized
step inputs (`rpsStepInputs`), and report rendering (`maxrps_report.go`) are
reused as-is. The read-receipt workload only supplies a new adapter plus its
generator/collector/requester, mirroring the history workload's file layout.

### New files (each mirrors a history counterpart)

| File | Mirrors | Responsibility |
|------|---------|----------------|
| `tools/loadgen/maxrps_readreceipt.go` | `maxrps_history.go` | `readReceiptWorkload` implementing `rpsWorkload`. `newReadReceiptWorkload` wires NATS, the metrics HTTP server, the requester, and derives targets from `BuildHistoryFixtures`. `RunStep` runs warmup (discarded) then hold (measured) and returns `rpsStepInputs`. `Label()` returns `"read-receipt"`. |
| `tools/loadgen/readreceipt_generator.go` | `history_generator.go` | `ReadReceiptGenerator` with a `Rate` ticker and a `MaxInFlight` semaphore. Each tick picks a random target, issues the request via the requester, and records the result in the collector. Saturation (pool full on tick) is recorded, not dropped silently. |
| `tools/loadgen/readreceipt_collector.go` | `history_collector.go` | In-memory latency tape plus `timeout`, `reply-error`, `bad-reply`, and `saturation` counters. Thread-safe (mutex), `Reset()`-able. |

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove `Reset()` references from collector behavior unless that API is actually added.

Line 77 and Line 153 describe a `Reset()`-able collector, but the implemented collector contract does not include `Reset()`. Align the spec to the real API to avoid misleading test/usage expectations.

Also applies to: 153-153

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md` at
line 77, Remove the dangling Reset() references from the collector spec: update
the descriptions for the readreceipt_collector and history_collector to omit
"Reset()"-able and instead state the collector provides an in-memory latency
tape, timeout/reply-error/bad-reply/saturation counters and is thread-safe
(mutex); ensure any other occurrences (e.g., the second mention around line 153)
are likewise cleaned up and optionally add a short note that Reset() will be
documented when/if a Reset API is actually introduced.

| `tools/loadgen/readreceipt_requester.go` | history requester (`newNATSHistoryRequester`) | `ReadReceiptRequester` interface + `newNATSReadReceiptRequester(nc)`. Builds the subject via `subject.MessageReadReceipt(account, roomID, siteID)`, marshals `model.ReadReceiptRequest`, calls `nc.Request(...)` with the per-request timeout, and classifies the reply (success / reply-error / bad-reply / timeout). |
| `tools/loadgen/readreceipt_seed.go` | `history_seed.go` | `SeedReadReceiptState(ctx, db, plan, readRatio, seed)`: stamps `lastSeenAt` on a deterministic `readRatio` sample of each room's non-sender subscribers. |
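
The generator row describes a rate ticker combined with a `MaxInFlight` semaphore. A minimal sketch of that pattern follows, assuming a buffered channel as the semaphore; `inFlightLimiter`, `tryDispatch`, and `run` are hypothetical names, not the PR's actual types.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// inFlightLimiter caps concurrent requests and counts ticks that found the
// pool full.
type inFlightLimiter struct {
	sem       chan struct{}
	wg        sync.WaitGroup
	saturated atomic.Int64
}

func newInFlightLimiter(maxInFlight int) *inFlightLimiter {
	return &inFlightLimiter{sem: make(chan struct{}, maxInFlight)}
}

// tryDispatch runs do in a goroutine if a slot is free. If the pool is full
// it records saturation and returns false without blocking the caller.
func (l *inFlightLimiter) tryDispatch(do func()) bool {
	select {
	case l.sem <- struct{}{}:
	default:
		l.saturated.Add(1)
		return false
	}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.sem }() // release the slot before signalling Done
		do()
	}()
	return true
}

// run ticks `rate` times per second (rate must be > 0) until ctx is done,
// then waits for in-flight requests to finish.
func (l *inFlightLimiter) run(ctx context.Context, rate int, do func()) {
	ticker := time.NewTicker(time.Second / time.Duration(rate))
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			l.wg.Wait()
			return
		case <-ticker.C:
			l.tryDispatch(do)
		}
	}
}
```

The non-blocking `select` is what keeps the ticker cadence honest: a full pool shows up as a saturation count instead of a silently reduced request rate.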

### Reused unchanged

- `BuildHistoryFixtures`, `BuiltinHistoryPreset` (`history.go`, `preset.go`).
- `Seed` (`seed.go`) — already creates `users`, `rooms`, `subscriptions`
collections with the `roomId` and `u.account` indexes the RPC needs.
- `ramp.go`, `verdict.go`, `maxrps_report.go`, `rpsStepInputs`.

### Wiring in existing files

- `maxrps.go`: add `case "read-receipt"` to the `runMaxRPS` workload switch,
constructing `newReadReceiptWorkload`. Reuse the existing `--request-timeout`
flag for the per-request timeout (currently labelled history-only).
- `main.go`: add a `seed-read-receipt` subcommand that runs the full history
seed (`Seed`, `SeedRoomKeys`, `SeedThreadRooms`, `SeedHistoryCassandra`) then
`SeedReadReceiptState`, parameterized by `--preset`, `--seed`, `--read-ratio`.
Comment on lines +93 to +95

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix the seed command contract: this spec documents a non-existent CLI shape.

Lines 93–95 and Line 101 describe `seed-read-receipt`, but this PR’s contract is `loadgen seed --workload read-receipt ...`. Keeping the wrong command here will cause immediate operator error.

Also applies to: 101-101

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md`
around lines 93 - 95, Update the spec to reflect the actual CLI contract:
replace references to the non-existent `seed-read-receipt` subcommand with the
real invocation `loadgen seed --workload read-receipt` and show the same
parameters (`--preset`, `--seed`, `--read-ratio`) passed to `loadgen seed`;
specifically edit the sections that describe the command (currently mentioning
`seed-read-receipt`) so they present the correct command shape (e.g., `loadgen
seed --workload read-receipt --preset ... --seed ... --read-ratio ...`) and any
examples or docs that repeat `seed-read-receipt` to avoid operator confusion.

- `maxrps.go` `defaultSteps`: add a `read-receipt` branch returning
`"200,500,1000,2000,5000"` (history-like read profile).
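
For illustration, a step list such as the default above could be parsed as follows. `parseSteps` is a hypothetical helper, not the PR's parser; the `k` suffix handling follows the README's `--steps` note (`k` suffix = ×1000).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSteps turns a comma-separated RPS list such as "200,500,1k" into
// integers. It does not check ordering; that is left to the caller.
func parseSteps(s string) ([]int, error) {
	parts := strings.Split(s, ",")
	steps := make([]int, 0, len(parts))
	for _, p := range parts {
		p = strings.TrimSpace(p)
		mult := 1
		if strings.HasSuffix(p, "k") {
			mult = 1000
			p = strings.TrimSuffix(p, "k")
		}
		n, err := strconv.Atoi(p)
		if err != nil || n <= 0 {
			return nil, fmt.Errorf("invalid step %q", p)
		}
		steps = append(steps, n*mult)
	}
	return steps, nil
}
```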

## Data flow

1. **Seed (one-time):** `loadgen seed-read-receipt --preset <hp> --read-ratio 0.7`
- Runs the existing history seed (Mongo users/rooms/subscriptions + Cassandra
messages + room keys + thread_rooms).
- `SeedReadReceiptState` then sets `lastSeenAt = latestTargetCreatedAt + 1ms`
on a deterministic `read-ratio` sample of each room's non-sender subscribers.
2. **Fixtures (at run time):** `BuildHistoryFixtures(preset, seed, siteID, now)`
→ filter `plan.Messages` to `ThreadParentID == ""` →
`[]readReceiptTarget{Account, RoomID, MessageID}`. The same `seed` reproduces
the identities the seed step wrote.
3. **Per step:** the generator fires at `targetRPS`; for each tick it selects a
random target and issues the read-receipt request. The collector tapes E2E
latency on reply and counts hard errors and saturation.
4. **Normalized inputs:** `buildReadReceiptInputs` maps the hold-window collector
to `rpsStepInputs`:
- One latency series named `"read-receipt"`.
- `AttemptedOps = replies + failed`.
- `FailedOps = timeout + reply-error + bad-reply`.
- `Saturation = saturated`.
- `Pending` empty (synchronous read, no JetStream consumer — same as history).
Latency SLO gating (`--slo-p95`/`--slo-p99`) and error-rate gating
(`--slo-error-rate`) apply; `--slo-pending-growth` is ignored (like history).
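
The counter arithmetic in step 4 can be sketched as below. `holdCounts`, `stepCounters`, and `mapCounts` are hypothetical names; only the three formulas come from the spec, and the latency series and empty `Pending` are omitted.

```go
package main

// holdCounts is a hypothetical snapshot of the collector after the hold
// window.
type holdCounts struct {
	Replies     int // successful replies
	Timeouts    int
	ReplyErrors int
	BadReplies  int
	Saturated   int // ticks that found the in-flight pool full
}

// stepCounters holds the three normalized counters the spec defines.
type stepCounters struct {
	AttemptedOps int
	FailedOps    int
	Saturation   int
}

// mapCounts applies the mapping from the data-flow section:
// FailedOps = timeout + reply-error + bad-reply,
// AttemptedOps = replies + failed, Saturation = saturated.
func mapCounts(c holdCounts) stepCounters {
	failed := c.Timeouts + c.ReplyErrors + c.BadReplies
	return stepCounters{
		AttemptedOps: c.Replies + failed,
		FailedOps:    failed,
		Saturation:   c.Saturated,
	}
}
```

For example, 970 replies with 20 timeouts, 7 reply errors, and 3 bad replies give 1000 attempted and 30 failed operations, an error rate of 3%. Saturated ticks are tracked separately and do not count as attempts.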

## RunStep behavior

Mirrors `historyWorkload.RunStep`: run a fresh generator for `warmup` against a
throwaway collector (samples discarded), then run a fresh generator for `hold`
against the measured collector, sleep briefly to drain trailing in-flight
replies, and return `buildReadReceiptInputs(targetRPS, hold, collector)`.
Comment on lines +125 to +128

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid prescribing sleep-based draining in `RunStep` behavior.

The “sleep briefly to drain trailing replies” guidance should be replaced with explicit synchronization semantics; sleep-based coordination is nondeterministic and policy-inconsistent.

As per coding guidelines: "Never use `time.Sleep` for goroutine synchronization — use proper sync primitives (channels, `sync.WaitGroup`, `sync.Mutex`)".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-06-02-loadgen-read-receipt-maxrps-design.md`
around lines 125 - 128, The RunStep description currently tells implementers to
"sleep briefly to drain trailing in-flight replies" — replace that with explicit
synchronization: when mirroring historyWorkload.RunStep (functions/methods
referenced: RunStep, buildReadReceiptInputs, warmup, hold, collector) require
the warmup and hold generators to run as distinct goroutines that signal
completion via channels or a sync.WaitGroup and ensure the measured collector
drains by waiting for a done signal from the generator/collector rather than
sleeping; document that the throwaway collector used for warmup discards samples
and does not block the synchronization, and specify the exact signal (e.g.,
generator done channel or WaitGroup) that callers must wait on before calling
buildReadReceiptInputs(targetRPS, hold, collector).


## Error handling

- Requester classifies each request outcome: success (reply, no `error` field),
reply-error (reply with non-empty `error`), bad-reply (unmarshal failure),
timeout (`nats.ErrTimeout`/context deadline). Each maps to a collector counter.
- Saturation: when the in-flight semaphore is full on a tick, increment the
saturation counter rather than blocking the ticker or dropping silently.
- Seed errors wrap with context (`fmt.Errorf("seed read-receipt state: %w", err)`).
- `--read-ratio` validated to `0 < r <= 1`; invalid returns exit code 2 with a
message, matching the existing flag-validation convention in `runMaxRPS`.

## Testing (TDD — Red/Green/Refactor)

Unit tests (`package main`, `testify`):

- `parseReadRatio` / read-ratio validation: bounds `0 < r <= 1`, rejects `0`,
negatives, `>1`, non-numeric.
- Target derivation: only top-level messages selected; thread replies excluded;
tuple fields populated from the plan.
- `SeedReadReceiptState` sampling: deterministic for a fixed seed; selects the
configured fraction of non-sender subscribers; `lastSeenAt` is after the
target message `createdAt`; senders excluded.
- Collector accounting: latency tape length, per-reason error counts,
saturation count, `Reset()` clears state.
- `buildReadReceiptInputs`: correct mapping to `rpsStepInputs` (single series,
attempted/failed/saturation math, empty `Pending`).
- Generator: honors `Rate` (tick cadence) and records saturation when the
semaphore is full, using a fake `ReadReceiptRequester`.
- Requester reply classification: success / reply-error / bad-reply / timeout
against a fake NATS responder.
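
As a sketch of the first bullet, a read-ratio parser with the listed bounds might look like this. The signature of `parseReadRatio` is an assumption. The positive-form comparison also rejects `NaN`, which `strconv.ParseFloat` accepts as input.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseReadRatio parses s and enforces 0 < r <= 1.
func parseReadRatio(s string) (float64, error) {
	r, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return 0, fmt.Errorf("--read-ratio: %w", err)
	}
	// NaN fails every ordered comparison, so the positive form rejects it,
	// whereas "r <= 0 || r > 1" would let it through.
	if !(r > 0 && r <= 1) {
		return 0, fmt.Errorf("--read-ratio must be in (0, 1], got %v", r)
	}
	return r, nil
}
```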

Integration test (`//go:build integration`, mirrors `history_integration_test.go`):

- Use `testutil.NATS` for the shared NATS URL (the existing history test uses
only NATS plus a canned responder — it exercises the loadgen request/reply
plumbing, not the real handler).
- Subscribe a minimal responder on `subject.MessageReadReceiptWildcard(siteID)`
that returns a canned `model.ReadReceiptResponse`, build the workload, run one
ramp step, and assert the step produced replies (non-empty `read-receipt`
latency series, zero `FailedOps`).
- `SeedReadReceiptState` factors its selection into a pure function
(`selectReaders(subs, readRatio, seed) -> []selected`) covered by unit tests;
the Mongo write itself is thin bulk-update glue.
- `TestMain` drives `testutil.RunTests(m)` per CLAUDE.md.
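
A pure `selectReaders` along the lines described above might be sketched like this. The `sub` type, the `isSender` callback, and rounding to the nearest integer are assumptions; the spec fixes only the inputs (`subs`, `readRatio`, `seed`) and the requirement that selection be deterministic.

```go
package main

import (
	"math"
	"math/rand"
	"sort"
)

// sub is a hypothetical minimal subscription record.
type sub struct {
	RoomID  string
	Account string
}

// selectReaders picks, per room, a deterministic sample of roughly
// readRatio of the non-sender subscribers. readRatio must be in (0, 1].
func selectReaders(subs []sub, isSender func(sub) bool, readRatio float64, seed int64) []sub {
	byRoom := map[string][]sub{}
	for _, s := range subs {
		if !isSender(s) {
			byRoom[s.RoomID] = append(byRoom[s.RoomID], s)
		}
	}
	rooms := make([]string, 0, len(byRoom))
	for id := range byRoom {
		rooms = append(rooms, id)
	}
	sort.Strings(rooms) // map iteration order is random; sort for determinism

	rng := rand.New(rand.NewSource(seed))
	var out []sub
	for _, id := range rooms {
		cands := byRoom[id]
		sort.Slice(cands, func(i, j int) bool { return cands[i].Account < cands[j].Account })
		rng.Shuffle(len(cands), func(i, j int) { cands[i], cands[j] = cands[j], cands[i] })
		k := int(math.Round(readRatio * float64(len(cands))))
		out = append(out, cands[:k]...)
	}
	return out
}
```

Sorting rooms and accounts before shuffling means the same seed yields the same readers regardless of input order.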

## Documentation

- Update `tools/loadgen/README.md`: document `--workload read-receipt`, the
`seed-read-receipt` subcommand, `--read-ratio`, and an example invocation.
- No `docs/client-api.md` change: this is load tooling only; the read-receipt
RPC contract is unchanged.

## Out of scope (YAGNI)

- A standalone sustained-rate read-receipt subcommand (no max-RPS discovery).
- Thread-reply read-receipt targets.
- Prometheus per-reason counters for the verdict (the in-memory collector is the
source of truth, matching history).
- Hot-room / Zipf target distribution (uniform pick, matching history).
52 changes: 48 additions & 4 deletions tools/loadgen/README.md
@@ -221,7 +221,7 @@ list of steps, holds at each step for a measurement window, evaluates SLO
signals, and reports the largest step at which every signal passed.

```bash
-loadgen max-rps --workload=messages|history --preset=<name> [flags]
+loadgen max-rps --workload=messages|history|read-receipt --preset=<name> [flags]
```

### Quick start
@@ -232,6 +232,10 @@ loadgen max-rps --workload=messages --preset=medium --steps=500,1k,2k,5k,10k

# history: per-endpoint SLO, custom p95
loadgen max-rps --workload=history --preset=history-medium --steps=200,500,1k,2k --slo-p95=80ms

# read-receipt: seed reader state first, then ramp
loadgen seed --workload=read-receipt --preset=history-medium --read-ratio=0.7
loadgen max-rps --workload=read-receipt --preset=history-medium --steps=200,500,1k,2k
```

Via the deploy Makefile:
@@ -245,9 +249,10 @@ make -C tools/loadgen/deploy run-max-rps WORKLOAD=history PRESET=history-medium

| Flag | Default | Notes |
|------|---------|-------|
-| `--workload` | `messages` | `messages` or `history` |
-| `--preset` | (required) | an existing preset for the chosen workload |
-| `--steps` | messages `500,1k,2k,5k,10k` / history `200,500,1k,2k,5k` | explicit ordered RPS list; `k` suffix = ×1000 |
+| `--workload` | `messages` | `messages`, `history`, or `read-receipt` |
+| `--preset` | (required) | an existing preset for the chosen workload (`read-receipt` reuses the history presets) |
+| `--steps` | messages `500,1k,2k,5k,10k` / history+read-receipt `200,500,1k,2k,5k` | explicit ordered RPS list; `k` suffix = ×1000 |
+| `--request-timeout` | `5s` | **history / read-receipt**: per-request reply timeout |
| `--warmup` | `10s` | per-step warmup (samples discarded) |
| `--hold` | `30s` | per-step measurement window |
| `--cooldown` | `5s` | per-step settle gap before next step |
@@ -280,3 +285,42 @@ healthy — i.e. the load generator itself, not the service under test, was
the limiting factor, so the step's result can't be trusted. An
INCONCLUSIVE step does **not** count as a pass and does **not** stop the
ramp, even with `--stop-on-trip`; only a hard TRIP stops the ramp.

### Read-receipt workload (`--workload=read-receipt`)

Drives the room-service read-receipt RPC
(`chat.user.{account}.request.room.{roomID}.{siteID}.message.read-receipt`) — a
synchronous request/reply read ("who has read message X") — to find the maximum
sustainable RPS under the latency/error SLOs. Like `history`, it is a read with
no JetStream consumer, so `--slo-pending-growth` is ignored and the per-request
timeout is set with `--request-timeout`.

Read receipts reuse the **history** presets and seed: the requester for each
target is the message's sender (the RPC requires `msgSender == requesterAccount`),
and only top-level messages are used as targets. Reader state must be seeded so
the `ListReadReceipts` Mongo query exercises its real `$match`/`$lookup`/`$unwind`
path instead of short-circuiting on an empty `lastSeenAt` match.

Seed (stamps `lastSeenAt` on a `--read-ratio` fraction — default `0.7` — of each
room's subscribers; requires `CASSANDRA_HOSTS` like the history seed):

```bash
loadgen seed --workload=read-receipt --preset=history-medium --read-ratio=0.7
```

Then ramp:

```bash
loadgen max-rps --workload=read-receipt --preset=history-medium --steps=200,500,1k,2k,5k
```

The gated latency series is named `read-receipt`; the verdict, INCONCLUSIVE
guard, and CSV output behave exactly as for the other workloads.

To tear down, use the history teardown — read-receipt seeds the identical
history fixtures, so `loadgen teardown --workload=history --preset=<name>` drops
everything (dropping `subscriptions` removes the stamped `lastSeenAt` too):

```bash
loadgen teardown --workload=history --preset=history-medium
Comment on lines +407 to +412

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Mention that teardown must reuse the seed.

This example implies any seed works, but runTeardownHistory derives room IDs from the preset+seed before deleting Valkey room keys. If someone seeded read-receipt data with a non-default seed, this command will drop Mongo/Cassandra state but leave the corresponding room keys behind unless the same --seed is passed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/loadgen/README.md` around lines 320 - 325, The README example is
misleading because runTeardownHistory derives room IDs from the preset+seed, so
teardown must be run with the same --seed used to create the fixtures; update
the docs to state this requirement, mention that failing to pass the original
--seed will leave Valkey room keys behind, and change the example for loadgen
teardown --workload=history --preset=history-medium to include a matching --seed
(e.g., --seed=1234) and a short note referencing runTeardownHistory so readers
know why the seed must match.

```
68 changes: 68 additions & 0 deletions tools/loadgen/history_main.go
@@ -74,6 +74,74 @@ func runSeedHistory(ctx context.Context, cfg *config, preset string, seed int64)
	return 0
}

// runSeedReadReceipt seeds the same Mongo+Cassandra fixtures as the history
// workload, then stamps lastSeenAt on a readRatio fraction of each room's
// subscribers so the read-receipt RPC's ListReadReceipts query returns real
// readers. readRatio must be in (0, 1].
func runSeedReadReceipt(ctx context.Context, cfg *config, preset string, seed int64, readRatio float64) int {
	if readRatio <= 0 || readRatio > 1 {
		fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]")
		return 2
Comment on lines +83 to +85

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

Does Go's flag package accept "NaN" for a float64 flag, and do comparisons like x <= 0 || x > 1 evaluate to false when x is NaN?

💡 Result:

Yes. The standard library `flag` package’s float64 flag ultimately uses `strconv.ParseFloat` to parse the flag value, and `strconv.ParseFloat` recognizes the string "NaN" (case-insensitive) as a NaN float64. [1][2]

For comparisons: in Go, any comparison operation (`<`, `<=`, `>`, `>=`, `==`, `!=`) involving a NaN follows IEEE 754 semantics: `x <= 0` is false when `x` is NaN, and `x > 1` is also false when `x` is NaN; therefore the boolean expression `(x <= 0 || x > 1)` evaluates to false when `x` is NaN. This is explicitly tested by Go’s float comparison test suite (`nan <= f`, `nan > f`, and `nan <= nan` are all false). [3]

Note: `!=` is the only comparison that is true when comparing NaN to anything (including itself), consistent with IEEE 754. [3]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n "readRatio|read-ratio|SeedReadReceiptState|runSeedReadReceipt" tools/loadgen/history_main.go tools/loadgen/main.go || true
echo "---- history_main.go (around guard) ----"
sed -n '60,110p' tools/loadgen/history_main.go
echo "---- history_main.go (around runSeedReadReceipt) ----"
sed -n '110,190p' tools/loadgen/history_main.go
echo "---- main.go (flag wiring) ----"
sed -n '1,220p' tools/loadgen/main.go

Repository: hmchangw/chat

Length of output: 12596


Reject NaN for --read-ratio.

`flag.Float64` accepts `"NaN"`, and `readRatio <= 0 || readRatio > 1` evaluates to false when `readRatio` is NaN, so an invalid value can reach `SeedReadReceiptState`.

Suggested fix
-	if readRatio <= 0 || readRatio > 1 {
+	if !(readRatio > 0 && readRatio <= 1) {
 		fmt.Fprintln(os.Stderr, "--read-ratio must be in (0, 1]")
 		return 2
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/loadgen/history_main.go` around lines 82 - 84, The validation for the
command-line flag readRatio incorrectly allows NaN because flag.Float64 parses
"NaN" and the current check (readRatio <= 0 || readRatio > 1) doesn't catch it;
update the validation in the same block that references readRatio (before
calling SeedReadReceiptState) to explicitly reject NaN (use
math.IsNaN(readRatio)) and treat it as invalid, printing the same error message
"--read-ratio must be in (0, 1]" and returning 2.

	}
	if cfg.CassandraHosts == "" {
		fmt.Fprintln(os.Stderr, "read-receipt workload requires CASSANDRA_HOSTS")
		return 2
	}
	p, ok := BuiltinHistoryPreset(preset)
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown history preset: %s\n", preset)
		return 2
	}

	db, keyStore, cleanup, err := connectStores(ctx, cfg)
	if err != nil {
		return 1
	}
	defer cleanup()

	session, err := connectCassandra(cfg)
	if err != nil {
		slog.Error("cassandra connect", "error", err)
		return 1
	}
	defer cassutil.Close(session)

	now := time.Now().UTC()
	res := BuildHistoryFixtures(&p, seed, cfg.SiteID, now)

	if err := Seed(ctx, db, &res.Fixtures); err != nil {
		slog.Error("seed mongo fixtures", "error", err)
		return 1
	}
	if err := SeedRoomKeys(ctx, keyStore, res.Fixtures.RoomKeys); err != nil {
		slog.Error("seed room keys", "error", err)
		return 1
	}
	if err := SeedThreadRooms(ctx, db, &res.Plan, cfg.SiteID); err != nil {
		slog.Error("seed thread rooms", "error", err)
		return 1
	}
	sizer := msgbucket.New(time.Duration(cfg.MessageBucketHours) * time.Hour)
	if err := SeedHistoryCassandra(ctx, session, sizer, &res.Plan, cfg.SiteID); err != nil {
		slog.Error("seed cassandra messages", "error", err)
		return 1
	}
	if err := SeedReadReceiptState(ctx, db, res.Fixtures.Subscriptions, &res.Plan, readRatio, seed); err != nil {
		slog.Error("seed read-receipt reader state", "error", err)
		return 1
	}

	slog.Info("seed complete (read-receipt)",
		"preset", p.Name,
		"users", len(res.Fixtures.Users),
		"rooms", len(res.Fixtures.Rooms),
		"subs", len(res.Fixtures.Subscriptions),
		"messages", len(res.Plan.Messages),
		"readRatio", readRatio,
		"bucketHours", cfg.MessageBucketHours)
	return 0
}

func runTeardownHistory(ctx context.Context, cfg *config, preset string, seed int64) int {
	if cfg.CassandraHosts == "" {
		fmt.Fprintln(os.Stderr, "history workload requires CASSANDRA_HOSTS")
5 changes: 4 additions & 1 deletion tools/loadgen/main.go
@@ -103,9 +103,10 @@ func dispatch(ctx context.Context, cfg *config) int {

func runSeed(ctx context.Context, cfg *config, args []string) int {
	fs := flag.NewFlagSet("seed", flag.ExitOnError)
-	workload := fs.String("workload", "messages", "messages|members|history")
+	workload := fs.String("workload", "messages", "messages|members|history|read-receipt")
	preset := fs.String("preset", "", "preset name")
	seed := fs.Int64("seed", 42, "RNG seed")
+	readRatio := fs.Float64("read-ratio", 0.7, "read-receipt only: fraction of each room's subscribers to mark as readers")
	_ = fs.Parse(args)
	if *preset == "" {
		fmt.Fprintln(os.Stderr, "--preset required")
@@ -118,6 +119,8 @@ func runSeed(ctx context.Context, cfg *config, args []string) int {
		return runSeedMembers(ctx, cfg, *preset, *seed)
	case "history":
		return runSeedHistory(ctx, cfg, *preset, *seed)
+	case "read-receipt":
+		return runSeedReadReceipt(ctx, cfg, *preset, *seed, *readRatio)
	default:
		fmt.Fprintf(os.Stderr, "unknown workload: %s\n", *workload)
		return 2