diff --git a/CHANGES.next.md b/CHANGES.next.md index 6fb1ae6a83..6ad419f217 100644 --- a/CHANGES.next.md +++ b/CHANGES.next.md @@ -285,7 +285,8 @@ - Re-enable support for Rocky Linux 8, 9, and 10 for the Azure provider. - Add Ubuntu 26.04 support for GCP, AWS, and Azure Providers. - Add a kubernetes-native benchmark for MySQL using sysbench -- Add `kafka_benchmark` support. +- Add kubernetes_management benchmark for measuring GKE/EKS/AKS management + plane API responsiveness. ### Enhancements: diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py index 9f44b52c78..596a324d04 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_management_benchmark.py @@ -13,46 +13,553 @@ # limitations under the License. """Benchmark for Kubernetes management plane operations. -TODO: Add comments & implement. +Measures GKE/EKS/AKS control-plane API responsiveness via two scenarios: + concurrent_node_pool_ops: concurrent node-pool create/delete. + overlapping_cluster_update: node-pool create overlapping a cluster update. + +Optimizations for minimum run time: + - Reduced poll_interval in provider WaitForOperation (5s vs 10s) + - Per-op threads capped at _MAX_CONCURRENT to avoid OS limits + - Accurate delete success rate via attempted_ops denominator """ -from typing import Any +import copy +import dataclasses +import statistics +import threading +import time +from typing import Callable +from absl import flags +from absl import logging +from perfkitbenchmarker import background_tasks from perfkitbenchmarker import benchmark_spec as bm_spec from perfkitbenchmarker import configs +from perfkitbenchmarker import errors from perfkitbenchmarker import sample +from perfkitbenchmarker.configs import benchmark_config_spec +from perfkitbenchmarker.resources.container_service import ( + container as container_lib, +) +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_cluster + +_SLEEP_POD_NAME = "pkb-mgmt-sleep" -BENCHMARK_NAME = 'kubernetes_management' +BENCHMARK_NAME = "kubernetes_management" BENCHMARK_CONFIG = """ kubernetes_management: description: > Benchmarks GKE/EKS/AKS management plane operations: concurrent node pool - create/upgrade/delete, overlapping cluster + node-pool ops, and large-scale - provisioning. Focused on control-plane API responsiveness. + create/delete, and overlapping cluster + node-pool ops. Focused on + control-plane API responsiveness. container_cluster: type: Kubernetes vm_count: 1 + vm_spec: *default_dual_core """ +# Scenarios measured by this benchmark (select via --k8s_mgmt_scenarios): +# concurrent_node_pool_ops: concurrently create and delete N node pools; +# measures control-plane throughput under parallel ops. +# overlapping_cluster_update: run a cluster update and a node-pool create +# simultaneously; measures behaviour when a cluster-scoped op overlaps a +# node-pool-scoped one. +_VALID_SCENARIOS = frozenset({ + "concurrent_node_pool_ops", + "overlapping_cluster_update", +}) + +# ── Shared flags (apply across all scenarios) ── +_SCENARIOS = flags.DEFINE_list( + "k8s_mgmt_scenarios", + [ + "concurrent_node_pool_ops", + "overlapping_cluster_update", + ], + "Comma-separated subset of scenarios to run. Valid values: " + + "concurrent_node_pool_ops, overlapping_cluster_update.", +) +_NODES_PER_NODEPOOL = flags.DEFINE_integer( + "k8s_mgmt_nodes_per_nodepool", + 2, + "Number of nodes per node pool. Google spec: 2 nodes per pool.", +) +_MAX_CONCURRENT = flags.DEFINE_integer( + "k8s_mgmt_max_concurrent", + 50, + "Cap on concurrent provider API calls within a batch. " + + "Higher = faster but more aggressive on connection pools.", +) + +# ── concurrent_node_pool_ops flags ── +_CONCURRENT_NODEPOOLS = flags.DEFINE_integer( + "k8s_mgmt_concurrent_nodepools", + 5, + "Number of node pools to create and delete concurrently in the " + + "concurrent_node_pool_ops scenario.", +) +_INITIAL_VERSION = flags.DEFINE_string( + "k8s_mgmt_initial_version", + None, + "Kubernetes version for newly-created node pools (N-1). None = auto.", +) + +# AKS caps node-pool names at 12 chars — keep all names within that limit. +_PREFIX = "pkbm" + + +def _ConcurrentPoolName(i): + """Returns the i-th concurrent-ops pool name. + + Three-digit zero-padded so names stay within AKS's 12-char node-pool limit. + """ + return f"{_PREFIX}a{i:03d}" + -def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]: - """Returns the configuration of a benchmark.""" +_OVERLAPPING_POOL_NAME = f"{_PREFIX}b" + + +@dataclasses.dataclass +class OpTiming: + """Latency of a single async management-plane operation. + + Pure timing data — the metric name is supplied by the sample builder, and + failures abort the run rather than being recorded here (so there is no + error field). + + Attributes: + initiation_latency: Seconds from issuing the async API call until it is + accepted and an operation handle is returned (time to *start*). + end_to_end_latency: Seconds from issuing the call until the operation + fully completes (initiation plus server-side execution). + """ + + initiation_latency: float + end_to_end_latency: float + + +def GetConfig(user_config): return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) -def CheckPrerequisites(benchmark_config: bm_spec.BenchmarkSpec) -> None: - del benchmark_config +def CheckPrerequisites( + benchmark_config: benchmark_config_spec.BenchmarkConfigSpec, +): + """Validates flag values and cluster type before any cloud calls.""" + invalid = [s for s in _SCENARIOS.value if s.strip() not in _VALID_SCENARIOS] + if invalid: + raise errors.Config.InvalidValue( + f"Invalid value(s) for --k8s_mgmt_scenarios: {invalid}. " + + f"Valid options: {sorted(_VALID_SCENARIOS)}." + ) + if benchmark_config.container_cluster.type != "Kubernetes": + raise errors.Config.InvalidValue( + "kubernetes_management benchmark requires a Kubernetes" + + " container cluster." + ) def Prepare(benchmark_spec: bm_spec.BenchmarkSpec) -> None: - del benchmark_spec + """Deploys a sleep pod to confirm data-plane reachability.""" + cluster = benchmark_spec.container_cluster + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + benchmark_spec.always_call_cleanup = True + logging.info( + "kubernetes_management Prepare: cluster=%s, version=%s", + cluster.name, + cluster.k8s_version, + ) + # Spec workload: "a simple container that sleeps for a given time". + # Confirms data-plane reachability; generates no data-plane load. + kubectl.RunKubectlCommand( + [ + "run", + _SLEEP_POD_NAME, + "--image=busybox", + "--restart=Never", + "--", + "sleep", + "86400", + ], + ) + + +def _ClearNodePools(cluster: kubernetes_cluster.KubernetesCluster) -> None: + """Clears all pkbm* node pools, blocking until each delete completes. + + Called after each scenario so the next one starts from a clean cluster, + and from Cleanup() as a final best-effort teardown. + """ + pools = [n for n in cluster.GetNodePoolNames() if n.startswith(_PREFIX)] + if not pools: + logging.info("Clear: no pkbm* pools present — cluster is clean.") + return + logging.info("Clear: deleting %d pkbm* pools: %s", len(pools), pools) + background_tasks.RunThreaded(cluster.DeleteNodePool, pools) def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]: - del benchmark_spec - return [] + """Runs the selected scenarios and returns flat list of samples.""" + cluster = benchmark_spec.container_cluster + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + + # Resolve the initial node-pool version once; log clearly; tag every sample. + initial = _INITIAL_VERSION.value + source = "flag" if initial else "auto-resolved" + if not initial: + initial, _ = cluster.ResolveNodePoolVersions() + assert initial is not None + + logging.info( + "NodePool version (%s): initial=%s " + + "(cluster k8s_version=%s) | nodes_per_pool=%d | machine_type=%s", + source, + initial, + cluster.k8s_version, + _NODES_PER_NODEPOOL.value, + cluster.default_nodepool.machine_type + if hasattr(cluster, "default_nodepool") + else "unknown", + ) + + scenarios = {s.strip() for s in _SCENARIOS.value} + samples: list[sample.Sample] = [] + + if "concurrent_node_pool_ops" in scenarios: + samples += _RunConcurrentNodePoolOps(cluster, initial) + # Each scenario leaves the cluster clean for the next one. + _ClearNodePools(cluster) + if "overlapping_cluster_update" in scenarios: + samples += _RunOverlappingClusterUpdate(cluster, initial) + _ClearNodePools(cluster) + + # Tag all samples with version path and run config for published results. + run_meta = { + "initial_version": str(initial), + "cluster_k8s_version": str(cluster.k8s_version), + "nodes_per_nodepool": str(_NODES_PER_NODEPOOL.value), + "concurrent_nodepools": str(_CONCURRENT_NODEPOOLS.value), + } + for s in samples: + s.metadata.update(run_meta) + + return samples def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None: - del benchmark_spec + """Best-effort delete of leftover benchmark node pools and sleep pod.""" + cluster = benchmark_spec.container_cluster + if cluster is None: + return + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + kubectl.RunKubectlCommand( + ["delete", "pod", _SLEEP_POD_NAME, "--ignore-not-found"], + raise_on_failure=False, + ) + # Final teardown reuses the same sweep the scenarios use. + _ClearNodePools(cluster) + + +def _RunConcurrentNodePoolOps( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, +) -> list[sample.Sample]: + """Concurrent CreateNodePool then DeleteNodePool.""" + n = _CONCURRENT_NODEPOOLS.value + logging.info("concurrent_node_pool_ops: %d pools, initial=%s", n, initial) + pool_names = [_ConcurrentPoolName(i) for i in range(n)] + configs_ = [_MakeNodePoolConfig(cluster, name) for name in pool_names] + samples: list[sample.Sample] = [] + + # ── Phase 1: concurrent creates (fail-hard — any failure aborts) ──────── + create_results = _RunAsync( + kickoff=lambda cfg: cluster.CreateNodePoolAsync( + cfg, node_version=initial + ), + wait_fn=cluster.WaitForOperation, + items=configs_, + get_name=lambda cfg: cfg.name, + ) + samples += _OpSamples("ConcurrentOps_Create", create_results) + + # ── Phase 2: concurrent deletes (live-list; all creates succeeded) ────── + alive = _LiveNodePoolNames(cluster, f"{_PREFIX}a") + logging.info("concurrent_node_pool_ops: deleting %d pools", len(alive)) + delete_results = _RunAsync( + kickoff=cluster.DeleteNodePoolAsync, + wait_fn=cluster.WaitForOperation, + items=alive, + get_name=str, + ) + samples += _OpSamples("ConcurrentOps_Delete", delete_results) + return samples + + +def _RunOverlappingClusterUpdate( + cluster: kubernetes_cluster.KubernetesCluster, + initial: str, +) -> list[sample.Sample]: + """CreateNodePool fired concurrently with a long-running cluster update. + + Both ops kick off async on separate threads; initiation + E2E latency + recorded independently. Overlap window = ClusterUpdate E2E latency. + """ + logging.info( + "overlapping_cluster_update: cluster update + node-pool create" + ) + cfg = _MakeNodePoolConfig(cluster, _OVERLAPPING_POOL_NAME) + results = ThreadSafeResults() + + def DoClusterUpdate(): + timing = _TimedAsync(cluster.UpdateClusterAsync, cluster.WaitForOperation) + results.add("OverlappingUpdate_ClusterUpdate", timing) + logging.info( + "overlapping_cluster_update ClusterUpdate: init=%.2fs e2e=%.2fs", + timing.initiation_latency, + timing.end_to_end_latency, + ) + + def DoCreate(): + timing = _TimedAsync( + lambda: cluster.CreateNodePoolAsync(cfg, node_version=initial), + cluster.WaitForOperation, + ) + results.add("OverlappingUpdate_NodePoolCreate", timing) + logging.info( + "overlapping_cluster_update NodePoolCreate: init=%.2fs e2e=%.2fs", + timing.initiation_latency, + timing.end_to_end_latency, + ) + + background_tasks.RunThreaded(lambda fn: fn(), [DoClusterUpdate, DoCreate]) + + samples: list[sample.Sample] = [] + for name, timing in results.entries: + samples += _OpSamples(name, [(name, timing)]) + + # Remove test pool (best-effort). + cluster.DeleteNodePool(_OVERLAPPING_POOL_NAME) + return samples + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class ThreadSafeResults: + """Thread-safe collector of (name, OpTiming) pairs from concurrent ops.""" + + def __init__(self): + self._lock = threading.Lock() + self.entries: list[tuple[str, OpTiming]] = [] + self.failed: list[str] = [] + + def add(self, name: str, timing: OpTiming) -> None: + with self._lock: + self.entries.append((name, timing)) + + def add_failure(self, name: str) -> None: + with self._lock: + self.failed.append(name) + + +def _TimedAsync( + kickoff: Callable[[], str], + wait_fn: Callable[[str], None], +) -> OpTiming: + """Runs kickoff() then wait_fn(handle); returns the OpTiming. + + Lets exceptions propagate — a failed management-plane op aborts the + benchmark rather than being silently absorbed. initiation_latency is the + time for kickoff() to return (API accepted); end_to_end_latency is total + wall time including the wait. + """ + init_start = time.monotonic() + handle = kickoff() + initiation_latency = time.monotonic() - init_start + wait_fn(handle) + end_to_end_latency = time.monotonic() - init_start + return OpTiming(initiation_latency, end_to_end_latency) + + +def _RunAsync( + kickoff: Callable, + wait_fn: Callable[[str], None], + items: list, + get_name: Callable[[object], str], +) -> list[tuple[str, OpTiming]]: + """Fires kickoff(item) concurrently; returns (name, OpTiming) per item. + + Fail-hard: any op that raises aborts the run (RunThreaded propagates the + exception). Used by the create/upgrade/delete scenarios where a single + failure is a benchmark failure. Uses a concurrency cap for streaming + execution — completed ops free their slot immediately for the next one. + """ + if not items: + return [] + results = ThreadSafeResults() + cap = min(len(items), _MAX_CONCURRENT.value) + + def DoWrap(item): + timing = _TimedAsync(lambda: kickoff(item), wait_fn) + name = get_name(item) + results.add(name, timing) + logging.info( + "%s initiation=%.2fs end_to_end=%.2fs", + name, + timing.initiation_latency, + timing.end_to_end_latency, + ) + + background_tasks.RunThreaded(DoWrap, items, max_concurrent_threads=cap) + return results.entries + + +def _MakeNodePoolConfig( + cluster: kubernetes_cluster.KubernetesCluster, + name: str, +) -> container_lib.BaseNodePoolConfig: + """Builds a node-pool config from the cluster's default pool.""" + cfg = copy.copy(cluster.default_nodepool) + cfg.name = name + cfg.num_nodes = _NODES_PER_NODEPOOL.value + cfg.min_nodes = _NODES_PER_NODEPOOL.value + cfg.max_nodes = _NODES_PER_NODEPOOL.value + return cfg + + +def _LiveNodePoolNames( + cluster: kubernetes_cluster.KubernetesCluster, prefix: str +) -> list[str]: + """Returns current node-pool names matching the given prefix.""" + return [p for p in cluster.GetNodePoolNames() if p.startswith(prefix)] + + +def _OpSamples( + metric_prefix: str, + results: list[tuple[str, OpTiming]], +) -> list[sample.Sample]: + """Per-op + aggregate latency samples for fail-hard scenarios. + + Every op in `results` succeeded (a failure would have aborted the run), so + there is no success-rate or error accounting here — just initiation and + end-to-end latency per op, plus aggregate stats. + + Args: + metric_prefix: prefix for all metric names. + results: (name, OpTiming) pairs from _RunAsync. + """ + samples: list[sample.Sample] = [] + init_latencies: list[float] = [] + e2e_latencies: list[float] = [] + + for name, timing in results: + meta = {"operation_name": name} + init_latencies.append(timing.initiation_latency) + e2e_latencies.append(timing.end_to_end_latency) + samples.append( + sample.Sample( + f"{metric_prefix}_InitiationLatency", + timing.initiation_latency, + "seconds", + dict(meta), + ) + ) + samples.append( + sample.Sample( + f"{metric_prefix}_EndToEndLatency", + timing.end_to_end_latency, + "seconds", + dict(meta), + ) + ) + + samples += _AggregateAndOutlierSamples( + metric_prefix, init_latencies, e2e_latencies + ) + return samples + + +def _AggregateAndOutlierSamples( + metric_prefix: str, + init_latencies: list[float], + e2e_latencies: list[float], +) -> list[sample.Sample]: + """Emits aggregate stats (>=2 samples) and outlier counts (>=4 samples).""" + samples: list[sample.Sample] = [] + for phase_label, latencies in ( + ("InitiationLatency", init_latencies), + ("EndToEndLatency", e2e_latencies), + ): + if len(latencies) >= 2: + samples += _AggregateSamples(metric_prefix, phase_label, latencies) + if len(latencies) >= 4: + samples += _OutlierSamples(metric_prefix, phase_label, latencies) + return samples + + +def _AggregateSamples( + metric_prefix: str, phase_label: str, latencies: list[float] +) -> list[sample.Sample]: + """Emits Mean/StdDev/Min/Median/P90/P99/Max samples for a latency series.""" + n = len(latencies) + meta = {"sample_count": str(n)} + + # statistics.quantiles with method='inclusive' matches linear interpolation + # and returns n-1 cut points; index 89→P90, 98→P99. + quantiles = statistics.quantiles(latencies, n=100, method="inclusive") + + stats = [ + ("Mean", statistics.mean(latencies)), + ("StdDev", statistics.pstdev(latencies)), + ("Min", min(latencies)), + ("Median", statistics.median(latencies)), + ("P90", quantiles[89]), + ("P99", quantiles[98]), + ("Max", max(latencies)), + ] + result = [] + for label, value in stats: + result.append( + sample.Sample( + f"{metric_prefix}_{phase_label}_{label}", + value, + "seconds", + dict(meta), + ) + ) + return result + + +def _OutlierSamples( + metric_prefix: str, phase_label: str, latencies: list[float] +) -> list[sample.Sample]: + """Emits a single OutlierCount sample using IQR-fence outlier detection.""" + # statistics.quantiles(n=4) returns [Q1, Q2, Q3]; indices 0 and 2. + quartiles = statistics.quantiles(latencies, n=4, method="inclusive") + q1, q3 = quartiles[0], quartiles[2] + iqr = q3 - q1 + lower_fence = q1 - 1.5 * iqr + upper_fence = q3 + 1.5 * iqr + outlier_count = sum( + 1 for v in latencies if v < lower_fence or v > upper_fence + ) + meta = { + "q1": str(q1), + "q3": str(q3), + "iqr": str(iqr), + "upper_fence": str(upper_fence), + "lower_fence": str(lower_fence), + "sample_count": str(len(latencies)), + } + return [ + sample.Sample( + f"{metric_prefix}_{phase_label}_OutlierCount", + outlier_count, + "count", + meta, + ) + ] diff --git a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py index a57a43057a..753e1ec30f 100644 --- a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py +++ b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py @@ -375,9 +375,6 @@ def GetNodePoolNames(self) -> list[str]: nodegroups = json.loads(stdout) return [ng['Name'] for ng in nodegroups] - def AddNodepool(self, batch_name, pool_id): - pass - class EksCluster(BaseEksCluster): """Class representing an Elastic Kubernetes Service cluster.""" diff --git a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py index 9e20f57583..4cc386cd5c 100644 --- a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py +++ b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py @@ -1,5 +1,6 @@ """Classes related to KubernetesCluster.""" +import abc import functools import json import logging @@ -10,7 +11,9 @@ from perfkitbenchmarker import vm_util from perfkitbenchmarker.configs import container_spec as container_spec_lib from perfkitbenchmarker.resources import kubernetes_inference_server -from perfkitbenchmarker.resources.container_service import container as container_lib +from perfkitbenchmarker.resources.container_service import ( + container as container_lib, +) from perfkitbenchmarker.resources.container_service import container_cluster from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes @@ -297,9 +300,127 @@ def _GetAddressFromIngress(self, ingress_out: str): ) return 'http://' + ip.strip() - def AddNodepool(self, batch_name: str, pool_id: str): - """Adds an additional nodepool with the given name to the cluster.""" - pass + def AddNodepool(self, batch_name: str, pool_id: str) -> None: + """Adds a node pool; delegates to CreateNodePool for standard clusters. + + Karpenter-based subclasses override this to apply a manifest instead. + """ + nodepool_config = container_lib.BaseNodePoolConfig( + self.nodepools[container_cluster.DEFAULT_NODEPOOL].vm_spec, + name=f'{batch_name}-{pool_id}', + ) + self.CreateNodePool(nodepool_config) + + def CreateNodePool( + self, + nodepool_config: container_lib.BaseNodePoolConfig, + node_version: str | None = None, + ) -> None: + """Creates a single named node pool on the cluster (blocks until ready). + + Args: + nodepool_config: Node pool definition (name, machine type, node count). + node_version: Optional Kubernetes version to pin the node pool to. None + means use the cluster default. + """ + raise NotImplementedError + + def DeleteNodePool(self, name: str) -> None: + """Deletes the named node pool (blocks until removed).""" + raise NotImplementedError + + def UpgradeNodePool(self, name: str, target_version: str) -> None: + """Upgrades the named node pool to the given Kubernetes version.""" + raise NotImplementedError + + def UpdateCluster(self) -> None: + """Performs a lightweight cluster-level update operation (blocks). + + Intended for management-plane benchmarks that need to overlap a real + cluster-level operation with a node-pool operation. The implementation + should issue a control-plane mutation (so an actual operation runs) that + is non-destructive and idempotent across repeated invocations. + """ + raise NotImplementedError + + def CreateNodePoolAsync( + self, + nodepool_config: container_lib.BaseNodePoolConfig, + node_version: str | None = None, + ) -> str: + """Initiates node-pool create; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str: + """Initiates node-pool upgrade; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def DeleteNodePoolAsync(self, name: str) -> str: + """Initiates node-pool delete; returns opaque op handle. Does NOT wait.""" + raise NotImplementedError + + def UpdateClusterAsync(self) -> str: + """Initiates cluster-level update. Returns op handle; does NOT wait.""" + raise NotImplementedError + + @abc.abstractmethod + def GetNodePoolNames(self) -> list[str]: + """Returns the names of all node pools currently in the cluster. + + Used by the kubernetes_management benchmark to: + - Sweep stale pkbm* pools before each run (clean-start spec requirement) + - Re-list live pools after creates before deleting (avoids stale names) + """ + + def WaitForOperation(self, op_handle: str) -> None: + """Blocks until the operation identified by op_handle completes. + + Args: + op_handle: provider-specific opaque string from one of the *Async + methods above. + + Raises: + errors.Resource.RetryableCreationError or similar on timeout/failure. + """ + raise NotImplementedError + + def ResolveNodePoolVersions(self) -> tuple[str, str]: + """Returns (initial, target) K8s versions per benchmark spec. + + Spec contract: + target = cluster's current K8s version (the latest available) + initial = the adjacent minor below target (e.g., target=1.35 -> 1.34) + Default implementation returns bare-minor strings ("1.34", "1.35") which + EKS and AKS accept directly. Providers requiring fully-qualified versions + (notably GKE) must override. + """ + target = BareMinor(self.k8s_version) + initial = AdjacentMinorBelow(self.k8s_version) + return initial, target + + +def BareMinor(version: str) -> str: + """Returns the 'major.minor' part of a K8s version string. + + Accepts and normalizes formats like 'v1.35.4', '1.35.4-gke.1234', '1.35'. + """ + if version.startswith('v'): + version = version[1:] + bare = version.split('-', 1)[0] + parts = bare.split('.') + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): + raise ValueError(f'Cannot parse K8s version: {version!r}') + return f'{parts[0]}.{parts[1]}' + + +def AdjacentMinorBelow(version: str) -> str: + """Returns the bare minor one below the given version: '1.35.4' -> '1.34'.""" + bare = BareMinor(version) + major_s, minor_s = bare.split('.') + minor = int(minor_s) + if minor <= 0: + raise ValueError(f'No adjacent minor below {version!r}') + return f'{major_s}.{minor - 1}' def _DeleteAllFromDefaultNamespace(): diff --git a/tests/container_service_mock.py b/tests/container_service_mock.py index 5fb9829c38..98960c848d 100644 --- a/tests/container_service_mock.py +++ b/tests/container_service_mock.py @@ -26,6 +26,9 @@ def _Create(self): def _Delete(self): pass + def GetNodePoolNames(self) -> list[str]: + return [] + def CreateTestKubernetesCluster( container_cluster_spec: container_spec.ContainerClusterSpec | None = None, diff --git a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py index a58ad03497..d2afdd72f1 100644 --- a/tests/linux_benchmarks/kubernetes_management_benchmark_test.py +++ b/tests/linux_benchmarks/kubernetes_management_benchmark_test.py @@ -11,28 +11,824 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Tests for linux_benchmarks.kubernetes_management_benchmark.""" +import threading +import time import unittest + +# pylint: disable=invalid-name,protected-access from unittest import mock -from perfkitbenchmarker import benchmark_spec as bm_spec_lib +from absl import flags +from absl.testing import flagsaver +from absl.testing import parameterized +from perfkitbenchmarker import errors +from perfkitbenchmarker import sample from perfkitbenchmarker.linux_benchmarks import kubernetes_management_benchmark +from perfkitbenchmarker.resources.container_service import kubernetes_cluster from tests import pkb_common_test_case +FLAGS = flags.FLAGS + +_CLUSTER_NAME = 'test-cluster' + + +def _make_sample(metric, value, unit='seconds', metadata=None): + return sample.Sample(metric, value, unit, metadata or {}) + -class KubernetesManagementBenchmarkTestCase( - pkb_common_test_case.PkbCommonTestCase +def _make_mock_cluster( + name=_CLUSTER_NAME, + k8s_version='1.34', + pool_names=None, ): + """Creates a fully-stubbed KubernetesCluster mock for use in tests.""" + cluster = mock.create_autospec( + kubernetes_cluster.KubernetesCluster, instance=True + ) + cluster.name = name + cluster.k8s_version = k8s_version + cluster.cluster_version = k8s_version + cluster.GetNodePoolNames.return_value = pool_names or [] + cluster.ResolveNodePoolVersions.return_value = ('1.33', '1.34') + cluster.CreateNodePoolAsync.return_value = 'op-create-1' + cluster.UpgradeNodePoolAsync.return_value = 'op-upgrade-1' + cluster.DeleteNodePoolAsync.return_value = 'op-delete-1' + cluster.UpdateClusterAsync.return_value = 'op-update-1' + cluster.WaitForOperation.return_value = None + default_np = mock.MagicMock() + default_np.machine_type = 'e2-standard-2' + default_np.num_nodes = 1 + default_np.min_nodes = 1 + default_np.max_nodes = 1 + default_np.zone = 'us-central1-a' + default_np.disk_size = 100 + default_np.name = 'default-pool' + cluster.default_nodepool = default_np + return cluster + + +def _make_mock_benchmark_spec(cluster=None): + spec = mock.MagicMock() + spec.container_cluster = cluster or _make_mock_cluster() + return spec + + +def _make_mock_config(cluster_type='Kubernetes'): + cfg = mock.MagicMock() + cfg.container_cluster.type = cluster_type + return cfg + + +class NodePoolNameTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the node-pool name-generation helpers.""" + + @parameterized.named_parameters( + ('zero', 0, 'pkbma000'), + ('two_digit', 42, 'pkbma042'), + ('max_three_digit', 999, 'pkbma999'), + ) + def testConcurrentPoolNameZeroPadsToThreeDigits(self, index, expected): + self.assertEqual( + expected, kubernetes_management_benchmark._ConcurrentPoolName(index) + ) + + def testOverlappingPoolNameIsConstant(self): + self.assertEqual( + 'pkbmb', kubernetes_management_benchmark._OVERLAPPING_POOL_NAME + ) + + def testAllNamesWithinAksLimit(self): + for i in range(1000): + self.assertLessEqual( + len(kubernetes_management_benchmark._ConcurrentPoolName(i)), 12 + ) + self.assertLessEqual( + len(kubernetes_management_benchmark._OVERLAPPING_POOL_NAME), 12 + ) + + +class CheckPrerequisitesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the CheckPrerequisites validation function.""" + + def testValidScenariosPass(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=[ + 'concurrent_node_pool_ops', + 'overlapping_cluster_update', + ] + ): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testInvalidScenarioRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['X']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testMixedValidInvalidRaises(self): + with flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops', 'Z'] + ): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + def testNonKubernetesClusterTypeRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['concurrent_node_pool_ops']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites( + _make_mock_config(cluster_type='Mesos') + ) + + def testLowercaseScenarioRaises(self): + with flagsaver.flagsaver(k8s_mgmt_scenarios=['a']): + with self.assertRaises(errors.Config.InvalidValue): + kubernetes_management_benchmark.CheckPrerequisites(_make_mock_config()) + + +class PrepareTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Prepare benchmark lifecycle function.""" + + def _patch_kubectl(self, rc=0): + return mock.patch( + 'perfkitbenchmarker.resources.container_service.kubectl' + + '.RunKubectlCommand', + return_value=('', '', rc), + ) + + def testPrepareRunsKubectlSleepPod(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl() as mock_kubectl: + kubernetes_management_benchmark.Prepare(bm_spec) + mock_kubectl.assert_called_once() + args = mock_kubectl.call_args[0][0] + self.assertIn('run', args) + self.assertIn('pkb-mgmt-sleep', args) + self.assertIn('sleep', args) + + def testPrepareSetsAlwaysCallCleanup(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Prepare(bm_spec) + self.assertTrue(bm_spec.always_call_cleanup) + + def testPrepareToleratesKubectlNonZeroReturn(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(rc=1): + kubernetes_management_benchmark.Prepare(bm_spec) + + +class CleanupTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Cleanup benchmark lifecycle function.""" + + def _patch_kubectl(self): + return mock.patch( + 'perfkitbenchmarker.resources.container_service.kubectl' + + '.RunKubectlCommand', + return_value=('', '', 0), + ) + + def testCleanupDeletesSleepPod(self): + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl() as mock_kubectl: + kubernetes_management_benchmark.Cleanup(bm_spec) + delete_calls = [ + str(c) + for c in mock_kubectl.call_args_list + if 'pkb-mgmt-sleep' in str(c) + ] + self.assertNotEmpty(delete_calls) + + def testCleanupDeletesAllPkbmPrefixedPools(self): + cluster = _make_mock_cluster( + pool_names=['pkbma000', 'default-pool', 'pkbmc0001'] + ) + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Cleanup(bm_spec) + deleted = {c.args[0] for c in cluster.DeleteNodePool.call_args_list} + self.assertIn('pkbma000', deleted) + self.assertIn('pkbmc0001', deleted) + self.assertNotIn('default-pool', deleted) + + def testCleanupSkipsDeleteWhenNoLeftoverPools(self): + cluster = _make_mock_cluster(pool_names=['default-pool']) + bm_spec = _make_mock_benchmark_spec(cluster) + with self._patch_kubectl(): + kubernetes_management_benchmark.Cleanup(bm_spec) + cluster.DeleteNodePool.assert_not_called() + + def testCleanupHandlesNoneCluster(self): + bm_spec = _make_mock_benchmark_spec() + bm_spec.container_cluster = None + kubernetes_management_benchmark.Cleanup(bm_spec) + + +class ClearNodePoolsTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _ClearNodePools helper function.""" + + def testDeletesStalePkbmPools(self): + cluster = _make_mock_cluster( + pool_names=['pkbma000', 'pkbmc0001', 'user-pool'] + ) + kubernetes_management_benchmark._ClearNodePools(cluster) + deleted = {c.args[0] for c in cluster.DeleteNodePool.call_args_list} + self.assertIn('pkbma000', deleted) + self.assertIn('pkbmc0001', deleted) + self.assertNotIn('user-pool', deleted) + + def testDoesNothingWhenNoPkbmPools(self): + cluster = _make_mock_cluster(pool_names=['user-pool', 'default-pool']) + kubernetes_management_benchmark._ClearNodePools(cluster) + cluster.DeleteNodePool.assert_not_called() + + def testClearRaisesOnGetNodePoolNamesException(self): + cluster = _make_mock_cluster() + cluster.GetNodePoolNames.side_effect = RuntimeError('API error') + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._ClearNodePools(cluster) + + +class ThreadSafeResultsTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the ThreadSafeResults collector.""" + + def testAddSingleEntry(self): + r = kubernetes_management_benchmark.ThreadSafeResults() + r.add('op1', kubernetes_management_benchmark.OpTiming(0.1, 1.0)) + self.assertLen(r.entries, 1) + name, timing = r.entries[0] + self.assertEqual('op1', name) + self.assertAlmostEqual(0.1, timing.initiation_latency, places=5) + self.assertAlmostEqual(1.0, timing.end_to_end_latency, places=5) + + def testAddFailureRecordsName(self): + r = kubernetes_management_benchmark.ThreadSafeResults() + r.add_failure('bad-op') + self.assertEqual(['bad-op'], r.failed) + self.assertEmpty(r.entries) + + def testAddIsThreadSafe(self): + """Tests that concurrent add() calls from multiple threads are safe.""" + r = kubernetes_management_benchmark.ThreadSafeResults() + n = 100 + + def _add(i): + r.add( + f'op{i}', + kubernetes_management_benchmark.OpTiming(float(i), float(i) * 2), + ) + + threads = [threading.Thread(target=_add, args=(i,)) for i in range(n)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertLen(r.entries, n) + + +class TimedAsyncTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _TimedAsync timing helper.""" + + def testSuccessfulKickoffAndWait(self): + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(return_value=None) + timing = kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + kickoff.assert_called_once() + wait_fn.assert_called_once_with('op-handle') + self.assertGreaterEqual(timing.initiation_latency, 0.0) + self.assertGreaterEqual( + timing.end_to_end_latency, timing.initiation_latency + ) + + def testKickoffFailurePropagates(self): + exc = RuntimeError('kickoff failed') + kickoff = mock.Mock(side_effect=exc) + wait_fn = mock.Mock() + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + wait_fn.assert_not_called() + + def testWaitFailurePropagates(self): + exc = RuntimeError('wait failed') + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(side_effect=exc) + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + + def testInitLatencyNotGreaterThanE2eLatency(self): + kickoff = mock.Mock(return_value='handle') + wait_fn = mock.Mock(side_effect=lambda _: time.sleep(0.01)) + timing = kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + self.assertLessEqual(timing.initiation_latency, timing.end_to_end_latency) + + def testHandlePassedToWaitFn(self): + kickoff = mock.Mock(return_value='my-op-handle') + wait_fn = mock.Mock() + kubernetes_management_benchmark._TimedAsync(kickoff, wait_fn) + wait_fn.assert_called_once_with('my-op-handle') + + +class RunAsyncTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _RunAsync fail-hard concurrent execution helper.""" + + def testEmptyItemsReturnsEmptyList(self): + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(), + wait_fn=mock.Mock(), + items=[], + get_name=str, + ) + self.assertEmpty(results) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testReturnsOneResultPerItem(self): + kickoff = mock.Mock(return_value='op-handle') + wait_fn = mock.Mock(return_value=None) + results = kubernetes_management_benchmark._RunAsync( + kickoff=kickoff, wait_fn=wait_fn, items=['a', 'b', 'c'], get_name=str + ) + self.assertLen(results, 3) + self.assertEqual({'a', 'b', 'c'}, {name for name, _ in results}) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testKickoffErrorPropagates(self): + """Fail-hard: a failing op raises rather than being captured.""" + kickoff = mock.Mock(side_effect=RuntimeError('kaboom')) + with self.assertRaises(Exception): + kubernetes_management_benchmark._RunAsync( + kickoff=kickoff, wait_fn=mock.Mock(), items=['x'], get_name=str + ) + + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=2) + def testConcurrencyCapDoesNotDropItems(self): + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(return_value='op'), + wait_fn=mock.Mock(return_value=None), + items=list(range(5)), + get_name=str, + ) + self.assertLen(results, 5) - def setUp(self): - super().setUp() - self.bm_spec = mock.create_autospec( - bm_spec_lib.BenchmarkSpec, instance=True + @flagsaver.flagsaver(k8s_mgmt_max_concurrent=50) + def testGetNameCallableApplied(self): + cfg = mock.MagicMock() + cfg.name = 'poolname' + results = kubernetes_management_benchmark._RunAsync( + kickoff=mock.Mock(return_value='h'), + wait_fn=mock.Mock(), + items=[cfg], + get_name=lambda c: c.name, ) + name, _ = results[0] + self.assertEqual('poolname', name) - def testRun(self): - samples = kubernetes_management_benchmark.Run(self.bm_spec) - self.assertEqual(samples, []) + +class MakeNodePoolConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _MakeNodePoolConfig factory.""" + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=3) + def testNameIsSet(self): + cluster = _make_mock_cluster() + cfg = kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'mypool') + self.assertEqual('mypool', cfg.name) + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=3) + def testNumNodesComesFromFlag(self): + cluster = _make_mock_cluster() + cfg = kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'p') + self.assertEqual(3, cfg.num_nodes) + self.assertEqual(3, cfg.min_nodes) + self.assertEqual(3, cfg.max_nodes) + + @flagsaver.flagsaver(k8s_mgmt_nodes_per_nodepool=1) + def testDoesNotMutateDefaultNodepool(self): + cluster = _make_mock_cluster() + original_name = cluster.default_nodepool.name + kubernetes_management_benchmark._MakeNodePoolConfig(cluster, 'newname') + self.assertEqual(original_name, cluster.default_nodepool.name) + + +class OpSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _OpSamples latency-only sample helper (fail-hard path).""" + + def testEmptyResultsYieldsNoSamples(self): + samples = kubernetes_management_benchmark._OpSamples('PrefixOp', []) + self.assertEmpty(samples) + + def testPerOpInitiationAndE2eSamplesGenerated(self): + results = [ + ('op1', kubernetes_management_benchmark.OpTiming(0.1, 1.0)), + ('op2', kubernetes_management_benchmark.OpTiming(0.2, 2.0)), + ] + samples = kubernetes_management_benchmark._OpSamples('MyOp', results) + metrics = [s.metric for s in samples] + self.assertIn('MyOp_InitiationLatency', metrics) + self.assertIn('MyOp_EndToEndLatency', metrics) + + def testNoSuccessRateOrCountMetrics(self): + """Fail-hard path emits no SuccessRate/count metrics (B1).""" + results = [ + ('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0)), + ('op2', kubernetes_management_benchmark.OpTiming(0.5, 1.5)), + ] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + metrics = {s.metric for s in samples} + self.assertNotIn('Op_SuccessRate', metrics) + self.assertNotIn('Op_TotalOps', metrics) + + def testOperationNameInMetadata(self): + results = [('mypool', kubernetes_management_benchmark.OpTiming(1.0, 2.0))] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + init_s = next(s for s in samples if s.metric == 'Op_InitiationLatency') + self.assertEqual('mypool', init_s.metadata['operation_name']) + + def testAggregatesGeneratedForTwoOrMore(self): + results = [ + ( + f'op{i}', + kubernetes_management_benchmark.OpTiming(float(i), float(i) * 2), + ) + for i in range(1, 4) + ] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + metrics = [s.metric for s in samples] + self.assertIn('Op_InitiationLatency_Mean', metrics) + self.assertIn('Op_EndToEndLatency_Mean', metrics) + + def testAggregatesNotGeneratedForSingle(self): + results = [('op1', kubernetes_management_benchmark.OpTiming(1.0, 2.0))] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + self.assertNotIn('Op_InitiationLatency_Mean', [s.metric for s in samples]) + + def testOutliersGeneratedForFourOrMore(self): + results = [ + ( + f'op{i}', + kubernetes_management_benchmark.OpTiming(float(i), float(i) * 2), + ) + for i in range(1, 6) + ] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + metrics = [s.metric for s in samples] + self.assertIn('Op_InitiationLatency_OutlierCount', metrics) + + def testOutliersNotGeneratedForThreeOrFewer(self): + results = [ + ( + f'op{i}', + kubernetes_management_benchmark.OpTiming(float(i), float(i) * 2), + ) + for i in range(1, 4) + ] + samples = kubernetes_management_benchmark._OpSamples('Op', results) + self.assertNotIn( + 'Op_InitiationLatency_OutlierCount', [s.metric for s in samples] + ) + + +class AggregateSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _AggregateSamples statistics helper.""" + + def testProducesAllExpectedStatMetrics(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Pfx', 'InitiationLatency', [1.0, 2.0, 3.0, 4.0, 5.0] + ) + metrics = {s.metric for s in samples} + for label in ('Mean', 'StdDev', 'Min', 'Median', 'P90', 'P99', 'Max'): + self.assertIn(f'Pfx_InitiationLatency_{label}', metrics) + + def testMeanValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] + ) + mean_s = next(s for s in samples if 'Mean' in s.metric) + self.assertAlmostEqual(3.0, mean_s.value, places=3) + + def testMinValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [10.0, 20.0, 30.0] + ) + min_s = next(s for s in samples if 'Min' in s.metric) + self.assertAlmostEqual(10.0, min_s.value, places=3) + + def testMaxValueCorrect(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [10.0, 20.0, 30.0] + ) + max_s = next(s for s in samples if 'Max' in s.metric) + self.assertAlmostEqual(30.0, max_s.value, places=3) + + def testSampleCountInMetadata(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0] + ) + for s in samples: + self.assertEqual('3', s.metadata.get('sample_count')) + + def testUnitsAreSeconds(self): + samples = kubernetes_management_benchmark._AggregateSamples( + 'Op', 'E2E', [1.0, 2.0] + ) + for s in samples: + self.assertEqual('seconds', s.unit) + + +class OutlierSamplesTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the _OutlierSamples IQR-based outlier detection helper.""" + + def testNoOutliersYieldsZeroCount(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 1.1, 1.2, 1.3, 1.4, 1.5] + ) + self.assertLen(samples, 1) + self.assertEqual(0, samples[0].value) + + def testClearOutlierDetected(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 1.0, 1.0, 1.0, 100.0] + ) + self.assertEqual(1, samples[0].value) + + def testMetricNameFormatted(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'MyPrefix', 'InitiationLatency', [1.0, 2.0, 3.0, 4.0] + ) + self.assertEqual( + 'MyPrefix_InitiationLatency_OutlierCount', samples[0].metric + ) + + def testMetadataContainsFenceFields(self): + """Tests that outlier samples contain fence metadata fields.""" + meta = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] + )[0].metadata + for field in ( + 'q1', + 'q3', + 'iqr', + 'upper_fence', + 'lower_fence', + 'sample_count', + ): + self.assertIn(field, meta) + + def testSampleCountInMetadata(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0, 5.0] + ) + self.assertEqual('5', samples[0].metadata['sample_count']) + + def testUnitIsCount(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', [1.0, 2.0, 3.0, 4.0] + ) + self.assertEqual('count', samples[0].unit) + + def testReturnsSingleSample(self): + samples = kubernetes_management_benchmark._OutlierSamples( + 'Op', 'E2E', list(range(1, 11)) + ) + self.assertLen(samples, 1) + + +class RunTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the Run benchmark entry-point function.""" + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + ) + def testRunOnlyScenarioACallsOnlyA(self): + """Run dispatches only to _RunConcurrentNodePoolOps for that scenario.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ) as mock_a, mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ) as mock_b: + kubernetes_management_benchmark.Run(bm_spec) + mock_a.assert_called_once() + mock_b.assert_not_called() + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['overlapping_cluster_update'], + ) + def testRunOnlyScenarioBCallsOnlyB(self): + """Run dispatches only to _RunOverlappingClusterUpdate for that scenario.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[], + ) as mock_a, mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ) as mock_b: + kubernetes_management_benchmark.Run(bm_spec) + mock_a.assert_not_called() + mock_b.assert_called_once() + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + ) + def testRunTagsAllSamplesWithRunMetadata(self): + """Tests that Run adds version and config keys to all sample metadata.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + test_sample = _make_sample('m', 1.0) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[test_sample], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + meta = samples[0].metadata + for key in ( + 'initial_version', + 'cluster_k8s_version', + 'nodes_per_nodepool', + 'concurrent_nodepools', + ): + self.assertIn(key, meta) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + k8s_mgmt_initial_version='1.30', + ) + def testRunUsesExplicitVersionFlags(self): + """Tests that Run uses explicit version flags over auto-resolved ones.""" + cluster = _make_mock_cluster() + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[_make_sample('m', 1.0)], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + cluster.ResolveNodePoolVersions.assert_not_called() + self.assertEqual('1.30', samples[0].metadata['initial_version']) + + @flagsaver.flagsaver( + k8s_mgmt_scenarios=['concurrent_node_pool_ops'], + ) + def testRunAutoResolvesVersionsWhenFlagsAbsent(self): + """Tests Run calls ResolveNodePoolVersions when version flags absent.""" + cluster = _make_mock_cluster() + cluster.ResolveNodePoolVersions.return_value = ('1.33', '1.34') + bm_spec = _make_mock_benchmark_spec(cluster) + with mock.patch.object( + kubernetes_management_benchmark, '_ClearNodePools' + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunConcurrentNodePoolOps', + return_value=[_make_sample('m', 1.0)], + ), mock.patch.object( + kubernetes_management_benchmark, + '_RunOverlappingClusterUpdate', + return_value=[], + ): + samples = kubernetes_management_benchmark.Run(bm_spec) + cluster.ResolveNodePoolVersions.assert_called_once() + self.assertEqual('1.33', samples[0].metadata['initial_version']) + + +class RunScenarioATest(pkb_common_test_case.PkbCommonTestCase): + """Tests the _RunConcurrentNodePoolOps create/delete path.""" + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testProducesCreateAndDeleteSamples(self): + """Tests Scenario A produces Create and Delete samples.""" + cluster = _make_mock_cluster(pool_names=['pkbma000', 'pkbma001']) + samples = kubernetes_management_benchmark._RunConcurrentNodePoolOps( + cluster, '1.33' + ) + metrics = {s.metric for s in samples} + self.assertTrue(any('ConcurrentOps_Create' in m for m in metrics)) + self.assertTrue(any('ConcurrentOps_Delete' in m for m in metrics)) + self.assertFalse(any('ConcurrentOps_Upgrade' in m for m in metrics)) + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testPassesInitialVersionToCreate(self): + """_RunConcurrentNodePoolOps passes initial_version to creates.""" + cluster = _make_mock_cluster(pool_names=['pkbma000', 'pkbma001']) + kubernetes_management_benchmark._RunConcurrentNodePoolOps(cluster, '1.33') + for call in cluster.CreateNodePoolAsync.call_args_list: + kw = call.kwargs if call.kwargs else {} + pos = call.args + node_version = kw.get('node_version') or ( + pos[1] if len(pos) > 1 else None + ) + self.assertEqual('1.33', node_version) + + @flagsaver.flagsaver( + k8s_mgmt_concurrent_nodepools=2, + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeleteUsesLivePoolList(self): + """_RunConcurrentNodePoolOps deletes only pools found at runtime.""" + cluster = _make_mock_cluster(pool_names=['pkbma000']) + kubernetes_management_benchmark._RunConcurrentNodePoolOps(cluster, '1.33') + self.assertEqual(1, cluster.DeleteNodePoolAsync.call_count) + + +class RunScenarioBTest(pkb_common_test_case.PkbCommonTestCase): + """Tests the _RunOverlappingClusterUpdate overlap scenario.""" + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testProducesClusterUpdateAndNodePoolCreateSamples(self): + """Overlap scenario emits both cluster-update and create samples.""" + cluster = _make_mock_cluster(pool_names=[]) + samples = kubernetes_management_benchmark._RunOverlappingClusterUpdate( + cluster, '1.33' + ) + metrics = {s.metric for s in samples} + self.assertTrue( + any('OverlappingUpdate_ClusterUpdate' in m for m in metrics) + ) + self.assertTrue( + any('OverlappingUpdate_NodePoolCreate' in m for m in metrics) + ) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeletesTestPoolAfterRun(self): + cluster = _make_mock_cluster(pool_names=[]) + kubernetes_management_benchmark._RunOverlappingClusterUpdate( + cluster, '1.33' + ) + cluster.DeleteNodePool.assert_called_once_with( + kubernetes_management_benchmark._OVERLAPPING_POOL_NAME + ) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testDeleteFailureRaisesInScenarioB(self): + cluster = _make_mock_cluster(pool_names=[]) + cluster.DeleteNodePool.side_effect = RuntimeError('delete failed') + with self.assertRaises(RuntimeError): + kubernetes_management_benchmark._RunOverlappingClusterUpdate( + cluster, '1.33' + ) + + @flagsaver.flagsaver( + k8s_mgmt_nodes_per_nodepool=1, + k8s_mgmt_max_concurrent=50, + ) + def testPassesInitialVersionToCreate(self): + """_RunOverlappingClusterUpdate passes initial_version to the create.""" + cluster = _make_mock_cluster(pool_names=[]) + kubernetes_management_benchmark._RunOverlappingClusterUpdate( + cluster, '1.33' + ) + for call in cluster.CreateNodePoolAsync.call_args_list: + kw = call.kwargs if call.kwargs else {} + pos = call.args + node_version = kw.get('node_version') or ( + pos[1] if len(pos) > 1 else None + ) + self.assertEqual('1.33', node_version) if __name__ == '__main__': diff --git a/tests/linux_benchmarks/provision_container_cluster_benchmark_test.py b/tests/linux_benchmarks/provision_container_cluster_benchmark_test.py index 7b64d89124..e0ab7ccf31 100644 --- a/tests/linux_benchmarks/provision_container_cluster_benchmark_test.py +++ b/tests/linux_benchmarks/provision_container_cluster_benchmark_test.py @@ -26,6 +26,9 @@ def __init__(self): kubernetes_cluster.kubernetes_commands.GetEvents ) + def GetNodePoolNames(self) -> list[str]: + return [] + _CONTAINER_START_YAML = """ - apiVersion: v1 diff --git a/tests/traces/kubernetes_tracker_test.py b/tests/traces/kubernetes_tracker_test.py index 6034ff2765..96613d28aa 100644 --- a/tests/traces/kubernetes_tracker_test.py +++ b/tests/traces/kubernetes_tracker_test.py @@ -264,6 +264,9 @@ def _Create(self): def _Delete(self): pass + def GetNodePoolNames(self) -> list[str]: + return [] + def _CreateEvent( name: str, reason: str, timestamp: float