diff --git a/backend/docs/README.md b/backend/docs/README.md index 27e33f854c..d0531f938a 100644 --- a/backend/docs/README.md +++ b/backend/docs/README.md @@ -19,6 +19,7 @@ This directory contains detailed documentation for the DeerFlow backend. | [STREAMING.md](STREAMING.md) | Token-level streaming design: Gateway vs DeerFlowClient paths, `stream_mode` semantics, per-id dedup | | [FILE_UPLOAD.md](FILE_UPLOAD.md) | File upload functionality | | [PATH_EXAMPLES.md](PATH_EXAMPLES.md) | Path types and usage examples | +| [SANDBOX_MEMORY_PROFILING.md](SANDBOX_MEMORY_PROFILING.md) | Sandbox memory baseline and runtime comparison guide | | [summarization.md](summarization.md) | Context summarization feature | | [plan_mode_usage.md](plan_mode_usage.md) | Plan mode with TodoList | | [AUTO_TITLE_GENERATION.md](AUTO_TITLE_GENERATION.md) | Automatic title generation | diff --git a/backend/docs/SANDBOX_MEMORY_PROFILING.md b/backend/docs/SANDBOX_MEMORY_PROFILING.md new file mode 100644 index 0000000000..7eb15f5799 --- /dev/null +++ b/backend/docs/SANDBOX_MEMORY_PROFILING.md @@ -0,0 +1,81 @@ +# Sandbox Memory Profiling + +This guide records a repeatable baseline before changing the sandbox runtime. +Issue #3213 reports per-sandbox memory near 1 GiB in Kubernetes. Before adding +or recommending a new provider, capture the current AIO sandbox baseline and +compare candidates with the same DeerFlow workload. + +## What to Measure + +Measure at least these samples: + +1. Empty sandbox after it becomes ready. +2. After a simple bash command. +3. After a Python task that imports common packages. +4. After a Node task when Node-based workloads are expected. +5. After generating files under `/mnt/user-data/outputs`. +6. After release and warm reuse. +7. At the target concurrency level, for example 10, 50, or 100 sandboxes. + +`kubectl top` reports Kubernetes/container working set memory. Treat it as a +capacity signal, not exclusive RSS/PSS. 
Pod-level memory includes every +container in the Pod and may include cache charged to the cgroup. If a result +looks surprising, inspect the sandbox processes and cgroup metrics on the node +before drawing conclusions. + +## Capture a Snapshot + +Run this from the repository root: + +```bash +python scripts/sandbox_memory_profile.py \ + --namespace deer-flow \ + --selector app=deer-flow-sandbox \ + --sample empty \ + --include-processes \ + --format markdown +``` + +Use a descriptive `--sample` value for each phase: + +```bash +python scripts/sandbox_memory_profile.py --sample after-bash --format json +python scripts/sandbox_memory_profile.py --sample after-python --format json +python scripts/sandbox_memory_profile.py --sample after-artifact --format json +``` + +`--include-processes` runs `kubectl exec ... ps` in each sandbox Pod and adds +the highest-RSS processes to the report. This helps distinguish Pod-level cgroup +memory from process RSS. The two numbers will not match exactly because cgroup +memory can include cache and other kernel-accounted memory. + +Save the raw JSON when comparing backends so totals, pod names, images, +requests, limits, and timestamps can be audited later. 
+ +## Candidate Runtime Matrix + +For AIO, CubeSandbox, OpenSandbox, gVisor, Kata, or another candidate, compare +the same workload and record: + +| Area | Required Evidence | +| --- | --- | +| Capacity | Pod or instance count, total memory, average memory, max memory | +| Startup | Ready latency at 1, 10, 50, and 100 concurrent sandboxes | +| Commands | Bash output, timeout behavior, failure shape | +| Files | `read_file`, `write_file`, binary `update_file`, `list_dir`, `glob`, `grep` | +| Uploads | Files uploaded by the gateway are visible inside the sandbox | +| Artifacts | Files written to `/mnt/user-data/outputs` are readable by the backend artifact API | +| Paths | `/mnt/user-data/workspace`, `/mnt/user-data/uploads`, `/mnt/user-data/outputs`, `/mnt/acp-workspace`, and skills paths keep their expected semantics | +| Isolation | Different users and threads cannot read each other's data | +| Cleanup | Release, idle timeout, process restart, and orphan cleanup free resources | +| Operations | Deployment prerequisites, privileged components, networking, storage, and upgrade path | + +## PR Guidance + +Do not claim that a new provider fixes high-concurrency memory usage until the +same DeerFlow workload has been measured on both the current AIO sandbox and the +candidate backend. + +For an experimental provider PR, prefer `Related to #3213` unless the PR also +includes reproducible DeerFlow workload data that demonstrates the target memory +reduction and preserves uploads, outputs, artifacts, and isolation behavior. 
def _load_module():
    """Import ``scripts/sandbox_memory_profile.py`` by file path and return the module."""
    script = Path(__file__).resolve().parents[2].joinpath("scripts", "sandbox_memory_profile.py")
    module_spec = importlib.util.spec_from_file_location("sandbox_memory_profile", script)
    assert module_spec is not None
    loaded = importlib.util.module_from_spec(module_spec)
    assert module_spec.loader is not None
    # Register the module under the spec name before its body is executed.
    sys.modules[module_spec.name] = loaded
    module_spec.loader.exec_module(loaded)
    return loaded
