Skip to content
222 changes: 182 additions & 40 deletions benchmarks/benchmark_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import statistics
import subprocess
import threading
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -83,22 +84,62 @@ class BenchmarkWorkload(ShapeDtypeWorkload, InputGeneratingWorkload, Protocol):
# A single test function may call record() multiple times (tileops + baseline).
_bench_results = threading.local()

# Kernel name substrings that identify L2-flush operations (cache.zero_() on
# the dedicated _l2_flush_cache buffer). Filtered out of CUPTI timing so flush
# overhead is never counted as benchmark kernel time.
#
# The flush buffer is a large int32 tensor (sized to L2 cache) whose sole
# purpose is L2 eviction via cache.zero_(). To avoid false-positive exclusion
# of user kernels, we match BOTH the FillFunctor pattern (from Tensor.zero_())
# AND the vectorized_elementwise pattern (specific to PyTorch's unary kernel
# dispatch). User code that calls fill_() or zero_() on regular tensors will
# still trigger FillFunctor, but typically with different kernel signatures
# (e.g., different element types or lack of vectorization for small tensors).
#
# If false positives persist, consider: (1) tracking the flush buffer's pointer
# address via kineto correlation IDs, or (2) using a uniquely named flush kernel
# via a custom CUDA extension instead of relying on Tensor.zero_().
_FLUSH_PATTERNS: tuple[str, ...] = ("vectorized_elementwise", "FillFunctor")


