diff --git a/.gitignore b/.gitignore index 01ff89d57..106713f6d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ bin/ demo /seed-sample-data +/loadgen *.exe *.exe~ *.dll diff --git a/docs/superpowers/plans/2026-06-02-loadgen-bottleneck-attribution.md b/docs/superpowers/plans/2026-06-02-loadgen-bottleneck-attribution.md new file mode 100644 index 000000000..0a184c22d --- /dev/null +++ b/docs/superpowers/plans/2026-06-02-loadgen-bottleneck-attribution.md @@ -0,0 +1,1511 @@ +# loadgen Bottleneck Attribution Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** When a `max-rps --workload=messages` ramp trips, append a `BOTTLENECK:` block to the verdict that names the culprit component and saturated resource, by fusing loadgen's per-stage signals with cAdvisor container CPU trends from Prometheus. + +**Architecture:** A new attribution engine in `tools/loadgen` runs once, on the first tripped step. It reads the breaching step + the prior passing step (both now carrying their hold-window wall-clock times), walks a declarative messages stage-graph, and for each stage checks two predicates — *backing up* (durable backlog delta > 0 or its latency series breached the SLO) and *saturated* (CPU "knee": cores used barely rose between the passing and tripping windows even though offered RPS rose). The first stage that is both backing-up and saturated (or whose downstream dependency is saturated) is the culprit. Falls back to pure CPU ranking, then to `undetermined`. Purely additive — never fails the run. + +**Tech Stack:** Go 1.25, `flag`-based CLI subcommands, Resty (via `pkg/restyutil`) for the Prometheus HTTP API, `caarlos0/env` for config, `testify` + `httptest` for tests. No new third-party dependencies. + +--- + +## File Structure + +**New files (all in `tools/loadgen/`, `package main`):** +- `promclient.go` / `promclient_test.go` — Prometheus `query_range` HTTP client; one `RangeQuery` method returning typed series. +- `stagegraph.go` / `stagegraph_test.go` — declarative messages pipeline stage-graph + dependency display names. Pure data. +- `identity.go` / `identity_test.go` — logical service name → cAdvisor PromQL selector (compose-service label, with short-ID fallback). Pure function. +- `attribution.go` / `attribution_test.go` — the engine: CPU-knee test, causality walk, fallback, undetermined. Takes a `promQuerier` interface so unit tests inject a fake. +- `attribution_report.go` / `attribution_report_test.go` — formats the `bottleneckVerdict` into the `BOTTLENECK:` block. + +**Modified files:** +- `verdict.go` — add `HoldStart/HoldEnd time.Time` and `Pending []consumerPendingDelta` to `rpsStepResult`; populate in `evaluateRPSStep`. +- `ramp.go` — record each step's hold window wall-clock in `runRamp`. +- `main.go` — add `bottleneckConfig` to `config`. +- `maxrps.go` — build the engine after the ramp and pass its verdict to the reporter. +- `maxrps_report.go` — print the `BOTTLENECK:` block after `ANSWER:`; add culprit columns to the CSV trip row. +- `tools/loadgen/deploy/prometheus/prometheus.yml` — add a cAdvisor scrape job. +- `tools/loadgen/deploy/docker-compose.yml` + `README.md` — bring up cAdvisor; document the feature. + +--- + +## Task 1: Carry the hold window + per-durable deltas on each step result + +The engine needs (a) each step's measurement-window wall-clock times to query Prometheus over the right interval, and (b) the per-durable backlog deltas (today only the *worst* durable is kept). Add both to `rpsStepResult`. + +**Files:** +- Modify: `tools/loadgen/verdict.go` +- Modify: `tools/loadgen/ramp.go` +- Test: `tools/loadgen/verdict_test.go` (add cases), `tools/loadgen/ramp_test.go` (add case) + +- [ ] **Step 1: Write the failing test for the new result fields** + +Add to `tools/loadgen/verdict_test.go`. This file currently imports only `testify/assert`; add `"github.com/stretchr/testify/require"` to its import block first. + +```go +func TestEvaluateRPSStep_CopiesPendingAndWindow(t *testing.T) { + in := &rpsStepInputs{ + TargetRPS: 1000, + Hold: 30 * time.Second, + AttemptedOps: 30000, + Pending: []consumerPendingDelta{ + {Durable: "message-worker", Start: 0, End: 5000}, + {Durable: "broadcast-worker", Start: 0, End: 10}, + }, + } + res := evaluateRPSStep(in, buildThresholds(100*time.Millisecond, 250*time.Millisecond, 0.001, 1000, 0.05)) + require.Len(t, res.Pending, 2) + assert.Equal(t, "message-worker", res.Pending[0].Durable) + assert.Equal(t, int64(5000), res.Pending[0].Delta()) +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `res.Pending` undefined. + +- [ ] **Step 3: Add the fields and populate them** + +In `tools/loadgen/verdict.go`, add to the `rpsStepResult` struct (after `Reasons []string`): + +```go + // Pending carries every durable's backlog delta for the step (not just the + // worst), so the bottleneck engine can map a delta to a pipeline stage. + Pending []consumerPendingDelta + // HoldStart/HoldEnd bound the step's measurement window in wall-clock time, + // set by runRamp. Used to query container metrics over the same interval. + HoldStart, HoldEnd time.Time +``` + +In `evaluateRPSStep`, right after the `res := rpsStepResult{...}` literal, add: + +```go + res.Pending = in.Pending +``` + +- [ ] **Step 4: Write the failing test for window capture in runRamp** + +Add to `tools/loadgen/ramp_test.go` (reuse any existing fake `rpsWorkload`; if none exists, add this minimal one): + +```go +type windowFakeWorkload struct{ hold time.Duration } + +func (f windowFakeWorkload) Label() string { return "fake" } +func (f windowFakeWorkload) RunStep(ctx context.Context, rps int, warmup, hold time.Duration) (rpsStepInputs, error) { + time.Sleep(2 * time.Millisecond) // simulate a measurement window + return rpsStepInputs{TargetRPS: rps, Hold: hold, AttemptedOps: rps}, nil +} + +func TestRunRamp_RecordsHoldWindow(t *testing.T) { + results := runRamp(context.Background(), windowFakeWorkload{}, &rampConfig{ + Steps: []int{100}, + Hold: 30 * time.Second, + Thresholds: buildThresholds(time.Second, time.Second, 1, 1<<62, 0), + }) + require.Len(t, results, 1) + assert.False(t, results[0].HoldStart.IsZero(), "HoldStart should be set") + assert.False(t, results[0].HoldEnd.IsZero(), "HoldEnd should be set") + assert.True(t, results[0].HoldEnd.After(results[0].HoldStart)) +} +``` + +- [ ] **Step 5: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `HoldStart`/`HoldEnd` are zero. + +- [ ] **Step 6: Capture the window in runRamp** + +In `tools/loadgen/ramp.go`, inside `runRamp`'s loop, replace: + +```go + in, err := w.RunStep(ctx, n, cfg.Warmup, cfg.Hold) +``` + +with: + +```go + stepStart := time.Now() + in, err := w.RunStep(ctx, n, cfg.Warmup, cfg.Hold) + stepEnd := time.Now() +``` + +and, right after `res := evaluateRPSStep(&in, cfg.Thresholds)`, add: + +```go + // RunStep does warmup then hold sequentially; approximate the hold + // window as [start+warmup, end] so metric queries skip the ramp-up. + res.HoldStart = stepStart.Add(cfg.Warmup) + res.HoldEnd = stepEnd +``` + +- [ ] **Step 7: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 8: Commit** + +```bash +git add tools/loadgen/verdict.go tools/loadgen/verdict_test.go tools/loadgen/ramp.go tools/loadgen/ramp_test.go +git commit -m "feat(loadgen): carry hold window + per-durable deltas on step result" +``` + +--- + +## Task 2: Prometheus range-query client + +A thin client over the Prometheus HTTP API `GET /api/v1/query_range`, returning typed time-ordered samples per series. Built on the shared `restyutil.New`. + +**Files:** +- Create: `tools/loadgen/promclient.go` +- Test: `tools/loadgen/promclient_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/promclient_test.go`: + +```go +package main + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPromClient_RangeQuery_ParsesMatrix(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/v1/query_range", r.URL.Path) + assert.NotEmpty(t, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status":"success", + "data":{"resultType":"matrix","result":[ + {"metric":{"container_label_com_docker_compose_service":"cassandra"}, + "values":[[100,"10.5"],[105,"11.0"]]} + ]}}`)) + })) + defer srv.Close() + + c := newPromClient(srv.URL) + start := time.Unix(100, 0) + series, err := c.RangeQuery(context.Background(), `up`, start, start.Add(5*time.Second), 5*time.Second) + require.NoError(t, err) + require.Len(t, series, 1) + assert.Equal(t, "cassandra", series[0].Labels["container_label_com_docker_compose_service"]) + require.Len(t, series[0].Samples, 2) + assert.Equal(t, 10.5, series[0].Samples[0].V) + assert.Equal(t, 11.0, series[0].Samples[1].V) + assert.Equal(t, time.Unix(105, 0).UTC(), series[0].Samples[1].T.UTC()) +} + +func TestPromClient_RangeQuery_NonSuccessStatus(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"status":"error","errorType":"bad_data","error":"boom"}`)) + })) + defer srv.Close() + + _, err := newPromClient(srv.URL).RangeQuery(context.Background(), `up`, time.Unix(0, 0), time.Unix(5, 0), time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "boom") +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `newPromClient` undefined. + +- [ ] **Step 3: Implement the client** + +Create `tools/loadgen/promclient.go`: + +```go +package main + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/go-resty/resty/v2" + + "github.com/hmchangw/chat/pkg/restyutil" +) + +// promSample is one (timestamp, value) point from a Prometheus matrix result. +type promSample struct { + T time.Time + V float64 +} + +// promSeries is one labelled time-series returned by a range query. +type promSeries struct { + Labels map[string]string + Samples []promSample +} + +// promClient queries the Prometheus HTTP API. It is the production promQuerier. +type promClient struct { + rc *resty.Client +} + +// newPromClient builds a client against a Prometheus base URL (e.g. +// "http://prometheus:9090"). A short timeout keeps a slow/missing Prometheus +// from stalling the end-of-run report. +func newPromClient(baseURL string) *promClient { + return &promClient{rc: restyutil.New(baseURL, restyutil.WithTimeout(10*time.Second))} +} + +// rangeQueryResponse mirrors the subset of the query_range payload we read. +type rangeQueryResponse struct { + Status string `json:"status"` + Error string `json:"error"` + Data struct { + Result []struct { + Metric map[string]string `json:"metric"` + Values [][2]any `json:"values"` + } `json:"result"` + } `json:"data"` +} + +// RangeQuery runs a PromQL range query and returns one promSeries per result. +func (c *promClient) RangeQuery(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]promSeries, error) { + resp, err := c.rc.R(). + SetContext(ctx). + SetQueryParams(map[string]string{ + "query": query, + "start": strconv.FormatInt(start.Unix(), 10), + "end": strconv.FormatInt(end.Unix(), 10), + "step": strconv.FormatFloat(step.Seconds(), 'f', -1, 64), + }). + Get("/api/v1/query_range") + if err != nil { + return nil, fmt.Errorf("query prometheus: %w", err) + } + + var parsed rangeQueryResponse + if err := json.Unmarshal(resp.Body(), &parsed); err != nil { + return nil, fmt.Errorf("decode prometheus response: %w", err) + } + if parsed.Status != "success" { + return nil, fmt.Errorf("prometheus query failed: %s", parsed.Error) + } + + out := make([]promSeries, 0, len(parsed.Data.Result)) + for _, r := range parsed.Data.Result { + s := promSeries{Labels: r.Metric} + for _, v := range r.Values { + ts, ok := v[0].(float64) + if !ok { + continue + } + raw, ok := v[1].(string) + if !ok { + continue + } + val, err := strconv.ParseFloat(raw, 64) + if err != nil { + continue + } + s.Samples = append(s.Samples, promSample{T: time.Unix(int64(ts), 0).UTC(), V: val}) + } + out = append(out, s) + } + return out, nil +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/promclient.go tools/loadgen/promclient_test.go +git commit -m "feat(loadgen): add Prometheus range-query client" +``` + +--- + +## Task 3: Messages stage-graph + +A declarative description of the messages pipeline: each stage's logical name, its cAdvisor compose-service container, the durable that fronts it (if any), the latency series that measures it (if any), and its downstream dependencies. Plus a display-name helper for dependencies. + +**Files:** +- Create: `tools/loadgen/stagegraph.go` +- Test: `tools/loadgen/stagegraph_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/stagegraph_test.go`: + +```go +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMessagesStageGraph_Shape(t *testing.T) { + g := messagesStageGraph() + require.Len(t, g, 3) + assert.Equal(t, "message-gatekeeper", g[0].Name) + assert.Equal(t, "E1", g[0].LatencySeries) + assert.Empty(t, g[0].Durable) + + assert.Equal(t, "message-worker", g[1].Name) + assert.Equal(t, "message-worker", g[1].Durable) + assert.Equal(t, []string{"cassandra"}, g[1].DependsOn) + + assert.Equal(t, "broadcast-worker", g[2].Name) + assert.Equal(t, "broadcast-worker", g[2].Durable) + assert.Equal(t, "E2", g[2].LatencySeries) +} + +func TestDependencyDisplayName(t *testing.T) { + assert.Equal(t, "Cassandra", dependencyDisplayName("cassandra")) + assert.Equal(t, "MongoDB", dependencyDisplayName("mongo")) + assert.Equal(t, "valkey", dependencyDisplayName("valkey")) // unknown -> as-is +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `messagesStageGraph` undefined. + +- [ ] **Step 3: Implement the stage-graph** + +Create `tools/loadgen/stagegraph.go`: + +```go +package main + +// stage is one node in a workload's pipeline. The bottleneck engine walks +// stages in flow order, mapping loadgen's signals (durable backlog, latency +// series) and cAdvisor's container metrics onto each one. +type stage struct { + Name string // logical component name, used in the verdict + Container string // cAdvisor compose-service label value + Durable string // durable consumer fronting this stage; "" if none + LatencySeries string // loadgen latency series measuring this stage; "" if none + DependsOn []string // downstream components this stage calls into +} + +// messagesStageGraph describes the messages pipeline: +// publish -> message-gatekeeper -> MESSAGES_CANONICAL -> {message-worker (Cassandra), +// broadcast-worker (MongoDB membership + Valkey keys)}. E1 latency measures the +// gatekeeper front door; E2 is the end-to-end publish->broadcast time. +func messagesStageGraph() []stage { + return []stage{ + {Name: "message-gatekeeper", Container: "message-gatekeeper", LatencySeries: "E1"}, + {Name: "message-worker", Container: "message-worker", Durable: "message-worker", DependsOn: []string{"cassandra"}}, + {Name: "broadcast-worker", Container: "broadcast-worker", Durable: "broadcast-worker", LatencySeries: "E2", DependsOn: []string{"mongo", "valkey"}}, + } +} + +// dependencyDisplayName maps an internal dependency key to a human label for +// the verdict ("message-worker (Cassandra-bound)"). Unknown keys pass through. +func dependencyDisplayName(dep string) string { + switch dep { + case "cassandra": + return "Cassandra" + case "mongo": + return "MongoDB" + case "valkey": + return "Valkey" + default: + return dep + } +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/stagegraph.go tools/loadgen/stagegraph_test.go +git commit -m "feat(loadgen): add messages pipeline stage-graph" +``` + +--- + +## Task 4: Container identity resolver + +Builds the PromQL metric selector for a logical service name. Prefers the cAdvisor compose-service label; if the operator supplied a `shortid:name` fallback map (for hosts where cAdvisor doesn't populate the label), it selects by the cgroup-path `id` instead. + +**Files:** +- Create: `tools/loadgen/identity.go` +- Test: `tools/loadgen/identity_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/identity_test.go`: + +```go +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseContainerMap(t *testing.T) { + m, err := parseContainerMap("0a1b2c3d4e5f:cassandra,deadbeef0000:mongo") + require.NoError(t, err) + assert.Equal(t, "0a1b2c3d4e5f", m["cassandra"]) + assert.Equal(t, "deadbeef0000", m["mongo"]) +} + +func TestParseContainerMap_Empty(t *testing.T) { + m, err := parseContainerMap("") + require.NoError(t, err) + assert.Empty(t, m) +} + +func TestParseContainerMap_Malformed(t *testing.T) { + _, err := parseContainerMap("noseparator") + require.Error(t, err) +} + +func TestIdentityResolver_Selector(t *testing.T) { + r := identityResolver{fallback: map[string]string{"cassandra": "0a1b2c3d4e5f"}} + assert.Equal(t, `container_label_com_docker_compose_service="message-worker"`, r.selector("message-worker")) + assert.Equal(t, `id=~".*0a1b2c3d4e5f.*"`, r.selector("cassandra")) +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `parseContainerMap` / `identityResolver` undefined. + +- [ ] **Step 3: Implement the resolver** + +Create `tools/loadgen/identity.go`: + +```go +package main + +import ( + "fmt" + "strings" +) + +// identityResolver maps a logical service name to a cAdvisor PromQL label +// selector. The fallback map (name -> 12-char container short-ID) is used on +// hosts where cAdvisor cannot populate the compose-service label. +type identityResolver struct { + fallback map[string]string +} + +// parseContainerMap parses "shortid:name,shortid2:name2" into a name->shortid +// map. An empty string yields an empty map. +func parseContainerMap(s string) (map[string]string, error) { + out := map[string]string{} + if strings.TrimSpace(s) == "" { + return out, nil + } + for _, pair := range strings.Split(s, ",") { + id, name, ok := strings.Cut(strings.TrimSpace(pair), ":") + if !ok || id == "" || name == "" { + return nil, fmt.Errorf("bad container-map entry %q (want shortid:name)", pair) + } + out[name] = id + } + return out, nil +} + +// selector returns the inner PromQL label matcher (no metric name, no braces) +// that identifies the given service's container. +func (r identityResolver) selector(service string) string { + if id, ok := r.fallback[service]; ok { + return fmt.Sprintf(`id=~".*%s.*"`, id) + } + return fmt.Sprintf(`container_label_com_docker_compose_service=%q`, service) +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/identity.go tools/loadgen/identity_test.go +git commit -m "feat(loadgen): add cAdvisor container identity resolver" +``` + +--- + +## Task 5: Attribution engine — CPU-knee helper + +The saturation primitive. Given a service and two windows (passing step, tripping step), query the CPU usage counter over each window, compute cores-used as `(lastSample - firstSample) / windowSeconds`, and decide "saturated" = the trip-window cores barely rose over the pass-window cores (a plateau) while above an idle floor. + +**Files:** +- Create: `tools/loadgen/attribution.go` +- Test: `tools/loadgen/attribution_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/attribution_test.go`: + +```go +package main + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeProm returns canned counter samples per service name. The key is matched +// as a substring of the query (the query embeds the service selector). +type fakeProm struct { + // byService[service] = samples for the *next* RangeQuery whose query + // mentions that service. Windows are distinguished by start time via fn. + fn func(query string, start, end time.Time) []promSample + err error +} + +func (f fakeProm) RangeQuery(_ context.Context, query string, start, end time.Time, _ time.Duration) ([]promSeries, error) { + if f.err != nil { + return nil, f.err + } + samples := f.fn(query, start, end) + if samples == nil { + return nil, nil + } + return []promSeries{{Labels: map[string]string{}, Samples: samples}}, nil +} + +func counterSamples(start time.Time, startVal, cores float64, windowSec int) []promSample { + // Linear counter: startVal at t0, rising `cores` per second for windowSec. + return []promSample{ + {T: start, V: startVal}, + {T: start.Add(time.Duration(windowSec) * time.Second), V: startVal + cores*float64(windowSec)}, + } +} + +func TestEngine_cpuCores(t *testing.T) { + start := time.Unix(1000, 0) + q := fakeProm{fn: func(_ string, s, _ time.Time) []promSample { + return counterSamples(s, 100, 2.5, 30) // 2.5 cores over 30s + }} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + cores, reset, ok := eng.cpuCores(context.Background(), "message-worker", start, start.Add(30*time.Second)) + require.True(t, ok) + assert.False(t, reset) + assert.InDelta(t, 2.5, cores, 0.001) +} + +func TestEngine_cpuCores_CounterReset(t *testing.T) { + start := time.Unix(1000, 0) + q := fakeProm{fn: func(_ string, s, _ time.Time) []promSample { + return []promSample{{T: s, V: 500}, {T: s.Add(30 * time.Second), V: 3}} // dropped -> restart + }} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + _, reset, ok := eng.cpuCores(context.Background(), "x", start, start.Add(30*time.Second)) + require.True(t, ok) + assert.True(t, reset) +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `newBottleneckEngine` undefined. + +- [ ] **Step 3: Implement the engine scaffold + cpuCores** + +Create `tools/loadgen/attribution.go`: + +```go +package main + +import ( + "context" + "fmt" + "log/slog" + "time" +) + +const ( + // cpuIdleFloorCores: a container using less than this over the trip window + // is treated as idle and never blamed. + cpuIdleFloorCores = 0.05 +) + +// promQuerier is the consumer-defined seam over Prometheus, so unit tests can +// inject a fake without a live server. +type promQuerier interface { + RangeQuery(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]promSeries, error) +} + +// bottleneckVerdict is the engine's output, rendered as the BOTTLENECK: block. +type bottleneckVerdict struct { + Component string // culprit component, "" when undetermined + Resource string // "CPU", a dependency display name, or "unknown" + Confidence string // "high" | "medium" | "low" + Reasons []string // human-readable causal lines + Determined bool // false -> render "undetermined ()" +} + +// bottleneckEngine fuses loadgen signals with cAdvisor CPU trends. +type bottleneckEngine struct { + q promQuerier + ident identityResolver + knee float64 // max relative CPU rise still counted as a plateau + step time.Duration // PromQL query step +} + +func newBottleneckEngine(q promQuerier, ident identityResolver, knee float64, step time.Duration) *bottleneckEngine { + return &bottleneckEngine{q: q, ident: ident, knee: knee, step: step} +} + +// cpuCores returns mean cores used by service over [start,end], derived from +// the CPU usage counter. reset=true when the counter dropped (container +// restart) — callers treat that as a memory/restart signal, not a CPU rate. +func (e *bottleneckEngine) cpuCores(ctx context.Context, service string, start, end time.Time) (cores float64, reset bool, ok bool) { + query := fmt.Sprintf(`container_cpu_usage_seconds_total{%s}`, e.ident.selector(service)) + series, err := e.q.RangeQuery(ctx, query, start, end, e.step) + if err != nil { + slog.Warn("cpu query failed", "service", service, "error", err) + return 0, false, false + } + // Sum across any matching cgroup series (cAdvisor may emit several). + var first, last float64 + var t0, t1 time.Time + var have bool + for _, s := range series { + if len(s.Samples) < 2 { + continue + } + first += s.Samples[0].V + last += s.Samples[len(s.Samples)-1].V + t0 = s.Samples[0].T + t1 = s.Samples[len(s.Samples)-1].T + have = true + } + if !have || !t1.After(t0) { + return 0, false, false + } + if last < first { + return 0, true, true + } + return (last - first) / t1.Sub(t0).Seconds(), false, true +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/attribution.go tools/loadgen/attribution_test.go +git commit -m "feat(loadgen): add bottleneck engine CPU-knee primitive" +``` + +--- + +## Task 6: Attribution engine — causality walk + +The decision logic. Given the tripping step result, the last passing step result, and the stage-graph, produce a `bottleneckVerdict` via the precedence: high (stage CPU-knee) → high (dependency CPU-knee) → medium (backs up, no knee) → low (pure CPU ranking) → undetermined. + +**Files:** +- Modify: `tools/loadgen/attribution.go` +- Test: `tools/loadgen/attribution_test.go` + +- [ ] **Step 1: Write the failing tests** + +Append to `tools/loadgen/attribution_test.go`: + +```go +func tripResult(window time.Time) *rpsStepResult { + return &rpsStepResult{ + TargetRPS: 2000, + HoldStart: window, HoldEnd: window.Add(30 * time.Second), + Latencies: []seriesPercentile{ + {Name: "E1", Pct: Percentiles{P95: 20 * time.Millisecond}}, + {Name: "E2", Pct: Percentiles{P95: 200 * time.Millisecond}}, + }, + Pending: []consumerPendingDelta{ + {Durable: "message-worker", Start: 0, End: 12000}, + {Durable: "broadcast-worker", Start: 0, End: 0}, + }, + } +} + +func passResult(window time.Time) *rpsStepResult { + return &rpsStepResult{ + TargetRPS: 1000, + HoldStart: window, HoldEnd: window.Add(30 * time.Second), + } +} + +var slo = buildThresholds(100*time.Millisecond, 250*time.Millisecond, 0.001, 1000, 0.05) + +// stageProm returns per-service cores keyed by service, with a plateau for +// services in `plateau` (same cores in both windows) and growth otherwise. +func stageProm(passT, tripT time.Time, plateau map[string]float64) fakeProm { + return fakeProm{fn: func(query string, s, _ time.Time) []promSample { + for svc, cores := range plateau { + if strings.Contains(query, svc) { + return counterSamples(s, 0, cores, 30) // same cores both windows -> plateau + } + } + // non-plateau services: grow a lot from pass to trip window + base := 0.2 + if s.Equal(tripT) { + base = 2.0 + } + return counterSamples(s, 0, base, 30) + }} +} + +func TestEngine_DependencyBound(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // message-worker backs up; cassandra CPU plateaus -> Cassandra-bound, high. + q := stageProm(passT, tripT, map[string]float64{"cassandra": 3.8, "message-worker": 0.4}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(tripT), passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-worker", v.Component) + assert.Equal(t, "Cassandra", v.Resource) + assert.Equal(t, "high", v.Confidence) +} + +func TestEngine_StageCPUBound(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // E1 (gatekeeper) breaches and gatekeeper CPU plateaus -> CPU-bound, high. + trip := tripResult(tripT) + trip.Latencies[0].Pct.P95 = 150 * time.Millisecond // E1 over SLO + trip.Pending[0].End = 0 // no worker backlog + q := stageProm(passT, tripT, map[string]float64{"message-gatekeeper": 4.0}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), trip, passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-gatekeeper", v.Component) + assert.Equal(t, "CPU", v.Resource) + assert.Equal(t, "high", v.Confidence) +} + +func TestEngine_BacksUpNoKnee_Medium(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // worker backs up, but nothing plateaus (all CPU still rising) -> medium/unknown. + q := stageProm(passT, tripT, nil) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(tripT), passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-worker", v.Component) + assert.Equal(t, "unknown", v.Resource) + assert.Equal(t, "medium", v.Confidence) +} + +func TestEngine_NoBackup_FallbackRanking_Low(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + trip := tripResult(tripT) + trip.Pending[0].End = 0 // nothing backs up, no latency breach + // cassandra has the clearest plateau at the highest cores -> low-confidence pick. + q := stageProm(passT, tripT, map[string]float64{"cassandra": 3.8}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), trip, passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "cassandra", v.Component) + assert.Equal(t, "low", v.Confidence) +} + +func TestEngine_NoPassStep_Undetermined(t *testing.T) { + eng := newBottleneckEngine(stageProm(time.Unix(1, 0), time.Unix(2, 0), nil), identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(time.Unix(2000, 0)), nil, messagesStageGraph(), slo) + assert.False(t, v.Determined) + assert.Contains(t, v.Reasons[0], "no passing step") +} + +func TestEngine_PromError_Undetermined(t *testing.T) { + eng := newBottleneckEngine(fakeProm{err: assertAnErr{}}, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(time.Unix(2000, 0)), passResult(time.Unix(1000, 0)), messagesStageGraph(), slo) + assert.False(t, v.Determined) +} + +type assertAnErr struct{} + +func (assertAnErr) Error() string { return "prom down" } +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `Diagnose` undefined. + +- [ ] **Step 3: Implement Diagnose + helpers** + +Append to `tools/loadgen/attribution.go`: + +```go +// saturated reports whether service plateaued between the pass and trip windows +// — cores stayed above the idle floor but rose by less than the knee fraction +// even though offered RPS rose. A counter reset also counts (restart under +// pressure). dataOK=false means we couldn't measure it. +func (e *bottleneckEngine) saturated(ctx context.Context, service string, pass, trip *rpsStepResult) (sat, dataOK bool) { + tripCores, reset, okT := e.cpuCores(ctx, service, trip.HoldStart, trip.HoldEnd) + if !okT { + return false, false + } + if reset { + return true, true + } + if tripCores < cpuIdleFloorCores { + return false, true + } + passCores, _, okP := e.cpuCores(ctx, service, pass.HoldStart, pass.HoldEnd) + if !okP || passCores <= 0 { + // No baseline: treat a high absolute trip usage as saturated. + return tripCores >= 1.0, true + } + rise := (tripCores - passCores) / passCores + return rise < e.knee, true +} + +// stageBackingUp reports whether a stage is accumulating backlog or breaching +// its latency SLO at the tripping step. +func stageBackingUp(st stage, trip *rpsStepResult, th rpsThresholds) bool { + if st.Durable != "" { + for _, p := range trip.Pending { + if p.Durable == st.Durable && p.Delta() > 0 { + return true + } + } + } + if st.LatencySeries != "" { + for _, sp := range trip.Latencies { + if sp.Name == st.LatencySeries && (sp.Pct.P95 > th.P95 || sp.Pct.P99 > th.P99) { + return true + } + } + } + return false +} + +// Diagnose applies the attribution precedence and returns a verdict. It never +// returns an error; measurement gaps degrade to a lower confidence or to +// undetermined. +func (e *bottleneckEngine) Diagnose(ctx context.Context, trip, pass *rpsStepResult, graph []stage, th rpsThresholds) bottleneckVerdict { + if pass == nil { + return bottleneckVerdict{Reasons: []string{"no passing step before breach; cannot compute CPU knee"}} + } + + // Pass 1: first backing-up stage that is itself CPU-saturated -> high. + for _, st := range graph { + if !stageBackingUp(st, trip, th) { + continue + } + if sat, ok := e.saturated(ctx, st.Container, pass, trip); ok && sat { + return bottleneckVerdict{ + Component: st.Name, Resource: "CPU", Confidence: "high", Determined: true, + Reasons: []string{ + fmt.Sprintf("%s is the first stage to back up", st.Name), + fmt.Sprintf("%s CPU plateaued between %d and %d rps while load rose", st.Container, pass.TargetRPS, trip.TargetRPS), + }, + } + } + } + + // Pass 2: first backing-up stage whose downstream dependency is saturated -> high. + for _, st := range graph { + if !stageBackingUp(st, trip, th) { + continue + } + for _, dep := range st.DependsOn { + if sat, ok := e.saturated(ctx, dep, pass, trip); ok && sat { + return bottleneckVerdict{ + Component: st.Name, Resource: dependencyDisplayName(dep), Confidence: "high", Determined: true, + Reasons: []string{ + fmt.Sprintf("%s consumer backlog grew (first stage to back up)", st.Name), + fmt.Sprintf("%s CPU plateaued between %d and %d rps while load rose", dep, pass.TargetRPS, trip.TargetRPS), + }, + } + } + } + } + + // Pass 3: first backing-up stage, nothing saturated -> medium / unknown. + for _, st := range graph { + if stageBackingUp(st, trip, th) { + return bottleneckVerdict{ + Component: st.Name, Resource: "unknown", Confidence: "medium", Determined: true, + Reasons: []string{fmt.Sprintf("%s backs up but no resource knee found — likely I/O or lock wait", st.Name)}, + } + } + } + + // Pass 4: nothing backed up -> rank containers by clearest saturation -> low. + if v, ok := e.fallbackRanking(ctx, pass, trip, graph); ok { + return v + } + + // Pass 5: nothing stands out. + return bottleneckVerdict{Reasons: []string{"no stage backed up and no container saturated in the breach window"}} +} + +// fallbackRanking picks the saturated container with the highest trip-window +// cores across all stages and their dependencies. Confidence low. +func (e *bottleneckEngine) fallbackRanking(ctx context.Context, pass, trip *rpsStepResult, graph []stage) (bottleneckVerdict, bool) { + seen := map[string]bool{} + var best string + var bestCores float64 + consider := func(svc string) { + if seen[svc] { + return + } + seen[svc] = true + sat, ok := e.saturated(ctx, svc, pass, trip) + if !ok || !sat { + return + } + cores, _, _ := e.cpuCores(ctx, svc, trip.HoldStart, trip.HoldEnd) + if cores > bestCores { + bestCores, best = cores, svc + } + } + for _, st := range graph { + consider(st.Container) + for _, dep := range st.DependsOn { + consider(dep) + } + } + if best == "" { + return bottleneckVerdict{}, false + } + return bottleneckVerdict{ + Component: best, Resource: "CPU", Confidence: "low", Determined: true, + Reasons: []string{fmt.Sprintf("resource-ranking fallback: %s had the clearest CPU plateau (%.1f cores)", best, bestCores)}, + }, true +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS (all `TestEngine_*` cases). + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/attribution.go tools/loadgen/attribution_test.go +git commit -m "feat(loadgen): add bottleneck causality walk + fallback" +``` + +--- + +## Task 7: Render the BOTTLENECK block + +Format a `bottleneckVerdict` into the text block appended under `ANSWER:`. + +**Files:** +- Create: `tools/loadgen/attribution_report.go` +- Test: `tools/loadgen/attribution_report_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/attribution_report_test.go`: + +```go +package main + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRenderBottleneck_Determined(t *testing.T) { + var sb strings.Builder + renderBottleneck(&sb, bottleneckVerdict{ + Component: "message-worker", Resource: "Cassandra", Confidence: "high", Determined: true, + Reasons: []string{"message-worker consumer backlog grew", "cassandra CPU plateaued"}, + }) + out := sb.String() + assert.Contains(t, out, "BOTTLENECK: message-worker (Cassandra-bound)") + assert.Contains(t, out, "message-worker consumer backlog grew") + assert.Contains(t, out, "confidence: high") +} + +func TestRenderBottleneck_Undetermined(t *testing.T) { + var sb strings.Builder + renderBottleneck(&sb, bottleneckVerdict{Reasons: []string{"prometheus unreachable"}}) + assert.Contains(t, sb.String(), "BOTTLENECK: undetermined (prometheus unreachable)") +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `renderBottleneck` undefined. + +- [ ] **Step 3: Implement the renderer** + +Create `tools/loadgen/attribution_report.go`: + +```go +package main + +import ( + "fmt" + "io" + "strings" +) + +// renderBottleneck writes the BOTTLENECK: block. For an undetermined verdict it +// writes a single line naming why; otherwise it names the culprit, the causal +// reasons, and the confidence. +func renderBottleneck(w io.Writer, v bottleneckVerdict) { + if !v.Determined { + reason := "no signal" + if len(v.Reasons) > 0 { + reason = v.Reasons[0] + } + fmt.Fprintf(w, "BOTTLENECK: undetermined (%s)\n", reason) + return + } + fmt.Fprintf(w, "BOTTLENECK: %s (%s-bound)\n", v.Component, v.Resource) + for _, r := range v.Reasons { + fmt.Fprintf(w, " %s\n", r) + } + fmt.Fprintf(w, " confidence: %s\n", v.Confidence) +} + +// bottleneckCSVColumns returns the trip-row culprit columns appended to the CSV. +func bottleneckCSVColumns(v bottleneckVerdict) []string { + if !v.Determined { + return []string{"undetermined", "", ""} + } + return []string{v.Component, strings.ToLower(v.Resource), v.Confidence} +} +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/attribution_report.go tools/loadgen/attribution_report_test.go +git commit -m "feat(loadgen): render BOTTLENECK verdict block" +``` + +--- + +## Task 8: Config — bottleneck settings + +Add the bottleneck config (env-driven) to the loadgen `config` struct. + +**Files:** +- Modify: `tools/loadgen/main.go` +- Test: covered indirectly; no new unit test (config is parsed by `caarlos0/env`, exercised in Task 9 wiring). Add a small parse test to satisfy coverage. + +- [ ] **Step 1: Write the failing test** + +Create `tools/loadgen/config_bottleneck_test.go`: + +```go +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBottleneckConfig_Defaults(t *testing.T) { + var c bottleneckConfig + // zero value should be safe; the wiring treats Enabled=false as off. + assert.False(t, c.Enabled) + assert.Equal(t, "", c.PromURL) +} +``` + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `bottleneckConfig` undefined. + +- [ ] **Step 3: Add the config type and field** + +In `tools/loadgen/main.go`, add this type above `type config struct`: + +```go +// bottleneckConfig tunes the max-rps(messages) bottleneck attribution. It is +// additive: when Enabled is false (or PromURL is empty) the run behaves exactly +// as before. +type bottleneckConfig struct { + Enabled bool `env:"ENABLED" envDefault:"true"` + PromURL string `env:"PROM_URL" envDefault:""` + KneeTolerance float64 `env:"KNEE_TOLERANCE" envDefault:"0.10"` + QueryStep time.Duration `env:"QUERY_STEP" envDefault:"5s"` + ContainerMap string `env:"CONTAINER_MAP" envDefault:""` +} +``` + +Add this field to the `config` struct (after `MessageBucketHours`): + +```go + Bottleneck bottleneckConfig `envPrefix:"BOTTLENECK_"` +``` + +- [ ] **Step 4: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add tools/loadgen/main.go tools/loadgen/config_bottleneck_test.go +git commit -m "feat(loadgen): add bottleneck attribution config" +``` + +--- + +## Task 9: Wire the engine into max-rps and the report + +Build the engine after the ramp (messages workload only, when enabled and a step tripped), run `Diagnose`, and thread the verdict into the report (stdout block + CSV trip-row columns). + +**Files:** +- Modify: `tools/loadgen/maxrps.go` +- Modify: `tools/loadgen/maxrps_report.go` +- Test: `tools/loadgen/maxrps_report_test.go` (add a render-with-bottleneck case) + +- [ ] **Step 1: Write the failing test** + +Add to the existing `tools/loadgen/maxrps_report_test.go` (ensure its import block includes `strings`, `testing`, and `testify/assert` + `testify/require`): + +```go +func TestRenderRPSReport_AppendsBottleneck(t *testing.T) { + results := []rpsStepResult{ + {TargetRPS: 1000, Kind: verdictPass}, + {TargetRPS: 2000, Kind: verdictTrip, Reasons: []string{"E2 p95=143ms > 100ms"}}, + } + bn := bottleneckVerdict{Component: "message-worker", Resource: "Cassandra", Confidence: "high", Determined: true, + Reasons: []string{"message-worker consumer backlog grew"}} + var sb strings.Builder + require.NoError(t, renderRPSReportWithBottleneck(&sb, results, "messages", "medium", &bn)) + out := sb.String() + assert.Contains(t, out, "ANSWER: max RPS = 1000") + assert.Contains(t, out, "BOTTLENECK: message-worker (Cassandra-bound)") +} + +func TestRenderRPSReport_NilBottleneckUnchanged(t *testing.T) { + results := []rpsStepResult{{TargetRPS: 1000, Kind: verdictPass}} + var sb strings.Builder + require.NoError(t, renderRPSReportWithBottleneck(&sb, results, "messages", "medium", nil)) + assert.NotContains(t, sb.String(), "BOTTLENECK:") +} +``` + +Ensure the file imports `strings`, `testing`, and the testify packages. + +- [ ] **Step 2: Run it to confirm it fails** + +Run: `make test SERVICE=loadgen` +Expected: FAIL — `renderRPSReportWithBottleneck` undefined. + +- [ ] **Step 3: Refactor the reporter to accept an optional verdict** + +In `tools/loadgen/maxrps_report.go`, rename the body of `renderRPSReport` into a new function and keep a thin wrapper. Replace the existing `func renderRPSReport(...)` signature line: + +```go +func renderRPSReport(w io.Writer, results []rpsStepResult, workload, preset string) error { +``` + +with: + +```go +func renderRPSReport(w io.Writer, results []rpsStepResult, workload, preset string) error { + return renderRPSReportWithBottleneck(w, results, workload, preset, nil) +} + +func renderRPSReportWithBottleneck(w io.Writer, results []rpsStepResult, workload, preset string, bn *bottleneckVerdict) error { +``` + +Then, at the end of `renderRPSReportWithBottleneck`, immediately before the final `return nil`, add: + +```go + if bn != nil { + renderBottleneck(w, *bn) + } +``` + +- [ ] **Step 4: Wire the engine in maxrps.go** + +In `tools/loadgen/maxrps.go`, replace this block: + +```go + if err := renderRPSReport(os.Stdout, results, w.Label(), presetID); err != nil { + slog.Warn("render report", "error", err) + } +``` + +with: + +```go + var bn *bottleneckVerdict + if *workload == "messages" { + if v := diagnoseBottleneck(ctx, cfg, results, thresholds); v != nil { + bn = v + } + } + if err := renderRPSReportWithBottleneck(os.Stdout, results, w.Label(), presetID, bn); err != nil { + slog.Warn("render report", "error", err) + } +``` + +Then add this helper at the bottom of `tools/loadgen/maxrps.go`: + +```go +// diagnoseBottleneck runs the attribution engine for a messages ramp that +// tripped. Returns nil when disabled, unconfigured, or no step tripped — the +// report then prints normally with no BOTTLENECK line. +func diagnoseBottleneck(ctx context.Context, cfg *config, results []rpsStepResult, th rpsThresholds) *bottleneckVerdict { + bc := cfg.Bottleneck + if !bc.Enabled || bc.PromURL == "" { + return nil + } + trip := firstTrip(results) + if trip == nil { + return nil + } + var pass *rpsStepResult + for i := range results { + if results[i].Kind == verdictPass { + pass = &results[i] + } + } + fallback, err := parseContainerMap(bc.ContainerMap) + if err != nil { + slog.Warn("bad BOTTLENECK_CONTAINER_MAP; ignoring", "error", err) + fallback = map[string]string{} + } + eng := newBottleneckEngine(newPromClient(bc.PromURL), identityResolver{fallback: fallback}, bc.KneeTolerance, bc.QueryStep) + v := eng.Diagnose(ctx, trip, pass, messagesStageGraph(), th) + return &v +} +``` + +- [ ] **Step 5: Add the CSV trip-row columns** + +In `tools/loadgen/maxrps.go`, the CSV is written via `writeRPSCSV(f, results)`. Extend it to pass the verdict. Replace `writeRPSCSV(f, results)` with `writeRPSCSV(f, results, bn)`. + +In `tools/loadgen/maxrps_report.go`, change `func writeRPSCSV(w io.Writer, results []rpsStepResult) error {` to: + +```go +func writeRPSCSV(w io.Writer, results []rpsStepResult, bn *bottleneckVerdict) error { +``` + +In its header slice, append the three culprit columns: + +```go + header = append(header, "error_rate", "attempted", "failed", "saturation", "worst_durable", "worst_pending_delta", "verdict", "reasons", "bottleneck_component", "bottleneck_resource", "bottleneck_confidence") +``` + +In the per-row loop, after the existing `row = append(row, ...)` that ends with `strings.Join(r.Reasons, "; ")`, add: + +```go + if bn != nil && r.Kind == verdictTrip { + row = append(row, bottleneckCSVColumns(*bn)...) + } else { + row = append(row, "", "", "") + } +``` + +- [ ] **Step 6: Run tests to confirm pass** + +Run: `make test SERVICE=loadgen` +Expected: PASS. + +- [ ] **Step 7: Lint, then commit** + +Run: `make lint` (fixes/flags formatting and vet issues) +Then: + +```bash +git add tools/loadgen/maxrps.go tools/loadgen/maxrps_report.go tools/loadgen/maxrps_report_test.go +git commit -m "feat(loadgen): wire bottleneck attribution into max-rps report" +``` + +--- + +## Task 10: Deploy wiring + docs + +Add a cAdvisor scrape job to loadgen's deploy Prometheus, bring cAdvisor up in the deploy compose, and document the feature in the README. + +**Files:** +- Modify: `tools/loadgen/deploy/prometheus/prometheus.yml` +- Modify: `tools/loadgen/deploy/docker-compose.yml` +- Modify: `tools/loadgen/README.md` + +- [ ] **Step 1: Read the current deploy compose** + +Run: `cat tools/loadgen/deploy/docker-compose.yml` +Note the existing `prometheus` and `loadgen` service definitions, the network name, and how env is passed to `loadgen`. + +- [ ] **Step 2: Add the cAdvisor scrape job** + +In `tools/loadgen/deploy/prometheus/prometheus.yml`, add under `scrape_configs:`: + +```yaml + - job_name: cadvisor + static_configs: + - targets: ["cadvisor:8080"] +``` + +- [ ] **Step 3: Add cAdvisor to the deploy compose** + +In `tools/loadgen/deploy/docker-compose.yml`, add a `cadvisor` service mirroring the proven `tools/observability` posture (privileged, `cgroup: host`, host mounts), joined to the same network the loadgen overlay uses, and add `BOTTLENECK_PROM_URL=http://prometheus:9090` to the `loadgen` service's environment. Use this service block: + +```yaml + cadvisor: + image: gcr.io/cadvisor/cadvisor:v0.49.1 + privileged: true + cgroup: host + pid: host + devices: + - /dev/kmsg + volumes: + - /:/rootfs:ro + - /var/run:/var/run:ro + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + - /var/run/docker.sock:/var/run/docker.sock:ro + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:8080/healthz"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 5s +``` + +If the `prometheus` service has a `depends_on`, add `cadvisor` to it. + +- [ ] **Step 4: Verify compose parses** + +Run: `docker compose -f tools/loadgen/deploy/docker-compose.yml config >/dev/null && echo OK` +Expected: `OK` (no YAML/compose errors). If Docker is unavailable in the environment, skip and note it. + +- [ ] **Step 5: Document the feature in the README** + +In `tools/loadgen/README.md`, under the `max-rps` section, add a subsection: + +```markdown +### Bottleneck attribution + +When a `max-rps --workload=messages` ramp trips, loadgen appends a +`BOTTLENECK:` block naming the culprit component, the saturated resource, +and a confidence: + +```text +ANSWER: max RPS = 2000 (workload=messages, preset=medium) + Next limit: E2 p95=143ms > 100ms +BOTTLENECK: message-worker (Cassandra-bound) + message-worker consumer backlog grew (first stage to back up) + cassandra CPU plateaued between 1000 and 2000 rps while load rose + confidence: high +``` + +It fuses loadgen's per-stage signals (E1/E2 latency, per-durable backlog) +with cAdvisor container CPU trends from Prometheus. The deploy stack brings +up cAdvisor automatically. Tunables (env, `BOTTLENECK_` prefix): + +| Var | Default | Notes | +|-----|---------|-------| +| `BOTTLENECK_ENABLED` | `true` | Set `false` to disable; run behaves as before. | +| `BOTTLENECK_PROM_URL` | (set in compose) | Prometheus that scrapes cAdvisor. Empty = disabled. | +| `BOTTLENECK_KNEE_TOLERANCE` | `0.10` | Max relative CPU rise still counted as a plateau. | +| `BOTTLENECK_QUERY_STEP` | `5s` | PromQL step; match the scrape interval. | +| `BOTTLENECK_CONTAINER_MAP` | (empty) | `shortid:name,…` fallback when cAdvisor omits the compose-service label. | + +The verdict is best-effort: if Prometheus is unreachable or the data is too +thin (e.g. the breach was on the first step), the line reads +`BOTTLENECK: undetermined ()` and the run still reports normally. +``` + +- [ ] **Step 6: Commit** + +```bash +git add tools/loadgen/deploy/prometheus/prometheus.yml tools/loadgen/deploy/docker-compose.yml tools/loadgen/README.md +git commit -m "feat(loadgen): bring up cAdvisor for bottleneck attribution + docs" +``` + +--- + +## Task 11: Final verification + +- [ ] **Step 1: Full unit suite with race detector** + +Run: `make test SERVICE=loadgen` +Expected: PASS, no data races. + +- [ ] **Step 2: Coverage check (≥80% on new files)** + +Run: +```bash +go test -coverprofile=/tmp/loadgen.cov ./tools/loadgen/... >/dev/null 2>&1; go tool cover -func=/tmp/loadgen.cov | grep -E 'promclient|stagegraph|identity|attribution' +``` +Expected: each new file's functions ≥ 80%. If any are below, add table cases to the corresponding `_test.go` (e.g. malformed Prometheus JSON in `promclient_test.go`, empty graph in `attribution_test.go`) and re-run. + +- [ ] **Step 3: Lint + SAST** + +Run: `make lint && make sast-gosec` +Expected: clean. (No `InsecureSkipVerify`/unsafe conversions were introduced; if gosec flags the cAdvisor host mounts they live in compose YAML, not Go, and are out of gosec's scope.) + +- [ ] **Step 4: Confirm the build** + +Run: `make build SERVICE=loadgen` +Expected: builds successfully. + +- [ ] **Step 5: Push the branch** + +```bash +git push -u origin claude/magical-ramanujan-gA5rC +``` + +--- + +## Self-Review Notes (author) + +- **Spec coverage:** promclient (Task 2) ✓; stage-graph causality (Tasks 3, 6) ✓; identity resolution (Task 4) ✓; knee/plateau saturation given no CPU limits (Task 5) ✓; precedence high→high→medium→low→undetermined (Task 6) ✓; config (Task 8) ✓; additive output + CSV (Tasks 7, 9) ✓; deploy + docs (Task 10) ✓; fake-promclient-only testing, no integration test (Tasks 2, 5, 6) ✓. +- **Type consistency:** `bottleneckVerdict`, `bottleneckEngine`, `newBottleneckEngine`, `promQuerier`, `promSeries`/`promSample`, `identityResolver`, `stage`, `Diagnose`, `cpuCores`, `saturated`, `stageBackingUp`, `renderBottleneck`, `renderRPSReportWithBottleneck`, `diagnoseBottleneck`, `bottleneckConfig` are defined once and referenced consistently across tasks. +- **Memory/network signals:** the spec lists them as corroborating-only for v1; the implementation treats a CPU counter reset as the restart/memory proxy and otherwise leans on the CPU knee. Pure memory-bound-without-restart attribution is explicitly deferred (matches "corroborating only in v1"). diff --git a/docs/superpowers/specs/2026-06-02-loadgen-bottleneck-attribution-design.md b/docs/superpowers/specs/2026-06-02-loadgen-bottleneck-attribution-design.md new file mode 100644 index 000000000..3e612cff9 --- /dev/null +++ b/docs/superpowers/specs/2026-06-02-loadgen-bottleneck-attribution-design.md @@ -0,0 +1,182 @@ +# loadgen Bottleneck Attribution (v1) + +**Date:** 2026-06-02 +**Status:** Design — approved for planning +**Scope:** `tools/loadgen` — messages workload + `max-rps(messages)` only + +## Problem + +When a load run breaches SLO, `loadgen` tells you *that* it broke (latency +p95/p99, error rate, or per-durable backlog growth) but not *which* +component is the bottleneck. Today a human cross-references loadgen's +verdict against the `tools/observability` Grafana dashboard (cAdvisor +container CPU/mem/net) and eyeballs the hot box. We want loadgen to fuse +those two signal sources automatically and name a culprit. + +The raw signals already exist in two places: + +- **loadgen** owns per-stage latency (E1/E2 split), per-durable consumer + backlog deltas (`WorstDurable`/`WorstDelta`), and error classes. +- **cAdvisor** (in `tools/observability`) exposes per-container CPU, memory, + and network for every container — Go services *and* dependencies + (Mongo/Cassandra/Valkey/ES/NATS) — with no per-service instrumentation. + +What's missing is the correlation layer that walks the known event flow and +attributes the breach to a stage and a resource. + +## Goals / Non-goals + +**Goals** +- On the breaching step of a `max-rps --workload=messages` ramp, append a + `BOTTLENECK:` block to the existing verdict naming the culprit + component, the saturated resource, a causal reason, and a confidence. +- Purely additive: when Prometheus is unreachable or data is thin, print + `BOTTLENECK: undetermined ()` and never fail the run. + +**Non-goals (v1)** +- history and members workloads (different stage graphs; history has no + durable backlog signal). They get their own stage graphs later. +- Production use. Inherits the local-dev-only posture of cAdvisor. +- Absolute resource thresholds / cross-machine comparison. + +## Integration point + +The engine fires once, when the `max-rps` ramp identifies its first `TRIP` +step. At that moment loadgen already knows the breach reason, the hold +window `[start, end]`, and the breaching step's `rpsStepResult` plus the +prior steps' results. The engine takes the breach window, queries a +Prometheus that scrapes cAdvisor for per-container trends over the same +window (and the prior step's window, for the knee test), fuses them with +loadgen's owned stage signals, walks the messages stage graph, and appends: + +```text +ANSWER: max RPS = 2000 (workload=messages, preset=medium) + Next limit: E2 p95=143ms > 100ms +BOTTLENECK: message-worker (Cassandra-bound) + message-worker consumer backlog +12k (first stage to back up) + cassandra CPU plateaued at 3.8 cores across steps 1k→2k while load rose + confidence: high +``` + +## Architecture & components + +All new code lives in `tools/loadgen`, following the existing flat +`package main` layout. Each unit has one job: + +- **`promclient.go`** — thin PromQL HTTP client over Resty (already in + `go.mod`; CLAUDE.md mandates Resty for outbound HTTP with a timeout). + One method: `RangeQuery(ctx, query, start, end, step) ([]series, error)`. +- **`stagegraph.go`** — declarative, ordered DAG of the messages pipeline. + Each stage names its component(s), the cAdvisor container identity to + query, and (where applicable) the durable consumer fronting it. Pure + data + helpers, no I/O. The only workload-specific piece. +- **`identity.go`** — resolves a logical stage name → cAdvisor series + selector, preferring the `container_label_com_docker_compose_service` + label, falling back to a configured short-ID→name map. Pure function. +- **`attribution.go`** — the engine. Input: the breaching step's + `rpsStepResult`, the prior steps' results (for the knee comparison), and + a promclient (accepted as a **consumer-defined interface** so unit tests + inject a fake — no real Prometheus in unit tests). Output: + `bottleneckVerdict{Component, Resource, Reasons, Confidence}`. +- **`attribution_report.go`** — formats `bottleneckVerdict` into the + `BOTTLENECK:` block, mirroring `maxrps_report.go`. + +Wiring lives in existing `maxrps.go`: after the ramp finds the breaching +step, call the engine and hand its verdict to the reporter. New config is +parsed via `caarlos0/env` in `main.go` like everything else. + +## Signals & the saturation definition + +No container in the local stack has a CPU quota (only one `mem_limit` +exists), so container CPU% has no per-container denominator. "Saturated" +is therefore defined **relative to the ramp**, not to a limit. + +Per-container series pulled over the breach window and the prior step's +window: + +| Signal | PromQL (cAdvisor) | Bottleneck meaning | +|---|---|---| +| CPU cores | `rate(container_cpu_usage_seconds_total[…])` | **Knee/plateau**: usage stopped rising step-over-step while offered load rose → CPU-bound | +| CPU throttle | `rate(container_cpu_cfs_throttled_seconds_total[…])` | Non-zero only if limits ever added; strong direct signal when present | +| Memory | `container_memory_working_set_bytes` | Climb toward `mem_limit`, or a restart (counter reset) → memory-bound | +| Network | `rate(container_network_*_bytes_total[…])` | Corroborating only in v1 | + +Plus loadgen's owned signals (no Prometheus needed): per-durable backlog +delta, E1/E2 latency split, error class. + +**Knee test (core primitive).** A component is "saturated" if, between the +last `PASS` step and the breaching step, offered RPS rose materially but +its CPU did **not** rise (within `BOTTLENECK_KNEE_TOLERANCE`) — it +flat-lined while being asked to do more. This sidesteps the missing-quota +problem and is exactly what a bottleneck looks like. + +## Attribution algorithm + +1. **Causality walk (primary).** Walk the stage graph in flow order. For + each stage evaluate two predicates: *backing up?* (its durable backlog + delta > 0 or its latency series breached) and *saturated?* (knee test on + its container). The culprit is the **first** stage that is both backing + up **and** saturated → ` (-bound)`. Confidence + **high**. +2. **Single-signal stages.** If a stage backs up but is **not** visibly + saturated (e.g. waiting on Cassandra I/O with low CPU), attribute to its + **downstream dependency** if that dependency *is* saturated + (message-worker backs up + cassandra CPU knee → "message-worker, + Cassandra-bound"). Confidence **high**. If neither the stage nor its + dependency is saturated, name the backing-up stage with resource + `unknown` (likely I/O / lock wait). Confidence **medium**. +3. **Saturation fallback (cAdvisor-led).** If stage signals are ambiguous + (no clear first-backed-up stage), fall back to pure ranking: the + container with the clearest knee / highest absolute core usage in the + breach window wins. Confidence **low**, flagged as "resource-ranking + fallback." +4. **Undetermined.** Prometheus unreachable, too few steps for a knee + (e.g. breach on step 1), or nothing stands out → + `BOTTLENECK: undetermined ()`. Never errors the run. + +## Configuration + +Env via `caarlos0/env`, prefixed, with defaults. `BOTTLENECK_PROM_URL` is +required only when attribution is enabled. + +| Var | Default | Notes | +|---|---|---| +| `BOTTLENECK_ENABLED` | `true` | Master switch; off = today's behavior exactly | +| `BOTTLENECK_PROM_URL` | — | Prometheus that scrapes cAdvisor (e.g. `http://prometheus:9090`) | +| `BOTTLENECK_KNEE_TOLERANCE` | `0.10` | CPU rise below this fraction across the step = plateau | +| `BOTTLENECK_QUERY_STEP` | `5s` | Matches the scrape interval | +| `BOTTLENECK_CONTAINER_MAP` | — | Optional `shortid:name,…` identity fallback | + +Deploy changes: add a cAdvisor scrape job to loadgen's deploy +`prometheus.yml`; update the deploy compose/README so `run-max-rps` brings +up cAdvisor alongside loadgen's Prometheus. + +## Output + +- The `BOTTLENECK:` block (above) appended after the existing + `ANSWER:`/`Next limit:` lines. +- A bottleneck column added to the CSV when `--csv` is set. + +## Error handling + +- Prometheus errors/timeouts → `undetermined`, run still prints its normal + verdict. The engine never returns an error that aborts the run. +- Counter resets (container restart) detected and treated as a + memory-pressure signal, not a negative rate. +- Identity unresolved (no label, no map entry) → that container is reported + by short-ID rather than dropped. + +## Testing (TDD, ≥80% coverage) + +- `attribution_test.go` — table-driven over synthetic `rpsStepResult` + sequences + a fake promclient: CPU-knee on a worker; + DB-bound-with-low-worker-CPU; ambiguous → fallback; too-few-steps → + undetermined; prom-unreachable → undetermined. +- `stagegraph_test.go` / `identity_test.go` — pure-function tables (label + present vs short-ID fallback). +- `attribution_report_test.go` — golden-string formatting incl. + undetermined. + +No integration test in v1: the fake promclient (injected via the +consumer-defined interface) covers the engine end-to-end at unit scope, +and there is no store/container dependency to exercise. diff --git a/tools/loadgen/README.md b/tools/loadgen/README.md index a86c2b277..a6bf1e9fb 100644 --- a/tools/loadgen/README.md +++ b/tools/loadgen/README.md @@ -265,7 +265,7 @@ make -C tools/loadgen/deploy run-max-rps WORKLOAD=history PRESET=history-medium At the end of the run the tool prints a per-step table and a final verdict line: -``` +```text ANSWER: max RPS = 2000 (workload=messages, preset=medium) Next limit: E2 p95=143ms > 100ms ``` @@ -281,6 +281,37 @@ the limiting factor, so the step's result can't be trusted. An INCONCLUSIVE step does **not** count as a pass and does **not** stop the ramp, even with `--stop-on-trip`; only a hard TRIP stops the ramp. +### Bottleneck attribution + +When a `max-rps --workload=messages` ramp trips, loadgen appends a +`BOTTLENECK:` block naming the culprit component, the saturated resource, +and a confidence: + +```text +ANSWER: max RPS = 2000 (workload=messages, preset=medium) + Next limit: E2 p95=143ms > 100ms +BOTTLENECK: message-worker (Cassandra-bound) + message-worker consumer backlog grew (first stage to back up) + cassandra CPU plateaued between 1000 and 2000 rps while load rose + confidence: high +``` + +It fuses loadgen's per-stage signals (E1/E2 latency, per-durable backlog) +with cAdvisor container CPU trends from Prometheus. `make run-max-rps` +starts cAdvisor + Prometheus for you (no need to run `make run-dashboards` +first). Tunables (env, `BOTTLENECK_` prefix): + +| Var | Default | Notes | +|-----|---------|-------| +| `BOTTLENECK_ENABLED` | `true` | Set `false` to disable; run behaves as before. | +| `BOTTLENECK_PROM_URL` | (set in compose) | Prometheus that scrapes cAdvisor. Empty = disabled. | +| `BOTTLENECK_KNEE_TOLERANCE` | `0.10` | Max relative CPU rise still counted as a plateau. | +| `BOTTLENECK_QUERY_STEP` | `5s` | PromQL step; match the scrape interval. | +| `BOTTLENECK_CONTAINER_MAP` | (empty) | `shortid:name,…` fallback when cAdvisor omits the compose-service label. | + +The verdict is best-effort: if Prometheus is unreachable or the data is too +thin (e.g. the breach was on the first step), the line reads +`BOTTLENECK: undetermined ()` and the run still reports normally. ## Daily-IM scenario (find N) — Operator Guide Simulates N users using the chat system as their primary IM throughout diff --git a/tools/loadgen/attribution.go b/tools/loadgen/attribution.go new file mode 100644 index 000000000..46e139aef --- /dev/null +++ b/tools/loadgen/attribution.go @@ -0,0 +1,255 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "time" +) + +const ( + // cpuSaturatedFloorCores is the absolute CPU bar for blaming a container's + // own CPU. Below this, a plateaued container is treated as NOT CPU-bound + // (it is idle, or low-and-flat because it is waiting on a dependency / I/O), + // so attribution falls through to the dependency or to "unknown". Roughly + // one fully-utilised core; a host-relative heuristic since no CPU limits + // are set on the local stack. + cpuSaturatedFloorCores = 1.0 +) + +// promQuerier is the consumer-defined seam over Prometheus, so unit tests can +// inject a fake without a live server. +type promQuerier interface { + RangeQuery(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]promSeries, error) +} + +// bottleneckVerdict is the engine's output, rendered as the BOTTLENECK: block. +type bottleneckVerdict struct { + Component string // culprit component, "" when undetermined + Resource string // "CPU", a dependency display name, or "unknown" + Confidence string // "high" | "medium" | "low" + Reasons []string // human-readable causal lines + Determined bool // false -> render "undetermined ()" +} + +// bottleneckEngine fuses loadgen signals with cAdvisor CPU trends. +type bottleneckEngine struct { + q promQuerier + ident identityResolver + knee float64 // max relative CPU rise still counted as a plateau + step time.Duration // PromQL query step +} + +// newBottleneckEngine builds an engine over a promQuerier and identity resolver. +// knee is the max relative CPU rise still treated as a plateau; step is the +// PromQL query step. +func newBottleneckEngine(q promQuerier, ident identityResolver, knee float64, step time.Duration) *bottleneckEngine { + return &bottleneckEngine{q: q, ident: ident, knee: knee, step: step} +} + +// saturated reports whether service is CPU-bound at the breach: its trip-window +// CPU is above the saturation floor AND plateaued (rose by less than the knee +// fraction) even though offered RPS rose. A counter reset also counts (restart +// under pressure). A container below the floor is NOT blamed — it is idle or +// low-and-flat because it is waiting on something downstream. dataOK=false means +// the measurement itself failed (e.g. Prometheus unreachable). +func (e *bottleneckEngine) saturated(ctx context.Context, service string, pass, trip *rpsStepResult) (sat, dataOK bool, tripCores float64) { + tripCores, reset, okT := e.cpuCores(ctx, service, trip.HoldStart, trip.HoldEnd) + if !okT { + return false, false, 0 + } + if reset { + return true, true, tripCores + } + if tripCores < cpuSaturatedFloorCores { + return false, true, tripCores + } + passCores, _, okP := e.cpuCores(ctx, service, pass.HoldStart, pass.HoldEnd) + if !okP || passCores <= 0 { + // No usable baseline (pass-window query failed or read zero), but + // trip-window usage is already above the saturation floor -> treat as + // saturated. Caveat: without a baseline we can't confirm a plateau, so a + // busy-but-still-scaling container could be over-blamed here. Rare: it + // needs the pass query to fail while the trip query succeeds against the + // same Prometheus. + return true, true, tripCores + } + rise := (tripCores - passCores) / passCores + return rise < e.knee, true, tripCores +} + +// stageBackingUp reports whether a stage is accumulating backlog or breaching +// its latency SLO at the tripping step. +func stageBackingUp(st *stage, trip *rpsStepResult, th rpsThresholds) bool { + if st.Durable != "" { + for _, p := range trip.Pending { + if p.Durable == st.Durable && p.Delta() > 0 { + return true + } + } + } + if st.LatencySeries != "" { + for _, sp := range trip.Latencies { + if sp.Name == st.LatencySeries && (sp.Pct.P95 > th.P95 || sp.Pct.P99 > th.P99) { + return true + } + } + } + return false +} + +// Diagnose applies the attribution precedence and returns a verdict. It never +// returns an error; measurement gaps degrade to a lower confidence or to +// undetermined. Precedence: high (stage CPU) -> high (dependency CPU) -> +// medium (backs up, no knee) -> low (resource-ranking fallback) -> undetermined. +func (e *bottleneckEngine) Diagnose(ctx context.Context, trip, pass *rpsStepResult, graph []stage, th rpsThresholds) bottleneckVerdict { + if pass == nil { + return bottleneckVerdict{Reasons: []string{"no passing step before breach; cannot compute CPU knee"}} + } + + // Evaluate each stage once: is it backing up, and (if so) is its own + // container CPU-saturated? sawData tracks whether ANY CPU query returned + // usable data — if every query failed, Prometheus is effectively + // unreachable and we must not emit a resource verdict. + type stageEval struct { + st stage + backingUp bool + satStage bool + } + evals := make([]stageEval, 0, len(graph)) + sawData := false + for _, st := range graph { + ev := stageEval{st: st, backingUp: stageBackingUp(&st, trip, th)} + if ev.backingUp { + sat, ok, _ := e.saturated(ctx, st.Container, pass, trip) + ev.satStage = sat + sawData = sawData || ok + } + evals = append(evals, ev) + } + + // Pass 1: first backing-up stage whose own CPU is saturated -> high. + for _, ev := range evals { + if ev.backingUp && ev.satStage { + return bottleneckVerdict{ + Component: ev.st.Name, Resource: "CPU", Confidence: "high", Determined: true, + Reasons: []string{ + fmt.Sprintf("%s is the first stage to back up", ev.st.Name), + fmt.Sprintf("%s CPU plateaued between %d and %d rps while load rose", ev.st.Container, pass.TargetRPS, trip.TargetRPS), + }, + } + } + } + + // Pass 2: first backing-up stage whose backing dependency is saturated -> high. + for _, ev := range evals { + if !ev.backingUp { + continue + } + for _, dep := range ev.st.DependsOn { + sat, ok, _ := e.saturated(ctx, dep, pass, trip) + sawData = sawData || ok + if sat { + return bottleneckVerdict{ + Component: ev.st.Name, Resource: dependencyDisplayName(dep), Confidence: "high", Determined: true, + Reasons: []string{ + fmt.Sprintf("%s consumer backlog grew (first stage to back up)", ev.st.Name), + fmt.Sprintf("%s CPU plateaued between %d and %d rps while load rose", dep, pass.TargetRPS, trip.TargetRPS), + }, + } + } + } + } + + // Pass 3: a stage backs up but nothing is saturated. If we had resource + // data, that points to I/O or lock wait (medium); if every CPU query + // failed, we cannot attribute at all (undetermined). + for _, ev := range evals { + if ev.backingUp { + if !sawData { + return bottleneckVerdict{Reasons: []string{"prometheus unreachable; cannot attribute the breach"}} + } + return bottleneckVerdict{ + Component: ev.st.Name, Resource: "unknown", Confidence: "medium", Determined: true, + Reasons: []string{fmt.Sprintf("%s backs up but no resource knee found — likely I/O or lock wait", ev.st.Name)}, + } + } + } + + // Pass 4: nothing backed up -> rank containers by clearest saturation -> low. + if v, ok := e.fallbackRanking(ctx, pass, trip, graph); ok { + return v + } + + // Pass 5: nothing stands out. + return bottleneckVerdict{Reasons: []string{"no stage backed up and no container saturated in the breach window"}} +} + +// fallbackRanking picks the saturated container with the highest trip-window +// cores across all stages and their dependencies. Confidence low. +func (e *bottleneckEngine) fallbackRanking(ctx context.Context, pass, trip *rpsStepResult, graph []stage) (bottleneckVerdict, bool) { + seen := map[string]bool{} + var best string + var bestCores float64 + consider := func(svc string) { + if seen[svc] { + return + } + seen[svc] = true + sat, ok, cores := e.saturated(ctx, svc, pass, trip) + if !ok || !sat { + return + } + if cores > bestCores { + bestCores, best = cores, svc + } + } + for _, st := range graph { + consider(st.Container) + for _, dep := range st.DependsOn { + consider(dep) + } + } + if best == "" { + return bottleneckVerdict{}, false + } + return bottleneckVerdict{ + Component: dependencyDisplayName(best), Resource: "CPU", Confidence: "low", Determined: true, + Reasons: []string{fmt.Sprintf("resource-ranking fallback: %s had the clearest CPU plateau (%.1f cores)", best, bestCores)}, + }, true +} + +// cpuCores returns mean cores used by service over [start,end], derived from +// the CPU usage counter. reset=true when the counter dropped (container +// restart) — callers treat that as a memory/restart signal, not a CPU rate. +func (e *bottleneckEngine) cpuCores(ctx context.Context, service string, start, end time.Time) (cores float64, reset bool, ok bool) { + query := fmt.Sprintf(`container_cpu_usage_seconds_total{%s}`, e.ident.selector(service)) + series, err := e.q.RangeQuery(ctx, query, start, end, e.step) + if err != nil { + slog.Warn("cpu query failed", "service", service, "error", err) + return 0, false, false + } + // Sum across any matching cgroup series (cAdvisor may emit several). + var first, last float64 + var t0, t1 time.Time + var have bool + for _, s := range series { + if len(s.Samples) < 2 { + continue + } + first += s.Samples[0].V + last += s.Samples[len(s.Samples)-1].V + // cAdvisor emits its series aligned to the same scrape window, so the + // timestamps from any one series suffice as the shared [t0,t1] divisor. + t0 = s.Samples[0].T + t1 = s.Samples[len(s.Samples)-1].T + have = true + } + if !have || !t1.After(t0) { + return 0, false, false + } + if last < first { + return 0, true, true + } + return (last - first) / t1.Sub(t0).Seconds(), false, true +} diff --git a/tools/loadgen/attribution_report.go b/tools/loadgen/attribution_report.go new file mode 100644 index 000000000..b8c110a59 --- /dev/null +++ b/tools/loadgen/attribution_report.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "io" + "strings" +) + +// renderBottleneck writes the BOTTLENECK: block. For an undetermined verdict it +// writes a single line naming why; otherwise it names the culprit, the causal +// reasons, and the confidence. +func renderBottleneck(w io.Writer, v *bottleneckVerdict) { + if !v.Determined { + reason := "no signal" + if len(v.Reasons) > 0 { + reason = v.Reasons[0] + } + fmt.Fprintf(w, "BOTTLENECK: undetermined (%s)\n", reason) + return + } + fmt.Fprintf(w, "BOTTLENECK: %s (%s-bound)\n", v.Component, v.Resource) + for _, r := range v.Reasons { + fmt.Fprintf(w, " %s\n", r) + } + fmt.Fprintf(w, " confidence: %s\n", v.Confidence) +} + +// bottleneckCSVColumns returns the trip-row culprit columns appended to the CSV. +func bottleneckCSVColumns(v *bottleneckVerdict) []string { + if !v.Determined { + return []string{"undetermined", "", ""} + } + return []string{v.Component, strings.ToLower(v.Resource), v.Confidence} +} diff --git a/tools/loadgen/attribution_report_test.go b/tools/loadgen/attribution_report_test.go new file mode 100644 index 000000000..433e0627c --- /dev/null +++ b/tools/loadgen/attribution_report_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRenderBottleneck_Determined(t *testing.T) { + var sb strings.Builder + v := bottleneckVerdict{ + Component: "message-worker", Resource: "Cassandra", Confidence: "high", Determined: true, + Reasons: []string{"message-worker consumer backlog grew", "cassandra CPU plateaued"}, + } + renderBottleneck(&sb, &v) + out := sb.String() + assert.Contains(t, out, "BOTTLENECK: message-worker (Cassandra-bound)") + assert.Contains(t, out, "message-worker consumer backlog grew") + assert.Contains(t, out, "confidence: high") +} + +func TestRenderBottleneck_Undetermined(t *testing.T) { + var sb strings.Builder + v := bottleneckVerdict{Reasons: []string{"prometheus unreachable"}} + renderBottleneck(&sb, &v) + assert.Contains(t, sb.String(), "BOTTLENECK: undetermined (prometheus unreachable)") +} + +func TestRenderBottleneck_UndeterminedNoReasons(t *testing.T) { + var sb strings.Builder + v := bottleneckVerdict{} // Determined=false, empty Reasons + renderBottleneck(&sb, &v) + assert.Contains(t, sb.String(), "BOTTLENECK: undetermined (no signal)") +} + +func TestBottleneckCSVColumns(t *testing.T) { + det := bottleneckCSVColumns(&bottleneckVerdict{Component: "message-worker", Resource: "Cassandra", Confidence: "high", Determined: true}) + assert.Equal(t, []string{"message-worker", "cassandra", "high"}, det) + + und := bottleneckCSVColumns(&bottleneckVerdict{Determined: false}) + assert.Equal(t, []string{"undetermined", "", ""}, und) +} diff --git a/tools/loadgen/attribution_test.go b/tools/loadgen/attribution_test.go new file mode 100644 index 000000000..aa9d974a0 --- /dev/null +++ b/tools/loadgen/attribution_test.go @@ -0,0 +1,213 @@ +package main + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeProm returns canned counter samples per service name. The key is matched +// as a substring of the query (the query embeds the service selector). +type fakeProm struct { + // fn maps a query + window to samples; nil samples => empty result. + fn func(query string, start, end time.Time) []promSample + err error +} + +func (f fakeProm) RangeQuery(_ context.Context, query string, start, end time.Time, _ time.Duration) ([]promSeries, error) { + if f.err != nil { + return nil, f.err + } + samples := f.fn(query, start, end) + if samples == nil { + return nil, nil + } + return []promSeries{{Labels: map[string]string{}, Samples: samples}}, nil +} + +func counterSamples(start time.Time, startVal, cores float64, windowSec int) []promSample { + // Linear counter: startVal at t0, rising `cores` per second for windowSec. + return []promSample{ + {T: start, V: startVal}, + {T: start.Add(time.Duration(windowSec) * time.Second), V: startVal + cores*float64(windowSec)}, + } +} + +func TestEngine_cpuCores(t *testing.T) { + start := time.Unix(1000, 0) + q := fakeProm{fn: func(_ string, s, _ time.Time) []promSample { + return counterSamples(s, 100, 2.5, 30) // 2.5 cores over 30s + }} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + cores, reset, ok := eng.cpuCores(context.Background(), "message-worker", start, start.Add(30*time.Second)) + require.True(t, ok) + assert.False(t, reset) + assert.InDelta(t, 2.5, cores, 0.001) +} + +func TestEngine_cpuCores_CounterReset(t *testing.T) { + start := time.Unix(1000, 0) + q := fakeProm{fn: func(_ string, s, _ time.Time) []promSample { + return []promSample{{T: s, V: 500}, {T: s.Add(30 * time.Second), V: 3}} // dropped -> restart + }} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + _, reset, ok := eng.cpuCores(context.Background(), "x", start, start.Add(30*time.Second)) + require.True(t, ok) + assert.True(t, reset) +} + +func TestEngine_cpuCores_QueryError(t *testing.T) { + q := fakeProm{err: assertErr{}} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + _, _, ok := eng.cpuCores(context.Background(), "x", time.Unix(0, 0), time.Unix(30, 0)) + assert.False(t, ok) +} + +func TestEngine_cpuCores_EmptyResult(t *testing.T) { + q := fakeProm{fn: func(_ string, _, _ time.Time) []promSample { return nil }} + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + _, _, ok := eng.cpuCores(context.Background(), "x", time.Unix(0, 0), time.Unix(30, 0)) + assert.False(t, ok) +} + +type assertErr struct{} + +func (assertErr) Error() string { return "prom down" } + +func tripResult(window time.Time) *rpsStepResult { + return &rpsStepResult{ + TargetRPS: 2000, + HoldStart: window, HoldEnd: window.Add(30 * time.Second), + Latencies: []seriesPercentile{ + {Name: "E1", Pct: Percentiles{P95: 20 * time.Millisecond}}, + {Name: "E2", Pct: Percentiles{P95: 200 * time.Millisecond}}, + }, + Pending: []consumerPendingDelta{ + {Durable: "message-worker", Start: 0, End: 12000}, + {Durable: "broadcast-worker", Start: 0, End: 0}, + }, + } +} + +func passResult(window time.Time) *rpsStepResult { + return &rpsStepResult{ + TargetRPS: 1000, + HoldStart: window, HoldEnd: window.Add(30 * time.Second), + } +} + +var slo = buildThresholds(100*time.Millisecond, 250*time.Millisecond, 0.001, 1000, 0.05) + +// stageProm returns per-service cores keyed by service, with a plateau for +// services in `plateau` (same cores in both windows) and growth otherwise. +// Caller contract: plateau keys must be disjoint — no key may be a substring +// of another — because the helper matches the first key contained in the query +// and map iteration order is unspecified. +func stageProm(passT, tripT time.Time, plateau map[string]float64) fakeProm { + return fakeProm{fn: func(query string, s, _ time.Time) []promSample { + for svc, cores := range plateau { + if strings.Contains(query, svc) { + return counterSamples(s, 0, cores, 30) // same cores both windows -> plateau + } + } + // non-plateau services: grow a lot from pass to trip window + base := 0.2 + if s.Equal(tripT) { + base = 2.0 + } + return counterSamples(s, 0, base, 30) + }} +} + +func TestEngine_DependencyBound(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // message-worker backs up; cassandra CPU plateaus -> Cassandra-bound, high. + q := stageProm(passT, tripT, map[string]float64{"cassandra": 3.8, "message-worker": 0.4}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(tripT), passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-worker", v.Component) + assert.Equal(t, "Cassandra", v.Resource) + assert.Equal(t, "high", v.Confidence) +} + +func TestEngine_StageCPUBound(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // E1 (gatekeeper) breaches and gatekeeper CPU plateaus -> CPU-bound, high. + trip := tripResult(tripT) + trip.Latencies[0].Pct.P95 = 150 * time.Millisecond // E1 over SLO + trip.Pending[0].End = 0 // no worker backlog + q := stageProm(passT, tripT, map[string]float64{"message-gatekeeper": 4.0}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), trip, passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-gatekeeper", v.Component) + assert.Equal(t, "CPU", v.Resource) + assert.Equal(t, "high", v.Confidence) +} + +func TestEngine_BacksUpNoKnee_Medium(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + // worker backs up, but nothing plateaus (all CPU still rising) -> medium/unknown. + q := stageProm(passT, tripT, nil) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(tripT), passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "message-worker", v.Component) + assert.Equal(t, "unknown", v.Resource) + assert.Equal(t, "medium", v.Confidence) +} + +func TestEngine_NoBackup_FallbackRanking_Low(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + trip := tripResult(tripT) + trip.Pending[0].End = 0 // nothing backs up, no latency breach + trip.Latencies[1].Pct.P95 = 50 * time.Millisecond // E2 under SLO: broadcast-worker not backing up + // cassandra has the clearest plateau at the highest cores -> low-confidence pick. + q := stageProm(passT, tripT, map[string]float64{"cassandra": 3.8}) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), trip, passResult(passT), messagesStageGraph(), slo) + require.True(t, v.Determined) + assert.Equal(t, "Cassandra", v.Component) + assert.Equal(t, "low", v.Confidence) +} + +func TestEngine_NoPassStep_Undetermined(t *testing.T) { + eng := newBottleneckEngine(stageProm(time.Unix(1, 0), time.Unix(2, 0), nil), identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(time.Unix(2000, 0)), nil, messagesStageGraph(), slo) + assert.False(t, v.Determined) + assert.Contains(t, v.Reasons[0], "no passing step") +} + +func TestEngine_PromError_Undetermined(t *testing.T) { + eng := newBottleneckEngine(fakeProm{err: assertErr{}}, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), tripResult(time.Unix(2000, 0)), passResult(time.Unix(1000, 0)), messagesStageGraph(), slo) + assert.False(t, v.Determined) +} + +func TestEngine_AllClear_Undetermined(t *testing.T) { + passT, tripT := time.Unix(1000, 0), time.Unix(2000, 0) + trip := &rpsStepResult{ + TargetRPS: 2000, + HoldStart: tripT, HoldEnd: tripT.Add(30 * time.Second), + // no latency breaches, no backlog growth + Latencies: []seriesPercentile{ + {Name: "E1", Pct: Percentiles{P95: 10 * time.Millisecond}}, + {Name: "E2", Pct: Percentiles{P95: 20 * time.Millisecond}}, + }, + Pending: []consumerPendingDelta{ + {Durable: "message-worker", Start: 0, End: 0}, + {Durable: "broadcast-worker", Start: 0, End: 0}, + }, + } + // nil plateau -> every container's CPU grows (rise >> knee) -> nothing saturated. + q := stageProm(passT, tripT, nil) + eng := newBottleneckEngine(q, identityResolver{}, 0.10, 5*time.Second) + v := eng.Diagnose(context.Background(), trip, passResult(passT), messagesStageGraph(), slo) + assert.False(t, v.Determined) + assert.Contains(t, v.Reasons[0], "no stage backed up") +} diff --git a/tools/loadgen/config_bottleneck_test.go b/tools/loadgen/config_bottleneck_test.go new file mode 100644 index 000000000..44b8bbe21 --- /dev/null +++ b/tools/loadgen/config_bottleneck_test.go @@ -0,0 +1,14 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBottleneckConfig_Defaults(t *testing.T) { + var c bottleneckConfig + // zero value should be safe; the wiring treats Enabled=false as off. + assert.False(t, c.Enabled) + assert.Equal(t, "", c.PromURL) +} diff --git a/tools/loadgen/deploy/Makefile b/tools/loadgen/deploy/Makefile index 68e6a31ac..4bec27393 100644 --- a/tools/loadgen/deploy/Makefile +++ b/tools/loadgen/deploy/Makefile @@ -77,6 +77,7 @@ run-dashboards: run-max-rps: ## Ramp RPS to find the max under SLO (WORKLOAD=messages|history PRESET=.. STEPS=..) @test -n "$(PRESET)" || (echo "PRESET= required" && exit 1) + $(COMPOSE) --profile dashboards up -d cadvisor prometheus $(COMPOSE) exec -T loadgen /loadgen max-rps \ --workload=$(WORKLOAD) \ --preset=$(PRESET) \ diff --git a/tools/loadgen/deploy/docker-compose.yml b/tools/loadgen/deploy/docker-compose.yml index c4fe74f18..21c1eeb46 100644 --- a/tools/loadgen/deploy/docker-compose.yml +++ b/tools/loadgen/deploy/docker-compose.yml @@ -28,6 +28,7 @@ services: # the port. Off by default so the metrics endpoint doesn't expose # profiling. # - PPROF_ADDR=:6060 + - BOTTLENECK_PROM_URL=http://prometheus:9090 volumes: - ../../../docker-local/backend.creds:/etc/nats/backend.creds:ro ports: @@ -36,9 +37,35 @@ services: networks: - chat-local + cadvisor: + image: gcr.io/cadvisor/cadvisor:v0.49.1 + profiles: [dashboards] + privileged: true + cgroup: host + pid: host + devices: + - /dev/kmsg + volumes: + - /:/rootfs:ro + - /var/run:/var/run:ro + - /sys:/sys:ro + - /var/lib/docker/:/var/lib/docker:ro + - /var/run/docker.sock:/var/run/docker.sock:ro + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:8080/healthz"] + interval: 5s + timeout: 3s + retries: 10 + start_period: 5s + networks: + - chat-local + prometheus: image: prom/prometheus:v2.55.0 profiles: [dashboards] + depends_on: + cadvisor: + condition: service_healthy volumes: - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro ports: diff --git a/tools/loadgen/deploy/prometheus/prometheus.yml b/tools/loadgen/deploy/prometheus/prometheus.yml index 9c7a81809..dbd373596 100644 --- a/tools/loadgen/deploy/prometheus/prometheus.yml +++ b/tools/loadgen/deploy/prometheus/prometheus.yml @@ -6,5 +6,8 @@ scrape_configs: - job_name: loadgen static_configs: - targets: ["loadgen:9099"] + - job_name: cadvisor + static_configs: + - targets: ["cadvisor:8080"] # NATS monitoring on :8222 serves JSON (/varz, /jsz) — not Prometheus. # Add prometheus-nats-exporter as a sidecar if NATS metrics are needed. diff --git a/tools/loadgen/identity.go b/tools/loadgen/identity.go new file mode 100644 index 000000000..02e85aef6 --- /dev/null +++ b/tools/loadgen/identity.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "strings" +) + +// identityResolver maps a logical service name to a cAdvisor PromQL label +// selector. The fallback map (name -> 12-char container short-ID) is used on +// hosts where cAdvisor cannot populate the compose-service label. +type identityResolver struct { + fallback map[string]string +} + +// parseContainerMap parses "shortid:name,shortid2:name2" into a name->shortid +// map. An empty string yields an empty map. +func parseContainerMap(s string) (map[string]string, error) { + out := map[string]string{} + if strings.TrimSpace(s) == "" { + return out, nil + } + for _, pair := range strings.Split(s, ",") { + id, name, ok := strings.Cut(strings.TrimSpace(pair), ":") + if !ok || id == "" || name == "" { + return nil, fmt.Errorf("bad container-map entry %q (want shortid:name)", pair) + } + out[name] = id + } + return out, nil +} + +// selector returns the inner PromQL label matcher (no metric name, no braces) +// that identifies the given service's container. +func (r identityResolver) selector(service string) string { + if id, ok := r.fallback[service]; ok { + return fmt.Sprintf(`id=~".*%s.*"`, id) + } + return fmt.Sprintf(`container_label_com_docker_compose_service=%q`, service) +} diff --git a/tools/loadgen/identity_test.go b/tools/loadgen/identity_test.go new file mode 100644 index 000000000..789374820 --- /dev/null +++ b/tools/loadgen/identity_test.go @@ -0,0 +1,33 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseContainerMap(t *testing.T) { + m, err := parseContainerMap("0a1b2c3d4e5f:cassandra,deadbeef0000:mongo") + require.NoError(t, err) + assert.Equal(t, "0a1b2c3d4e5f", m["cassandra"]) + assert.Equal(t, "deadbeef0000", m["mongo"]) +} + +func TestParseContainerMap_Empty(t *testing.T) { + m, err := parseContainerMap("") + require.NoError(t, err) + assert.NotNil(t, m) // empty string must yield a non-nil (usable) map + assert.Empty(t, m) +} + +func TestParseContainerMap_Malformed(t *testing.T) { + _, err := parseContainerMap("noseparator") + require.Error(t, err) +} + +func TestIdentityResolver_Selector(t *testing.T) { + r := identityResolver{fallback: map[string]string{"cassandra": "0a1b2c3d4e5f"}} + assert.Equal(t, `container_label_com_docker_compose_service="message-worker"`, r.selector("message-worker")) + assert.Equal(t, `id=~".*0a1b2c3d4e5f.*"`, r.selector("cassandra")) +} diff --git a/tools/loadgen/main.go b/tools/loadgen/main.go index 9204b9c0b..1525b04e8 100644 --- a/tools/loadgen/main.go +++ b/tools/loadgen/main.go @@ -30,6 +30,17 @@ import ( "github.com/hmchangw/chat/pkg/subject" ) +// bottleneckConfig tunes the max-rps(messages) bottleneck attribution. It is +// additive: when Enabled is false (or PromURL is empty) the run behaves exactly +// as before. +type bottleneckConfig struct { + Enabled bool `env:"ENABLED" envDefault:"true"` + PromURL string `env:"PROM_URL" envDefault:""` + KneeTolerance float64 `env:"KNEE_TOLERANCE" envDefault:"0.10"` + QueryStep time.Duration `env:"QUERY_STEP" envDefault:"5s"` + ContainerMap string `env:"CONTAINER_MAP" envDefault:""` +} + type config struct { NatsURL string `env:"NATS_URL,required"` NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""` @@ -56,7 +67,8 @@ type config struct { // JetStream consumer pending counts. Defaults to the docker-compose // service name. Override (e.g. `http://127.0.0.1:8222/jsz` on the host, // or a custom monitoring port) when running against non-default infra. - NatsMonitoringURL string `env:"NATS_MONITORING_URL" envDefault:"http://nats:8222/jsz"` + NatsMonitoringURL string `env:"NATS_MONITORING_URL" envDefault:"http://nats:8222/jsz"` + Bottleneck bottleneckConfig `envPrefix:"BOTTLENECK_"` } func main() { diff --git a/tools/loadgen/maxrps.go b/tools/loadgen/maxrps.go index f60ff7a3f..44b59172d 100644 --- a/tools/loadgen/maxrps.go +++ b/tools/loadgen/maxrps.go @@ -133,7 +133,8 @@ func runMaxRPS(ctx context.Context, cfg *config, args []string) int { Thresholds: thresholds, StopOnTrip: *stopOnTrip, }) - if err := renderRPSReport(os.Stdout, results, w.Label(), presetID); err != nil { + bn := diagnoseBottleneck(ctx, cfg, *workload, results, thresholds) + if err := renderRPSReportWithBottleneck(os.Stdout, results, w.Label(), presetID, bn); err != nil { slog.Warn("render report", "error", err) } if *csvPath != "" { @@ -141,7 +142,7 @@ func runMaxRPS(ctx context.Context, cfg *config, args []string) int { if err != nil { slog.Error("create csv", "error", err) } else { - if err := writeRPSCSV(f, results); err != nil { + if err := writeRPSCSV(f, results, bn); err != nil { slog.Error("write csv", "error", err) } _ = f.Close() @@ -149,3 +150,32 @@ func runMaxRPS(ctx context.Context, cfg *config, args []string) int { } return maxRPSExitCode(results) } + +// diagnoseBottleneck runs the attribution engine for a messages ramp that +// tripped. Returns nil when disabled, unconfigured, or no step tripped — the +// report then prints normally with no BOTTLENECK line. +func diagnoseBottleneck(ctx context.Context, cfg *config, workload string, results []rpsStepResult, th rpsThresholds) *bottleneckVerdict { + bc := cfg.Bottleneck + // Attribution is messages-only for v1; other workloads have no stage graph. + if workload != "messages" || !bc.Enabled || bc.PromURL == "" { + return nil + } + trip := firstTrip(results) + if trip == nil { + return nil + } + var pass *rpsStepResult + for i := range results { + if results[i].Kind == verdictPass { + pass = &results[i] + } + } + fallback, err := parseContainerMap(bc.ContainerMap) + if err != nil { + slog.Warn("bad BOTTLENECK_CONTAINER_MAP; ignoring", "value", bc.ContainerMap, "error", err) + fallback = map[string]string{} + } + eng := newBottleneckEngine(newPromClient(bc.PromURL), identityResolver{fallback: fallback}, bc.KneeTolerance, bc.QueryStep) + v := eng.Diagnose(ctx, trip, pass, messagesStageGraph(), th) + return &v +} diff --git a/tools/loadgen/maxrps_report.go b/tools/loadgen/maxrps_report.go index fdacb503a..d775e074f 100644 --- a/tools/loadgen/maxrps_report.go +++ b/tools/loadgen/maxrps_report.go @@ -56,8 +56,14 @@ func pctFor(r *rpsStepResult, name string) Percentiles { return Percentiles{} } -// renderRPSReport writes the per-step table and the ANSWER line. +// renderRPSReport delegates to renderRPSReportWithBottleneck with no bottleneck block. func renderRPSReport(w io.Writer, results []rpsStepResult, workload, preset string) error { + return renderRPSReportWithBottleneck(w, results, workload, preset, nil) +} + +// renderRPSReportWithBottleneck writes the per-step table and ANSWER line, then +// appends the BOTTLENECK block when bn is non-nil. renderRPSReport delegates here. +func renderRPSReportWithBottleneck(w io.Writer, results []rpsStepResult, workload, preset string, bn *bottleneckVerdict) error { fmt.Fprintf(w, "=== loadgen max-rps complete (workload=%s, preset=%s) ===\n\n", workload, preset) names := seriesNames(results) @@ -97,12 +103,16 @@ func renderRPSReport(w io.Writer, results []rpsStepResult, workload, preset stri if trip := firstTrip(results); trip != nil { fmt.Fprintf(w, " Next limit: %s\n", strings.Join(trip.Reasons, "; ")) } + if bn != nil { + renderBottleneck(w, bn) + } return nil } // writeRPSCSV writes one row per step. Series percentile columns are emitted in -// the union order of series names across all steps. -func writeRPSCSV(w io.Writer, results []rpsStepResult) error { +// the union order of series names across all steps. When bn is non-nil the three +// bottleneck columns are appended to the trip row; pass rows get empty cells. +func writeRPSCSV(w io.Writer, results []rpsStepResult, bn *bottleneckVerdict) error { cw := csv.NewWriter(w) names := seriesNames(results) @@ -110,7 +120,12 @@ func writeRPSCSV(w io.Writer, results []rpsStepResult) error { for _, n := range names { header = append(header, n+"_p95_ms", n+"_p99_ms") } - header = append(header, "error_rate", "attempted", "failed", "saturation", "worst_durable", "worst_pending_delta", "verdict", "reasons") + header = append(header, + "error_rate", "attempted", "failed", + "saturation", "worst_durable", "worst_pending_delta", "verdict", "reasons", + // bottleneck attribution columns (nil when bottleneck detection is disabled) + "bottleneck_component", "bottleneck_resource", "bottleneck_confidence", + ) if err := cw.Write(header); err != nil { return fmt.Errorf("write csv header: %w", err) } @@ -129,6 +144,11 @@ func writeRPSCSV(w io.Writer, results []rpsStepResult) error { strconv.Itoa(r.AttemptedOps), strconv.Itoa(r.FailedOps), strconv.Itoa(r.Saturation), r.WorstDurable, strconv.FormatInt(r.WorstDelta, 10), r.Kind.String(), strings.Join(r.Reasons, "; ")) + if bn != nil && r.Kind == verdictTrip { + row = append(row, bottleneckCSVColumns(bn)...) + } else { + row = append(row, "", "", "") + } if err := cw.Write(row); err != nil { return fmt.Errorf("write csv row: %w", err) } diff --git a/tools/loadgen/maxrps_report_test.go b/tools/loadgen/maxrps_report_test.go index 19a5e7b32..d220d5391 100644 --- a/tools/loadgen/maxrps_report_test.go +++ b/tools/loadgen/maxrps_report_test.go @@ -49,13 +49,14 @@ func TestLastPassRPS(t *testing.T) { func TestWriteRPSCSV(t *testing.T) { var buf bytes.Buffer - require.NoError(t, writeRPSCSV(&buf, sampleResults())) + require.NoError(t, writeRPSCSV(&buf, sampleResults(), nil)) lines := strings.Split(strings.TrimSpace(buf.String()), "\n") require.Len(t, lines, 4) // header + 3 rows assert.Contains(t, lines[0], "target_rps") assert.Contains(t, lines[0], "achieved_rps") assert.Contains(t, lines[0], "E1_p95_ms") assert.Contains(t, lines[0], "verdict") + assert.Contains(t, lines[0], "bottleneck_component") assert.Contains(t, lines[3], "2000") assert.Contains(t, lines[3], "TRIP") } @@ -104,7 +105,7 @@ func TestRenderRPSReport_MultiSeriesAlignment(t *testing.T) { assert.Contains(t, out, "E2 p95") var csvBuf bytes.Buffer - require.NoError(t, writeRPSCSV(&csvBuf, results)) + require.NoError(t, writeRPSCSV(&csvBuf, results, nil)) lines := strings.Split(strings.TrimSpace(csvBuf.String()), "\n") require.Len(t, lines, 3) // header + 2 rows assert.Contains(t, lines[0], "E1_p95_ms,E1_p99_ms,E2_p95_ms,E2_p99_ms") @@ -113,3 +114,24 @@ func TestRenderRPSReport_MultiSeriesAlignment(t *testing.T) { assert.Equal(t, "0", cols[4]) // E2_p95_ms assert.Equal(t, "0", cols[5]) // E2_p99_ms } + +func TestRenderRPSReport_AppendsBottleneck(t *testing.T) { + results := []rpsStepResult{ + {TargetRPS: 1000, Kind: verdictPass}, + {TargetRPS: 2000, Kind: verdictTrip, Reasons: []string{"E2 p95=143ms > 100ms"}}, + } + bn := bottleneckVerdict{Component: "message-worker", Resource: "Cassandra", Confidence: "high", Determined: true, + Reasons: []string{"message-worker consumer backlog grew"}} + var sb strings.Builder + require.NoError(t, renderRPSReportWithBottleneck(&sb, results, "messages", "medium", &bn)) + out := sb.String() + assert.Contains(t, out, "ANSWER: max RPS = 1000") + assert.Contains(t, out, "BOTTLENECK: message-worker (Cassandra-bound)") +} + +func TestRenderRPSReport_NilBottleneckUnchanged(t *testing.T) { + results := []rpsStepResult{{TargetRPS: 1000, Kind: verdictPass}} + var sb strings.Builder + require.NoError(t, renderRPSReportWithBottleneck(&sb, results, "messages", "medium", nil)) + assert.NotContains(t, sb.String(), "BOTTLENECK:") +} diff --git a/tools/loadgen/maxrps_test.go b/tools/loadgen/maxrps_test.go index b165d2069..377ec0abb 100644 --- a/tools/loadgen/maxrps_test.go +++ b/tools/loadgen/maxrps_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "testing" "time" @@ -26,3 +27,19 @@ func TestBuildThresholds(t *testing.T) { assert.Equal(t, uint64(1000), th.PendingGrowth) assert.Equal(t, 0.05, th.RateTolerance) } + +func TestDiagnoseBottleneck_NilGuards(t *testing.T) { + th := buildThresholds(100*time.Millisecond, 250*time.Millisecond, 0.001, 1000, 0.05) + tripped := []rpsStepResult{{TargetRPS: 1000, Kind: verdictPass}, {TargetRPS: 2000, Kind: verdictTrip}} + enabled := func() bottleneckConfig { return bottleneckConfig{Enabled: true, PromURL: "http://prom:9090"} } + + // non-messages workload -> nil + assert.Nil(t, diagnoseBottleneck(context.Background(), &config{Bottleneck: enabled()}, "history", tripped, th)) + // disabled -> nil + assert.Nil(t, diagnoseBottleneck(context.Background(), &config{Bottleneck: bottleneckConfig{Enabled: false, PromURL: "http://prom:9090"}}, "messages", tripped, th)) + // no prom url -> nil + assert.Nil(t, diagnoseBottleneck(context.Background(), &config{Bottleneck: bottleneckConfig{Enabled: true, PromURL: ""}}, "messages", tripped, th)) + // no trip -> nil + noTrip := []rpsStepResult{{TargetRPS: 1000, Kind: verdictPass}} + assert.Nil(t, diagnoseBottleneck(context.Background(), &config{Bottleneck: enabled()}, "messages", noTrip, th)) +} diff --git a/tools/loadgen/promclient.go b/tools/loadgen/promclient.go new file mode 100644 index 000000000..7c4dbde65 --- /dev/null +++ b/tools/loadgen/promclient.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/go-resty/resty/v2" + + "github.com/hmchangw/chat/pkg/restyutil" +) + +// promSample is one (timestamp, value) point from a Prometheus matrix result. +type promSample struct { + T time.Time + V float64 +} + +// promSeries is one labelled time-series returned by a range query. +type promSeries struct { + Labels map[string]string + Samples []promSample +} + +// promClient queries the Prometheus HTTP API. It satisfies the promQuerier +// interface (defined in attribution.go) used as the production querier. +type promClient struct { + rc *resty.Client +} + +// newPromClient builds a client against a Prometheus base URL (e.g. +// "http://prometheus:9090"). A short timeout keeps a slow/missing Prometheus +// from stalling the end-of-run report. +func newPromClient(baseURL string) *promClient { + return &promClient{rc: restyutil.New(baseURL, restyutil.WithTimeout(10*time.Second))} +} + +// rangeQueryResponse mirrors the subset of the query_range payload we read. +type rangeQueryResponse struct { + Status string `json:"status"` + Error string `json:"error"` + Data struct { + Result []struct { + Metric map[string]string `json:"metric"` + Values [][2]any `json:"values"` + } `json:"result"` + } `json:"data"` +} + +// RangeQuery runs a PromQL range query and returns one promSeries per result. +func (c *promClient) RangeQuery(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]promSeries, error) { + resp, err := c.rc.R(). + SetContext(ctx). + SetQueryParams(map[string]string{ + "query": query, + "start": strconv.FormatInt(start.Unix(), 10), + "end": strconv.FormatInt(end.Unix(), 10), + "step": strconv.FormatFloat(step.Seconds(), 'f', -1, 64), + }). + Get("/api/v1/query_range") + if err != nil { + return nil, fmt.Errorf("query prometheus: %w", err) + } + + var parsed rangeQueryResponse + if err := json.Unmarshal(resp.Body(), &parsed); err != nil { + return nil, fmt.Errorf("decode prometheus response: %w", err) + } + if parsed.Status != "success" { + return nil, fmt.Errorf("prometheus query failed: %s", parsed.Error) + } + + out := make([]promSeries, 0, len(parsed.Data.Result)) + for _, r := range parsed.Data.Result { + s := promSeries{Labels: r.Metric} + for _, v := range r.Values { + ts, ok := v[0].(float64) + if !ok { + continue + } + raw, ok := v[1].(string) + if !ok { + continue + } + val, err := strconv.ParseFloat(raw, 64) + if err != nil { + continue + } + // Prometheus timestamps are float64 seconds; truncating to int64 + // loses at most 1s, acceptable at the attribution query step. + s.Samples = append(s.Samples, promSample{T: time.Unix(int64(ts), 0).UTC(), V: val}) + } + out = append(out, s) + } + return out, nil +} diff --git a/tools/loadgen/promclient_test.go b/tools/loadgen/promclient_test.go new file mode 100644 index 000000000..e5a2a0db6 --- /dev/null +++ b/tools/loadgen/promclient_test.go @@ -0,0 +1,81 @@ +package main + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPromClient_RangeQuery_ParsesMatrix(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/v1/query_range", r.URL.Path) + assert.NotEmpty(t, r.URL.Query().Get("query")) + _, _ = w.Write([]byte(`{ + "status":"success", + "data":{"resultType":"matrix","result":[ + {"metric":{"container_label_com_docker_compose_service":"cassandra"}, + "values":[[100,"10.5"],[105,"11.0"]]} + ]}}`)) + })) + defer srv.Close() + + c := newPromClient(srv.URL) + start := time.Unix(100, 0) + series, err := c.RangeQuery(context.Background(), `up`, start, start.Add(5*time.Second), 5*time.Second) + require.NoError(t, err) + require.Len(t, series, 1) + assert.Equal(t, "cassandra", series[0].Labels["container_label_com_docker_compose_service"]) + require.Len(t, series[0].Samples, 2) + assert.Equal(t, 10.5, series[0].Samples[0].V) + assert.Equal(t, time.Unix(100, 0).UTC(), series[0].Samples[0].T.UTC()) + assert.Equal(t, 11.0, series[0].Samples[1].V) + assert.Equal(t, time.Unix(105, 0).UTC(), series[0].Samples[1].T.UTC()) +} + +func TestPromClient_RangeQuery_NonSuccessStatus(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"status":"error","errorType":"bad_data","error":"boom"}`)) + })) + defer srv.Close() + + _, err := newPromClient(srv.URL).RangeQuery(context.Background(), `up`, time.Unix(0, 0), time.Unix(5, 0), time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "boom") +} + +func TestPromClient_RangeQuery_NonJSONBody(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("503 Service Unavailable")) + })) + defer srv.Close() + + _, err := newPromClient(srv.URL).RangeQuery(context.Background(), `up`, time.Unix(0, 0), time.Unix(5, 0), time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "decode prometheus response") +} + +func TestPromClient_RangeQuery_SkipsMalformedSamples(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // First pair has a null timestamp (not float64) and must be skipped; + // second pair has a non-numeric value and must be skipped; third is valid. + _, _ = w.Write([]byte(`{ + "status":"success", + "data":{"resultType":"matrix","result":[ + {"metric":{},"values":[[null,"1.0"],[100,"notanumber"],[105,"2.5"]]} + ]}}`)) + })) + defer srv.Close() + + series, err := newPromClient(srv.URL).RangeQuery(context.Background(), `up`, time.Unix(100, 0), time.Unix(105, 0), time.Second) + require.NoError(t, err) + require.Len(t, series, 1) + require.Len(t, series[0].Samples, 1) + assert.Equal(t, 2.5, series[0].Samples[0].V) + assert.Equal(t, time.Unix(105, 0).UTC(), series[0].Samples[0].T.UTC()) +} diff --git a/tools/loadgen/ramp.go b/tools/loadgen/ramp.go index dc223cf64..bcb4b8543 100644 --- a/tools/loadgen/ramp.go +++ b/tools/loadgen/ramp.go @@ -87,7 +87,9 @@ func runRamp(ctx context.Context, w rpsWorkload, cfg *rampConfig) []rpsStepResul if ctx.Err() != nil { break } + stepStart := time.Now() in, err := w.RunStep(ctx, n, cfg.Warmup, cfg.Hold) + stepEnd := time.Now() if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { break @@ -96,6 +98,12 @@ func runRamp(ctx context.Context, w rpsWorkload, cfg *rampConfig) []rpsStepResul break } res := evaluateRPSStep(&in, cfg.Thresholds) + // RunStep does warmup then hold sequentially; approximate the hold + // window as [start+warmup, end] so metric queries skip the ramp-up. + // Note: stepEnd includes any post-hold drain the adapter performs, so + // HoldEnd may trail the true hold end by the drain duration. + res.HoldStart = stepStart.Add(cfg.Warmup) + res.HoldEnd = stepEnd results = append(results, res) slog.Info("step complete", "rps", n, "verdict", res.Kind.String(), "achieved", res.AchievedRPS, "reasons", res.Reasons) diff --git a/tools/loadgen/ramp_test.go b/tools/loadgen/ramp_test.go index 2d6fb6fe8..1999df55d 100644 --- a/tools/loadgen/ramp_test.go +++ b/tools/loadgen/ramp_test.go @@ -139,6 +139,31 @@ func TestRunRamp_StopsWhenCooldownCancelled(t *testing.T) { require.Len(t, results, 1) } +type windowFakeWorkload struct{} + +func (f windowFakeWorkload) Label() string { return "fake" } +func (f windowFakeWorkload) RunStep(ctx context.Context, rps int, warmup, hold time.Duration) (rpsStepInputs, error) { + return rpsStepInputs{TargetRPS: rps, Hold: hold, AttemptedOps: rps}, nil +} + +func TestRunRamp_RecordsHoldWindow(t *testing.T) { + before := time.Now() + results := runRamp(context.Background(), windowFakeWorkload{}, &rampConfig{ + Steps: []int{100}, + Hold: 30 * time.Second, + Warmup: 1 * time.Millisecond, + Thresholds: buildThresholds(time.Second, time.Second, 1, 1<<62, 0), + }) + after := time.Now() + require.Len(t, results, 1) + assert.False(t, results[0].HoldStart.IsZero(), "HoldStart should be set") + assert.False(t, results[0].HoldEnd.IsZero(), "HoldEnd should be set") + // HoldStart = stepStart + Warmup, so it must not be before before+1ms. + assert.False(t, results[0].HoldStart.Before(before.Add(1*time.Millisecond)), "HoldStart should reflect start+warmup") + // HoldEnd = stepEnd, which is before the ramp returned to the caller. + assert.False(t, results[0].HoldEnd.After(after), "HoldEnd should be no later than when runRamp returned") +} + func TestMaxRPSExitCode(t *testing.T) { pass := []rpsStepResult{{Kind: verdictPass}, {Kind: verdictTrip}} none := []rpsStepResult{{Kind: verdictInconclusive}, {Kind: verdictTrip}} diff --git a/tools/loadgen/stagegraph.go b/tools/loadgen/stagegraph.go new file mode 100644 index 000000000..bb2247e36 --- /dev/null +++ b/tools/loadgen/stagegraph.go @@ -0,0 +1,37 @@ +package main + +// stage is one node in a workload's pipeline. The bottleneck engine walks +// stages in flow order, mapping loadgen's signals (durable backlog, latency +// series) and cAdvisor's container metrics onto each one. +type stage struct { + Name string // logical component name, used in the verdict + Container string // cAdvisor compose-service label value + Durable string // durable consumer fronting this stage; "" if none + LatencySeries string // loadgen latency series measuring this stage; "" if none + DependsOn []string // external dependencies this stage calls into (e.g. databases) +} + +// messagesStageGraph describes the messages pipeline: +// publish -> message-gatekeeper -> MESSAGES_CANONICAL -> {message-worker (Cassandra), +// broadcast-worker (MongoDB membership + Valkey keys)}. E1 latency measures the +// gatekeeper front door; E2 is the end-to-end publish->broadcast time. +func messagesStageGraph() []stage { + return []stage{ + {Name: "message-gatekeeper", Container: "message-gatekeeper", LatencySeries: "E1"}, + {Name: "message-worker", Container: "message-worker", Durable: "message-worker", DependsOn: []string{"cassandra"}}, + {Name: "broadcast-worker", Container: "broadcast-worker", Durable: "broadcast-worker", LatencySeries: "E2", DependsOn: []string{"mongodb", "valkey"}}, + } +} + +// dependencyDisplayName maps an internal dependency key to a human label for +// the verdict ("message-worker (Cassandra-bound)"). Unknown keys pass through. +func dependencyDisplayName(dep string) string { + switch dep { + case "cassandra": + return "Cassandra" + case "mongodb": + return "MongoDB" + default: + return dep + } +} diff --git a/tools/loadgen/stagegraph_test.go b/tools/loadgen/stagegraph_test.go new file mode 100644 index 000000000..acf114753 --- /dev/null +++ b/tools/loadgen/stagegraph_test.go @@ -0,0 +1,34 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMessagesStageGraph_Shape(t *testing.T) { + g := messagesStageGraph() + require.Len(t, g, 3) + assert.Equal(t, "message-gatekeeper", g[0].Name) + assert.Equal(t, "message-gatekeeper", g[0].Container) + assert.Equal(t, "E1", g[0].LatencySeries) + assert.Empty(t, g[0].Durable) + + assert.Equal(t, "message-worker", g[1].Name) + assert.Equal(t, "message-worker", g[1].Container) + assert.Equal(t, "message-worker", g[1].Durable) + assert.Equal(t, []string{"cassandra"}, g[1].DependsOn) + + assert.Equal(t, "broadcast-worker", g[2].Name) + assert.Equal(t, "broadcast-worker", g[2].Container) + assert.Equal(t, "broadcast-worker", g[2].Durable) + assert.Equal(t, "E2", g[2].LatencySeries) + assert.Equal(t, []string{"mongodb", "valkey"}, g[2].DependsOn) +} + +func TestDependencyDisplayName(t *testing.T) { + assert.Equal(t, "Cassandra", dependencyDisplayName("cassandra")) + assert.Equal(t, "MongoDB", dependencyDisplayName("mongodb")) + assert.Equal(t, "valkey", dependencyDisplayName("valkey")) // unknown -> as-is +} diff --git a/tools/loadgen/verdict.go b/tools/loadgen/verdict.go index 024c5da22..befcfe21b 100644 --- a/tools/loadgen/verdict.go +++ b/tools/loadgen/verdict.go @@ -82,6 +82,12 @@ type rpsStepResult struct { WorstDelta int64 Kind verdictKind Reasons []string + // Pending carries every durable's backlog delta for the step (not just the + // worst), so the bottleneck engine can map a delta to a pipeline stage. + Pending []consumerPendingDelta + // HoldStart/HoldEnd bound the step's measurement window in wall-clock time, + // set by runRamp. Used to query container metrics over the same interval. + HoldStart, HoldEnd time.Time } // evaluateRPSStep classifies a step PASS / TRIP / INCONCLUSIVE. @@ -100,6 +106,7 @@ func evaluateRPSStep(in *rpsStepInputs, th rpsThresholds) rpsStepResult { FailedOps: in.FailedOps, Saturation: in.Saturation, } + res.Pending = in.Pending if in.Hold > 0 { res.AchievedRPS = float64(in.AttemptedOps) / in.Hold.Seconds() } diff --git a/tools/loadgen/verdict_test.go b/tools/loadgen/verdict_test.go index 0ace99e8a..26a0796ca 100644 --- a/tools/loadgen/verdict_test.go +++ b/tools/loadgen/verdict_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond } @@ -195,3 +196,19 @@ func TestVerdictKind_String(t *testing.T) { assert.Equal(t, "TRIP", verdictTrip.String()) assert.Equal(t, "INCONCLUSIVE", verdictInconclusive.String()) } + +func TestEvaluateRPSStep_CopiesPending(t *testing.T) { + in := &rpsStepInputs{ + TargetRPS: 1000, + Hold: 30 * time.Second, + AttemptedOps: 30000, + Pending: []consumerPendingDelta{ + {Durable: "message-worker", Start: 0, End: 5000}, + {Durable: "broadcast-worker", Start: 0, End: 10}, + }, + } + res := evaluateRPSStep(in, buildThresholds(100*time.Millisecond, 250*time.Millisecond, 0.001, 1000, 0.05)) + require.Len(t, res.Pending, 2) + assert.Equal(t, "message-worker", res.Pending[0].Durable) + assert.Equal(t, int64(5000), res.Pending[0].Delta()) +}