def test_parse_processes_rejects_invalid_limit():
    """A process limit of zero must raise a ValueError that names the limit."""
    profile = _load_module()
    raised = None

    try:
        profile.parse_processes("1 0 100 init\n", limit=0)
    except ValueError as error:
        raised = error

    if raised is None:
        raise AssertionError("expected ValueError")
    assert "process limit" in str(raised)
def test_build_report_includes_process_sample_errors():
    """Per-pod process sampling errors are counted in the summary and echoed in the report."""
    profile = _load_module()
    expected_errors = {"sandbox-abc": "exec denied"}
    parsed_top = profile.parse_top_pods("sandbox-abc 29m 792Mi\n")

    report = profile.build_report(
        namespace="deer-flow",
        selector="app=deer-flow-sandbox",
        sample="partial",
        top_pods=parsed_top,
        pod_json={"items": []},
        process_errors={"sandbox-abc": "exec denied"},
    )

    summary = report["summary"]
    assert summary["pods_with_process_sample_errors"] == 1
    assert report["process_errors"] == expected_errors
def test_collect_rejects_invalid_kubectl_timeout():
    """A kubectl timeout of zero must raise a ValueError that names the CLI flag."""
    profile = _load_module()
    collect_kwargs = {
        "namespace": "deer-flow",
        "selector": "app=deer-flow-sandbox",
        "sample": "empty",
        "kubectl": "kubectl",
        "kubectl_timeout": 0,
    }
    raised = None

    try:
        profile.collect(**collect_kwargs)
    except ValueError as error:
        raised = error

    if raised is None:
        raise AssertionError("expected ValueError")
    assert "kubectl-timeout" in str(raised)
def parse_memory_bytes(value: str) -> int | None:
    """Convert a Kubernetes memory quantity such as ``792Mi`` into bytes.

    Binary suffixes (``Ki`` .. ``Ei``) use powers of 1024 and decimal suffixes
    (``k``/``K`` .. ``E``) use powers of 1000. A value without a recognised
    suffix is parsed as a plain integer number of bytes. Fractional results
    (for example ``0.1Gi``) are truncated toward zero.

    Args:
        value: Raw quantity text; surrounding whitespace is ignored.

    Returns:
        The size in bytes, or None when the text is empty, is not a number,
        or is a non-finite number (NaN / Infinity).
    """
    text = value.strip()
    if not text:
        return None

    suffixes = (
        ("Ki", 1024),
        ("Mi", 1024**2),
        ("Gi", 1024**3),
        ("Ti", 1024**4),
        ("Pi", 1024**5),
        ("Ei", 1024**6),
        ("k", 1000),
        ("K", 1000),  # not a Kubernetes suffix, kept for backward compatibility
        ("M", 1000**2),
        ("G", 1000**3),
        ("T", 1000**4),
        ("P", 1000**5),
        ("E", 1000**6),
    )

    for suffix, multiplier in suffixes:
        if not text.endswith(suffix):
            continue
        try:
            amount = Decimal(text[: -len(suffix)])
        except InvalidOperation:
            return None
        # Decimal accepts "NaN" and "Infinity"; int() on those raises
        # ValueError / OverflowError, so reject them before converting.
        if not amount.is_finite():
            return None
        return int(amount * multiplier)

    try:
        return int(text)
    except ValueError:
        return None