def _sum_kernel_time_us(
    kineto_results,
) -> tuple[float, dict[str, float], dict[str, float]]:
    """Sum CUDA kernel time directly from C++ Kineto events.

    Bypasses ``profiler.key_averages()`` which triggers expensive Python
    event parsing (~120ms) and tree building (~10ms) for large traces.
    Direct C++ iteration is ~16x faster for n_repeat=1280.

    L2 flush kernels (``cache.zero_()`` on the flush buffer) are excluded
    from the total.  An event is treated as a flush when its kernel name
    contains *every* substring in ``_FLUSH_PATTERNS`` (AND logic).  Generic
    patterns like ``"Memset"``, ``"memset"``, and ``"fill_kernel"`` are
    intentionally *not* filtered to avoid silently dropping real benchmark
    kernels.

    Args:
        kineto_results: Object exposing ``events()``; each event must provide
            ``device_type()``, ``name()`` and ``duration_ns()``.

    Returns:
        tuple[float, dict[str, float], dict[str, float]]:
        ``(total_us, per_kernel_us, excluded_us)`` where

        - ``total_us`` is the summed duration, in microseconds, of all
          non-flush CUDA events;
        - ``per_kernel_us`` maps each counted kernel name to its total
          duration in microseconds (its values add up to ``total_us``).
          Use the breakdown to detect helper / temporary-tensor kernels
          that inflate the stat;
        - ``excluded_us`` maps each filtered flush kernel name to the total
          duration in microseconds that was left out of ``total_us``.
    """
    total_us = 0.0
    per_kernel: dict[str, float] = {}
    excluded_kernel: dict[str, float] = {}
    for evt in kineto_results.events():
        # Only device-side (CUDA) events count towards kernel time.
        if evt.device_type() != DeviceType.CUDA:
            continue
        name = evt.name()
        # Convert once; the same value feeds either the excluded or the
        # counted bucket, never both.
        dur = evt.duration_ns() / 1000.0
        # _FLUSH_PATTERNS is the single source of truth for flush detection.
        if all(p in name for p in _FLUSH_PATTERNS):
            excluded_kernel[name] = excluded_kernel.get(name, 0.0) + dur
            continue
        total_us += dur
        per_kernel[name] = per_kernel.get(name, 0.0) + dur
    return total_us, per_kernel, excluded_kernel


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -127,8 +168,8 @@ def bench_kernel(
args: tuple[Any, ...] = (),
n_warmup: int = 10,
n_repeat: int = 50,
n_trials: int = 3,
) -> float:
n_trials: int = 5,
) -> dict:
"""Benchmark a GPU kernel with pure kernel timing via CUPTI.

Protocol (adapted from NVIDIA SOL-ExecBench, arxiv.org/abs/2603.19173):
Expand All @@ -142,7 +183,7 @@ def bench_kernel(

Uses CUPTI via torch.profiler for accurate kernel-only timing, with
direct Kineto C++ event iteration to avoid Python parsing overhead.
Falls back to CUDA events if CUPTI is unavailable.
Falls back to CUDA event timing if CUPTI is unavailable (with a warning).

Args:
fn: Callable to benchmark. If *args* is provided, called as
Expand All @@ -151,10 +192,16 @@ def bench_kernel(
values are passed through unchanged.
n_warmup: Warmup iterations (default 10).
n_repeat: Timed iterations per trial (default 50).
n_trials: Independent trials (default 3).
n_trials: Independent trials (default 5).

Returns:
Kernel latency in **milliseconds**.
dict with keys:
- ``latency_ms``: median-of-trials mean kernel latency in milliseconds
- ``stdev_ms``: standard deviation across trial means (0.0 when only
one trial is available)
- ``timing_backend``: ``"cupti"`` (preferred) or ``"cuda_event"`` (fallback)
- ``event_breakdown``: dict mapping CUDA kernel name → total_us across
*all* timed iterations of the median trial (empty for CUDA event fallback).
"""
if not isinstance(args, tuple):
raise TypeError(
Expand Down Expand Up @@ -201,11 +248,23 @@ def _run(i):
# Timed trials with CUPTI (single profiler, n_trials cycles)
trial_means: list[float] = []

trial_breakdowns: list[dict[str, float]] = []

def _on_trace_ready(prof):
kr = prof.profiler.kineto_results
kernel_us = _sum_kernel_time_us(kr) / n_repeat
trial_means.append(kernel_us * 1e-3)
total_us, per_kernel, excluded_kernel = _sum_kernel_time_us(kr)
trial_means.append(total_us / n_repeat * 1e-3)
trial_breakdowns.append(per_kernel)
if excluded_kernel:
excluded_us = sum(excluded_kernel.values())
_logger.debug(
"CUPTI: excluded %.1f µs across %d flush/fill kernel(s): %s",
excluded_us,
len(excluded_kernel),
list(excluded_kernel.keys()),
)

cupti_ok = True
try:
with suppress_stdout_stderr():
schedule = torch.profiler.schedule(
Expand All @@ -229,30 +288,67 @@ def _on_trace_ready(prof):
_run(i)
profiler.step()
except RuntimeError:
pass
cupti_ok = False
finally:
# Free the arg pool and release cached GPU memory to prevent
# accumulation across hundreds of benchmark calls.
if arg_pool is not None:
del arg_pool
torch.cuda.empty_cache()

# Fallback to CUDA events if CUPTI failed
if not trial_means:
for _ in range(n_trials):
start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
for i in range(n_repeat):
cache.zero_()
start_events[i].record()
_run(i)
end_events[i].record()
torch.cuda.synchronize()
times = [s.elapsed_time(e) for s, e in zip(start_events, end_events, strict=True)]
trial_means.append(sum(times) / len(times))

# Free the arg pool and release cached GPU memory to prevent
# accumulation across hundreds of benchmark calls.
if arg_pool is not None:
del arg_pool
torch.cuda.empty_cache()

trial_means.sort()
return trial_means[len(trial_means) // 2]
cupti_ok = False

if cupti_ok:
# Pick median trial
timing_backend = "cupti"
# Sort by mean latency; pick median trial's breakdown too
paired = sorted(zip(trial_means, trial_breakdowns, strict=True), key=lambda x: x[0])
median_ms, median_breakdown = paired[len(paired) // 2]
stdev_ms = statistics.stdev(trial_means) if len(trial_means) > 1 else 0.0
return {
"latency_ms": median_ms,
"stdev_ms": stdev_ms,
"timing_backend": timing_backend,
"event_breakdown": median_breakdown,
}

# Fall back to CUDA event timing when CUPTI is unavailable.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bench_kernel() still falls back to CUDA event timing when CUPTI fails or produces no trials. This contradicts the new mandatory-CUPTI contract: instead of failing the benchmark, it records results with timing_backend="cuda_event", which are measured with a different timing envelope. Suggested fix: remove the fallback and raise a RuntimeError when CUPTI is unavailable or returns no kernel events.

_logger.warning(
"CUPTI unavailable or produced no results; falling back to CUDA event timing. "
"Ensure libcupti.so is on LD_LIBRARY_PATH for kernel-accurate measurements."
)
event_trial_means: list[float] = []
for _ in range(n_trials):
start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_repeat)]
# Warmup
for i in range(n_repeat):
Comment thread
stelladuyx marked this conversation as resolved.
cache.zero_()
_run(i)
torch.cuda.synchronize()
# Timed iterations: flush is outside the event window so only _run() is measured.
for i in range(n_repeat):
cache.zero_()
start_events[i].record()
_run(i)
end_events[i].record()
torch.cuda.synchronize()
trial_us = sum(
s.elapsed_time(e) * 1e3
for s, e in zip(start_events, end_events, strict=True)
)
event_trial_means.append(trial_us / n_repeat * 1e-3)

paired_ev = sorted(event_trial_means)
median_ms = paired_ev[len(paired_ev) // 2]
stdev_ms = statistics.stdev(event_trial_means) if len(event_trial_means) > 1 else 0.0
return {
"latency_ms": median_ms,
"stdev_ms": stdev_ms,
"timing_backend": "cuda_event",
"event_breakdown": {},
}


def _get_env_metadata() -> list[str]:
Expand All @@ -267,16 +363,30 @@ def _get_env_metadata() -> list[str]:
else:
lines.append("- **GPU model**: N/A (no CUDA device)")

# Try to get NVIDIA driver version from nvidia-smi
# Try to get NVIDIA driver version and GPU telemetry from nvidia-smi
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=driver_version", "--format=csv,noheader"],
[
"nvidia-smi",
"--query-gpu=driver_version,clocks.mem,clocks.gr,clocks.sm,power.draw,temperature.gpu",
"--format=csv,noheader,nounits",
],
capture_output=True, text=True, timeout=5,
)
driver = result.stdout.strip().split("\n")[0] if result.returncode == 0 else "N/A"
if result.returncode == 0:
parts = [p.strip() for p in result.stdout.strip().split("\n")[0].split(",")]
driver, mem_clock, gr_clock, sm_clock, power_draw, gpu_temp = (parts + ["N/A"] * 6)[:6]
else:
driver = mem_clock = gr_clock = sm_clock = power_draw = gpu_temp = "N/A"
except (FileNotFoundError, subprocess.TimeoutExpired):
driver = "N/A"
driver = mem_clock = gr_clock = sm_clock = power_draw = gpu_temp = "N/A"

lines.append(f"- **Driver version**: {driver}")
lines.append(f"- **Memory clock (MHz)**: {mem_clock}")
lines.append(f"- **Graphics clock (MHz)**: {gr_clock}")
lines.append(f"- **SM clock (MHz)**: {sm_clock}")
lines.append(f"- **Power draw (W)**: {power_draw}")
lines.append(f"- **GPU temperature (°C)**: {gpu_temp}")

return lines

Expand Down Expand Up @@ -325,8 +435,14 @@ def profile_autograd(self, functor: Any) -> dict:
latency = bench_kernel(functor)
return self._build_result(latency)

def _build_result(self, latency: float) -> dict:
result = {"latency_ms": latency}
def _build_result(self, bench_result: dict) -> dict:
latency = bench_result["latency_ms"]
result = {
"latency_ms": latency,
"stdev_ms": bench_result.get("stdev_ms", 0.0),
"timing_backend": bench_result.get("timing_backend", "unknown"),
"event_breakdown": bench_result.get("event_breakdown", {}),
Comment thread
stelladuyx marked this conversation as resolved.
}
flops = self.calculate_flops()
if flops is not None:
result["tflops"] = flops / latency * 1e-9
Expand Down Expand Up @@ -546,6 +662,14 @@ def _is_serializable(v: Any) -> bool:
entry = {"tag": tag, "op": name, **result}
if op_module:
entry["op_module"] = op_module
if op_config:
entry["config"] = op_config
# Limit event_breakdown to top-10 kernels by total time to keep
# JUnit XML properties compact; full breakdown is in profile_run.log.
breakdown = result.get("event_breakdown", {})
if breakdown:
top10 = dict(sorted(breakdown.items(), key=lambda x: x[1], reverse=True)[:10])
entry["event_breakdown_top10"] = top10
Comment thread
stelladuyx marked this conversation as resolved.
_bench_results.entries.append(entry)

_logger.info("op=%s module=%s tag=%s latency_ms=%.4f tflops=%.2f",
Expand All @@ -569,7 +693,7 @@ def dump(path: str) -> None:
lines.extend(_get_env_metadata())
lines.append("")

result_keys = ["latency_ms", "tflops", "bandwidth_tbs"]
result_keys = ["latency_ms", "stdev_ms", "timing_backend", "tflops", "bandwidth_tbs"]

for name, entries in BenchmarkReport._records.items():
if not entries:
Expand Down Expand Up @@ -599,12 +723,30 @@ def dump(path: str) -> None:
row = [str(entry["params"].get(k, "")) for k in param_keys]
for rk in result_keys:
val = entry["result"].get(rk)
row.append(f"{val:.4f}" if val is not None else "N/A")
if rk == "timing_backend":
row.append(str(val) if val else "N/A")
elif isinstance(val, (int, float)):
row.append(f"{val:.4f}")
else:
row.append("N/A")
if has_config:
cfg = entry.get("config")
row.append(str(cfg) if cfg else "")
lines.append("| " + " | ".join(row) + " |")

# Append full event_breakdown for this entry (if present)
breakdown = entry["result"].get("event_breakdown", {})
if breakdown:
lines.append("")
lines.append(f"**Event breakdown** (params: {entry['params']}):")
lines.append("")
sorted_breakdown = sorted(breakdown.items(), key=lambda x: x[1], reverse=True)
lines.append("| Kernel | Time (µs) |")
lines.append("| --- | --- |")
for kernel_name, time_us in sorted_breakdown:
lines.append(f"| `{kernel_name}` | {time_us:.1f} |")
lines.append("")

lines.append("")

with open(path, "w") as f:
Expand Down
14 changes: 14 additions & 0 deletions benchmarks/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ def pytest_runtest_call(item):
bw = tileops_entry.get("bandwidth_tbs")
if bw is not None:
item.user_properties.append(("tileops_bandwidth_tbs", f"{bw:.2f}"))
# New diagnostics: timing backend, stdev, event breakdown
timing_backend = tileops_entry.get("timing_backend")
if timing_backend:
item.user_properties.append(("timing_backend", timing_backend))
stdev_ms = tileops_entry.get("stdev_ms")
if stdev_ms is not None:
item.user_properties.append(("stdev_ms", f"{stdev_ms:.4f}"))
event_breakdown_top10 = tileops_entry.get("event_breakdown_top10")
if event_breakdown_top10:
# Serialize as "kernel1:us1,kernel2:us2,..."
breakdown_str = ",".join(
f"{k}:{v:.1f}" for k, v in event_breakdown_top10.items()
)
item.user_properties.append(("event_breakdown_top10", breakdown_str))

# Write all baselines into JUnit XML properties.
# The first baseline uses the legacy unprefixed names (baseline_tag, etc.)
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/ops/attention/bench_gqa_fp8.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def test_gqa_prefill_fp8_tensor_core_bench(case: GQAFp8TensorCoreBenchCase) -> N
inputs = _make_inputs(case)
op(*inputs)
torch.cuda.synchronize()
latency_ms = bench_kernel(op, args=inputs, n_warmup=1, n_repeat=3, n_trials=1)
bench_result = bench_kernel(op, args=inputs, n_warmup=1, n_repeat=3, n_trials=1)
latency_ms = bench_result["latency_ms"]
flops, bytes_moved = op.eval_roofline()
result = {
"latency_ms": latency_ms,
Expand All @@ -124,7 +125,8 @@ def test_gqa_prefill_fp8_tensor_core_bench(case: GQAFp8TensorCoreBenchCase) -> N

fa3_fn = _fa3_gqa_fp8_fwd()
if fa3_fn is not None:
fa3_latency_ms = bench_kernel(fa3_fn, args=inputs, n_warmup=1, n_repeat=3, n_trials=1)
fa3_bench = bench_kernel(fa3_fn, args=inputs, n_warmup=1, n_repeat=3, n_trials=1)
fa3_latency_ms = fa3_bench["latency_ms"]
fa3_result = {
"latency_ms": fa3_latency_ms,
"tflops": flops / fa3_latency_ms * 1e-9 if fa3_latency_ms > 0 else 0.0,
Expand Down
Loading
Loading