def parse_processes(output: str, *, limit: int) -> list[ProcessSample]:
    """Parse ``ps`` output with pid, ppid, rss and command columns.

    Blank lines, a header row starting with ``PID``, rows with fewer than four
    fields, and rows whose pid or rss is not an integer are skipped. A ppid
    that is not an integer is stored as None.

    Args:
        output: Raw text printed by ``ps``.
        limit: Maximum number of samples to return; must be at least 1.

    Returns:
        Up to ``limit`` samples ordered by descending RSS.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
    """
    if limit < 1:
        raise ValueError("process limit must be greater than 0")

    def _to_int(text: str) -> int | None:
        try:
            return int(text)
        except ValueError:
            return None

    collected: list[ProcessSample] = []
    for row in output.splitlines():
        # Strip first so trailing whitespace never ends up in the command field.
        fields = row.strip().split(maxsplit=3)
        if len(fields) < 4 or fields[0].upper() == "PID":
            continue
        pid = _to_int(fields[0])
        rss_kib = _to_int(fields[2])
        if pid is None or rss_kib is None:
            continue
        collected.append(
            ProcessSample(
                pid=pid,
                ppid=_to_int(fields[1]),
                rss_kib=rss_kib,
                command=fields[3],
            )
        )

    by_rss = sorted(collected, key=lambda sample: sample.rss_kib, reverse=True)
    return by_rss[:limit]
def attach_process_samples(
    pods: list[dict[str, Any]],
    process_samples: dict[str, list[ProcessSample]],
) -> list[dict[str, Any]]:
    """Fill each pod row's ``processes`` list from the samples keyed by pod name.

    The rows are updated in place and the same list is returned. A pod with no
    entry in ``process_samples`` gets an empty ``processes`` list.
    """

    def _as_row(sample: ProcessSample) -> dict[str, Any]:
        return {
            "pid": sample.pid,
            "ppid": sample.ppid,
            "rss_kib": sample.rss_kib,
            # ps reports RSS in KiB; expose MiB as well for readability.
            "rss_mib": round(sample.rss_kib / 1024, 2),
            "command": sample.command,
        }

    for row in pods:
        row["processes"] = [
            _as_row(sample) for sample in process_samples.get(row["name"], [])
        ]
    return pods
def render_markdown(report: dict[str, Any]) -> str:
    """Render a report dictionary as a Markdown document.

    The document holds a summary bullet list, one table row per pod, a
    "Top Processes" section for pods that carry process samples, a
    "Process Sample Errors" section when errors were recorded, and the notes.

    Args:
        report: Report with the ``captured_at``, ``namespace``, ``selector``,
            ``sample``, ``summary``, ``pods``, ``process_errors`` and
            ``notes`` keys.

    Returns:
        The Markdown text, ending with a newline.
    """
    summary = report["summary"]

    def _mib_or_dash(value: Any) -> str:
        # The average and maximum are None when no memory value was parsed.
        return "-" if value is None else f"{value} MiB"

    lines = [
        "# DeerFlow Sandbox Memory Profile",
        "",
        f"- Captured at: `{report['captured_at']}`",
        f"- Namespace: `{report['namespace']}`",
        f"- Selector: `{report['selector']}`",
        f"- Sample: `{report['sample']}`",
        f"- Pods: `{summary['pod_count']}`",
        f"- Parsed memory samples: `{summary['parsed_memory_count']}`",
        f"- Unparsed memory samples: `{summary['unparsed_memory_count']}`",
        f"- Total memory: `{format_mib(summary['total_memory_bytes'])}`",
        f"- Average memory: `{_mib_or_dash(summary['average_memory_mib'])}`",
        f"- Max memory: `{_mib_or_dash(summary['max_memory_mib'])}`",
        f"- Total CPU: `{summary['total_cpu_millicores']}m`",
        f"- Pods with process samples: `{summary['pods_with_process_samples']}`",
        f"- Pods with process sample errors: `{summary['pods_with_process_sample_errors']}`",
        "",
        "| Pod | Phase | CPU | Memory | Start Time |",
        "| --- | --- | ---: | ---: | --- |",
    ]

    for pod in report["pods"]:
        lines.append(
            "| {name} | {phase} | {cpu} | {memory} | {start_time} |".format(
                name=pod["name"],
                phase=pod["phase"] or "-",
                cpu=pod["cpu"]["raw"],
                memory=pod["memory"]["raw"],
                start_time=pod["start_time"] or "-",
            )
        )

    sampled_pods = [pod for pod in report["pods"] if pod["processes"]]
    if sampled_pods:
        lines.extend(["", "## Top Processes"])
    for pod in sampled_pods:
        lines.extend(
            [
                "",
                f"### {pod['name']}",
                "",
                "| PID | PPID | RSS | Command |",
                "| ---: | ---: | ---: | --- |",
            ]
        )
        for process in pod["processes"]:
            # Backticks would close the code span and a bare pipe would split
            # the table cell, so neutralise both in the command text.
            command = str(process["command"]).replace("`", "'").replace("|", "\\|")
            lines.append(
                "| {pid} | {ppid} | {rss} | `{command}` |".format(
                    pid=process["pid"],
                    ppid=process["ppid"] if process["ppid"] is not None else "-",
                    rss=format_mib(process["rss_kib"] * 1024),
                    command=command,
                )
            )

    if report["process_errors"]:
        lines.extend(["", "## Process Sample Errors"])
        for pod_name, error in sorted(report["process_errors"].items()):
            # Error text may span several lines; a raw newline would end the
            # list item, so collapse every whitespace run to a single space.
            single_line = " ".join(str(error).split())
            lines.append(f"- `{pod_name}`: {single_line}")

    lines.extend(["", "## Notes"])
    lines.extend(f"- {note}" for note in report["notes"])
    lines.append("")
    return "\n".join(lines)
def collect(
    namespace: str,
    selector: str,
    sample: str,
    kubectl: str,
    *,
    include_processes: bool = False,
    process_limit: int = 10,
    kubectl_timeout: int = DEFAULT_KUBECTL_TIMEOUT,
) -> dict[str, Any]:
    """Query kubectl for the selected pods and build a memory report.

    Runs ``kubectl top pod`` and ``kubectl get pods -o json`` for the given
    namespace and label selector, optionally samples processes in every pod
    listed by ``top``, and passes everything to ``build_report``.

    Args:
        namespace: Kubernetes namespace to query.
        selector: Pod label selector.
        sample: Label stored in the report for this snapshot.
        kubectl: kubectl executable to invoke.
        include_processes: Also collect per-pod process samples.
        process_limit: Maximum processes kept per pod; must be at least 1.
        kubectl_timeout: Timeout in seconds for each kubectl call; must be at
            least 1.

    Returns:
        The report dictionary produced by ``build_report``.

    Raises:
        ValueError: If ``process_limit`` or ``kubectl_timeout`` is below 1.
    """
    if process_limit < 1:
        raise ValueError("--process-limit must be greater than 0")
    if kubectl_timeout < 1:
        raise ValueError("--kubectl-timeout must be greater than 0")

    scope = ["-n", namespace, "-l", selector]

    usage_text = run_kubectl(
        ["top", "pod", *scope, "--no-headers"],
        kubectl=kubectl,
        timeout=kubectl_timeout,
    )
    top_pods = parse_top_pods(usage_text)

    pods_text = run_kubectl(
        ["get", "pods", *scope, "-o", "json"],
        kubectl=kubectl,
        timeout=kubectl_timeout,
    )

    samples = None
    errors = None
    if include_processes:
        sampled = collect_process_samples(
            top_pods,
            namespace=namespace,
            kubectl=kubectl,
            limit=process_limit,
            kubectl_timeout=kubectl_timeout,
        )
        samples, errors = sampled.samples, sampled.errors

    # The pod JSON is decoded only after process sampling has finished.
    pod_json = json.loads(pods_text)
    return build_report(
        namespace=namespace,
        selector=selector,
        sample=sample,
        top_pods=top_pods,
        pod_json=pod_json,
        process_samples=samples,
        process_errors=errors,
    )
def main(argv: list[str] | None = None) -> int:
    """Run the profiler command line and return a process exit code.

    Args:
        argv: Arguments without the program name; ``sys.argv[1:]`` is used
            when None.

    Returns:
        0 after the report has been printed; kubectl's exit status (or 1 when
        that status is 0/None) if a kubectl call fails; 1 if a kubectl call
        times out or kubectl cannot be started; 2 if collection raises a
        ValueError, for example for an out-of-range option value.
    """
    args = parse_args(list(sys.argv[1:] if argv is None else argv))
    try:
        report = collect(
            namespace=args.namespace,
            selector=args.selector,
            sample=args.sample,
            kubectl=args.kubectl,
            include_processes=args.include_processes,
            process_limit=args.process_limit,
            kubectl_timeout=args.kubectl_timeout,
        )
    except subprocess.CalledProcessError as exc:
        print(exc.stderr or str(exc), file=sys.stderr)
        return exc.returncode or 1
    except subprocess.TimeoutExpired as exc:
        print(f"kubectl timed out after {exc.timeout} seconds", file=sys.stderr)
        return 1
    except OSError as exc:
        # subprocess.run raises OSError (e.g. FileNotFoundError) when the
        # kubectl executable is missing or not runnable; report it like the
        # other failures instead of letting a traceback escape.
        print(f"failed to run {args.kubectl}: {exc}", file=sys.stderr)
        return 1
    except ValueError as exc:
        print(str(exc), file=sys.stderr)
        return 2

    if args.format == "json":
        print(json.dumps(report, indent=2, sort_keys=True))
    else:
        print(render_markdown(report))
    return 0