diff --git a/perfkitbenchmarker/configs/container_spec.py b/perfkitbenchmarker/configs/container_spec.py index 1f808ad066..60302de330 100644 --- a/perfkitbenchmarker/configs/container_spec.py +++ b/perfkitbenchmarker/configs/container_spec.py @@ -243,6 +243,7 @@ def __init__( self.vm_spec: virtual_machine_spec.BaseVmSpec self.machine_families: list[str] | None self.sandbox_config: SandboxSpec | None + self.swap_config: SwapConfigSpec | None @classmethod def _GetOptionDecoderConstructions(cls): @@ -273,6 +274,7 @@ def _GetOptionDecoderConstructions(cls): ), 'vm_spec': (spec.PerCloudConfigDecoder, {}), 'sandbox_config': (_SandboxDecoder, {'default': None}), + 'swap_config': (_SwapConfigDecoder, {'default': None}), }) return result @@ -333,6 +335,100 @@ def Decode(self, value, component_full_name, flag_values): return result +class SwapConfigSpec(spec.BaseSpec): + """Configurable swap options for a GKE/EKS nodepool. + + Declared in BENCHMARK_CONFIG under nodepools..swap_config. + Consumed by the cloud provider's _AddNodeParamsToCmd() / equivalent to + apply the cloud-specific swap configuration during nodepool creation. + + Attributes: + enabled: Whether to enable swap on the nodepool (default True). + swappiness: vm.swappiness sysctl value (0-200, default 100). + min_free_kbytes: vm.min_free_kbytes sysctl (default 67584, GKE minimum). + watermark_scale_factor: vm.watermark_scale_factor sysctl (default 500). + lssd: True if the nodepool uses local NVMe SSDs for the swap device. + lssd_count: Number of local NVMe SSDs (GKE dedicatedLocalSsdProfile). + boot_disk_iops: Provisioned IOPS for hyperdisk-balanced (0 = not set). + boot_disk_throughput: Provisioned throughput MiB/s for hyperdisk-balanced. 
+ """ + + def __init__(self, *args, **kwargs): + self.enabled: bool = True + self.swappiness: int = 100 + self.min_free_kbytes: int = 67584 + self.watermark_scale_factor: int = 500 + self.lssd: bool = False + self.lssd_count: int = 0 + self.boot_disk_iops: int = 0 + self.boot_disk_throughput: int = 0 + super().__init__(*args, **kwargs) + + @classmethod + def _GetOptionDecoderConstructions(cls): + result = super()._GetOptionDecoderConstructions() + result.update({ + 'enabled': ( + option_decoders.BooleanDecoder, + {'default': True}, + ), + 'swappiness': ( + option_decoders.IntDecoder, + {'default': 100, 'min': 0, 'max': 200}, + ), + 'min_free_kbytes': ( + option_decoders.IntDecoder, + {'default': 67584, 'min': 0}, + ), + 'watermark_scale_factor': ( + option_decoders.IntDecoder, + {'default': 500, 'min': 0}, + ), + 'lssd': ( + option_decoders.BooleanDecoder, + {'default': False}, + ), + 'lssd_count': ( + option_decoders.IntDecoder, + {'default': 0, 'min': 0}, + ), + 'boot_disk_iops': ( + option_decoders.IntDecoder, + {'default': 0, 'min': 0}, + ), + 'boot_disk_throughput': ( + option_decoders.IntDecoder, + {'default': 0, 'min': 0}, + ), + }) + return result + + +class _SwapConfigDecoder(option_decoders.TypeVerifier): + """Decodes the swap_config option of a NodepoolSpec.""" + + def Decode(self, value, component_full_name, flag_values): + """Decodes the swap_config dictionary into a SwapConfigSpec. + + Args: + value: dict. Keys match SwapConfigSpec._GetOptionDecoderConstructions. + component_full_name: str. Fully qualified name of the parent component. + flag_values: flags.FlagValues. Runtime flags propagated to BaseSpec. + + Returns: + SwapConfigSpec instance. + + Raises: + errors.Config.InvalidValue upon invalid input value. 
+ """ + super().Decode(value, component_full_name, flag_values) + return SwapConfigSpec( + self._GetOptionFullName(component_full_name), + flag_values=flag_values, + **value, + ) + + class SandboxSpec(spec.BaseSpec): """Configurable options for sandboxed node pools.""" diff --git a/perfkitbenchmarker/data/cluster/swap_encryption_daemonset.yaml.j2 b/perfkitbenchmarker/data/cluster/swap_encryption_daemonset.yaml.j2 new file mode 100644 index 0000000000..29cacfb3ce --- /dev/null +++ b/perfkitbenchmarker/data/cluster/swap_encryption_daemonset.yaml.j2 @@ -0,0 +1,120 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {{ ds_name }} + namespace: {{ ds_namespace }} + labels: + app: {{ ds_label }} +spec: + selector: + matchLabels: + app: {{ ds_label }} + template: + metadata: + labels: + app: {{ ds_label }} + spec: + hostPID: true + hostNetwork: true + # Pin to the benchmark nodepool — never schedule on the dummy default pool. + nodeSelector: + pkb_nodepool: {{ benchmark_nodepool }} + tolerations: + - operator: Exists + containers: + - name: benchmark + image: {{ image }} + command: + - bash + - -c + - | + echo "[pkb] Installing measurement tools..." + # Only the tools needed for Phase 1 (raw-device fio) and Phase 2 + # (CPU/I/O overhead) are installed here. Workload benchmarks + # (redis, opensearch, kernel-build) run in separate pods via + # existing PKB benchmark modules and are NOT installed here. + PKB_APT_OK=0 + for _attempt in 1 2 3; do + apt-get update -qq 2>&1 || true + DEBIAN_FRONTEND=noninteractive apt-get install -y -qq \ + fio \ + cryptsetup \ + mdadm \ + sysstat \ + nvme-cli \ + 2>&1 && PKB_APT_OK=1 && break + echo "[pkb] apt-get attempt $_attempt failed, retrying in 15s..." >&2 + sleep 15 + done + if [ "$PKB_APT_OK" != "1" ] || ! command -v fio >/dev/null 2>&1; then + echo "[pkb] FATAL: fio not installed after 3 attempts" >&2 + exit 1 + fi + echo "[pkb] fio installed: $(fio --version 2>&1 | head -1)" + echo "[pkb] Verifying swap device is active..." 
+ PKB_SWAP_FOUND=0 + for _attempt in $(seq 1 30); do + if awk 'NR>1{found=1} END{exit !found}' /proc/swaps 2>/dev/null; then + PKB_SWAP_DEV=$(awk 'NR==2{print $1}' /proc/swaps) + echo "[pkb] Swap device active: $PKB_SWAP_DEV" + PKB_SWAP_FOUND=1 + break + fi + echo "[pkb] Waiting for swap device (attempt $_attempt/30)..." >&2 + sleep 5 + done + if [ "$PKB_SWAP_FOUND" != "1" ]; then + echo "[pkb] WARNING: no active swap device after 150s — " \ + "check linuxConfig.swapConfig / kubelet swap config." >&2 + fi + echo "[pkb] Measurement tools ready. Writing ready sentinel." + touch /tmp/pkb_ready + sleep infinity + securityContext: + privileged: true + capabilities: + add: ["SYS_ADMIN", "IPC_LOCK"] + resources: + requests: + memory: "512Mi" + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + volumeMounts: + - name: dev + mountPath: /dev + - name: sys + mountPath: /sys + - name: run + mountPath: /run + - name: proc-host + mountPath: /proc-host + readOnly: true + - name: stateful-partition + mountPath: /mnt/stateful_partition + - name: lib-modules + mountPath: /lib/modules + readOnly: true + volumes: + - name: dev + hostPath: + path: /dev + - name: sys + hostPath: + path: /sys + - name: run + hostPath: + path: /run + - name: proc-host + hostPath: + path: /proc + - name: stateful-partition + hostPath: + path: /mnt/stateful_partition + type: DirectoryOrCreate + - name: lib-modules + hostPath: + path: /lib/modules + type: Directory diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_redis_memtier_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_redis_memtier_benchmark.py index d43c927bfe..08f88aca7f 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_redis_memtier_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_redis_memtier_benchmark.py @@ -19,12 +19,31 @@ Redis homepage: http://redis.io/ memtier_benchmark homepage: https://github.com/RedisLabs/memtier_benchmark + +Supported clouds: GCP (GKE) and AWS 
(EKS). Pass --cloud=AWS to run on EKS. + +Swap encryption toggle +---------------------- +Pass --kubernetes_redis_memtier_swap_enabled=true to run with dm-crypt +encrypted swap on the Redis servers nodepool. When the flag is set: + +* GetConfig() upgrades the servers nodepool to a high-memory machine type + (n4-highmem-32 on GCP, r6i.8xlarge on AWS), sets the appropriate boot + disk type (hyperdisk-balanced on GCP, gp3 on AWS), and injects a + swap_config block so the nodepool is provisioned with swap enabled. +* Prepare() deploys a privileged SwapDaemonSet onto the servers nodepool + that activates dm-crypt encrypted swap on every node. +* Run() attaches swap metadata (swap_enabled, swap_swappiness, + swap_machine_type, ...) to every sample. +* Cleanup() tears down the SwapDaemonSet. + +Note: EKS swap activation via nodeadm is deferred to PR #6780. """ from collections.abc import Callable, Iterable import itertools import textwrap -from typing import Any, NamedTuple +from typing import Any, NamedTuple, TypeVar from absl import flags from absl import logging @@ -36,25 +55,44 @@ from perfkitbenchmarker.linux_packages import redis_server from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes_commands +from perfkitbenchmarker.resources.container_service import ( + swap_daemonset as _swap_daemonset_lib +) FLAGS = flags.FLAGS NAMESPACE = flags.DEFINE_string( 'kubernetes_redis_memtier_namespace', None, - 'Namespace to use for resources created by the benchmark. Intended for use' - ' in development and testing.', + ('Namespace to use for resources created by the benchmark. 
Intended for' + ' use in development and testing.'), ) SAVE = flags.DEFINE_string( 'kubernetes_redis_memtier_save', None, 'Value of the `save` directive in redis.conf.', ) +SWAP_ENABLED = flags.DEFINE_boolean( + 'kubernetes_redis_memtier_swap_enabled', + False, + ('If True, runs the Redis servers nodepool with dm-crypt encrypted swap. ' + 'Injects swap_config into the servers nodepool, deploys a SwapDaemonSet ' + 'in Prepare(), and attaches swap metadata to all samples in Run().'), +) +SWAP_SWAPPINESS = flags.DEFINE_integer( + 'kubernetes_redis_memtier_swap_swappiness', + 100, + 'Kernel vm.swappiness value when --kubernetes_redis_memtier_swap_enabled.', + lower_bound=0, + upper_bound=200, +) + BENCHMARK_NAME = 'kubernetes_redis_memtier' BENCHMARK_CONFIG = """ kubernetes_redis_memtier: description: > Run memtier_benchmark against Redis on Kubernetes. + Supports GCP (GKE) and AWS (EKS). Use --cloud=AWS to run on EKS. container_cluster: cloud: GCP type: Kubernetes @@ -65,15 +103,41 @@ vm_spec: GCP: machine_type: c4-standard-4 + AWS: + machine_type: m5.xlarge vm_count: 1 clients: vm_spec: GCP: machine_type: c4-standard-32 + AWS: + machine_type: c5.8xlarge vm_count: 1 """ +_SWAP_SERVERS_NODEPOOL = 'servers' +# GCP swap defaults: n4-highmem-32 + hyperdisk-balanced +_SWAP_MACHINE_TYPE = 'n4-highmem-32' +_SWAP_BOOT_DISK_TYPE = 'hyperdisk-balanced' +_SWAP_BOOT_DISK_SIZE = 500 +_SWAP_BOOT_DISK_IOPS = 160000 +_SWAP_BOOT_DISK_THROUGHPUT = 2400 +# AWS swap defaults: r6i.8xlarge (32 vCPU / 256 GiB) + gp3 +_SWAP_AWS_MACHINE_TYPE = 'r6i.8xlarge' +_SWAP_AWS_BOOT_DISK_TYPE = 'gp3' +_SWAP_AWS_BOOT_DISK_SIZE = 500 +_SWAP_AWS_BOOT_DISK_IOPS = 16000 +_SWAP_AWS_BOOT_DISK_THROUGHPUT = 1000 +# Shared sysctl defaults (GCP and AWS) +_SWAP_MIN_FREE_KBYTES = 67584 +_SWAP_WATERMARK_SCALE_FACTOR = 500 +_SWAP_DS_NAME = 'pkb-redis-memtier-swap' +_SWAP_DS_LABEL = 'pkb-redis-memtier-swap' +_SWAP_DS_IMAGE = 'ubuntu:22.04' +_SWAP_DS_NAMESPACE = 'kube-system' + _BenchmarkSpec = 
benchmark_spec.BenchmarkSpec +_T = TypeVar('_T') def CheckPrerequisites(_): @@ -89,7 +153,16 @@ def CheckPrerequisites(_): def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]: - """Load and return benchmark config spec.""" + """Load and return benchmark config spec. + + When --kubernetes_redis_memtier_swap_enabled is True the servers nodepool is + upgraded to a high-memory machine type and a swap_config block is injected so + the nodepool provisions encrypted swap automatically. Supports GCP and AWS: + + GCP: n4-highmem-32 + hyperdisk-balanced (160k IOPS / 2400 MiB/s) + AWS: r6i.8xlarge + gp3 (16k IOPS / 1000 MiB/s); nodeadm swap deferred to + PR #6780 -- EksSwapConfig._Create() is a stub that logs a warning. + """ config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) if FLAGS.redis_memtier_server_machine_type: @@ -102,11 +175,54 @@ def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]: for cloud in vm_spec: vm_spec[cloud]['machine_type'] = FLAGS.redis_memtier_client_machine_type + if SWAP_ENABLED.value: + server_np = config['container_cluster']['nodepools'][_SWAP_SERVERS_NODEPOOL] + for cloud in server_np.get('vm_spec', {}): + if cloud == 'GCP': + # --redis_memtier_server_machine_type takes precedence over the swap + # default; only set the swap machine type if not explicitly overridden. + if not FLAGS.redis_memtier_server_machine_type: + server_np['vm_spec'][cloud]['machine_type'] = _SWAP_MACHINE_TYPE + server_np['vm_spec'][cloud]['boot_disk_type'] = _SWAP_BOOT_DISK_TYPE + server_np['vm_spec'][cloud]['boot_disk_size'] = _SWAP_BOOT_DISK_SIZE + elif cloud == 'AWS': + if not FLAGS.redis_memtier_server_machine_type: + server_np['vm_spec'][cloud]['machine_type'] = _SWAP_AWS_MACHINE_TYPE + server_np['vm_spec'][cloud]['boot_disk_type'] = _SWAP_AWS_BOOT_DISK_TYPE + server_np['vm_spec'][cloud]['boot_disk_size'] = _SWAP_AWS_BOOT_DISK_SIZE + # Shared sysctl settings apply to all clouds. 
+ # boot_disk_iops/throughput follow FLAGS.cloud when vm_spec lists both. + swap_config: dict[str, Any] = { + 'enabled': True, + 'swappiness': SWAP_SWAPPINESS.value, + 'min_free_kbytes': _SWAP_MIN_FREE_KBYTES, + 'watermark_scale_factor': _SWAP_WATERMARK_SCALE_FACTOR, + } + vm_spec = server_np.get('vm_spec', {}) + if 'AWS' in vm_spec and (FLAGS.cloud == 'AWS' or 'GCP' not in vm_spec): + swap_config['boot_disk_iops'] = _SWAP_AWS_BOOT_DISK_IOPS + swap_config['boot_disk_throughput'] = _SWAP_AWS_BOOT_DISK_THROUGHPUT + elif 'GCP' in vm_spec: + swap_config['boot_disk_iops'] = _SWAP_BOOT_DISK_IOPS + swap_config['boot_disk_throughput'] = _SWAP_BOOT_DISK_THROUGHPUT + server_np['swap_config'] = swap_config + return config def Prepare(_: _BenchmarkSpec) -> None: """Prepares a cluster to run the Redis benchmark.""" + # Deploy SwapDaemonSet before Redis so every servers node has encrypted swap + # active when the Redis pods schedule. + if SWAP_ENABLED.value: + _swap_daemonset_lib.SwapDaemonSet( + name=_SWAP_DS_NAME, + namespace=_SWAP_DS_NAMESPACE, + label=_SWAP_DS_LABEL, + nodepool=_SWAP_SERVERS_NODEPOOL, + image=_SWAP_DS_IMAGE, + ).Create() + # Set up the Kubernetes Service, Redis configuration, and Redis server. if not FLAGS['redis_server_io_threads_do_reads'].present: do_reads = None @@ -146,10 +262,16 @@ def _RunMemtier( kubectl_args.append(f'--namespace={NAMESPACE.value}') kubectl.RunKubectlCommand(kubectl_args) - server = f'redis.{NAMESPACE.value or 'default'}.svc.cluster.local' + server = f"redis.{NAMESPACE.value or 'default'}.svc.cluster.local" memtier_command = memtier.BuildMemtierCommand( server=server, - protocol=memtier.MEMTIER_PROTOCOL.value, + # Redis protocol required; MEMTIER_PROTOCOL defaults to memcache_binary + # which silently produces 0 ops against Redis. 
+ protocol=( + memtier.MEMTIER_PROTOCOL.value + if FLAGS['memtier_protocol'].present + else 'redis' + ), clients=clients, threads=threads, ratio=memtier.MEMTIER_RATIO.value, @@ -195,9 +317,15 @@ def _RunMemtier( f'jobs/{job_name}', ['condition=Complete', 'condition=Failed'], namespace=NAMESPACE.value, - # Add 30 seconds to account for initial connection errors as Redis comes - # up. - timeout=memtier.MEMTIER_RUN_DURATION.value + 30, + # Add 30 seconds to account for initial connection errors as Redis + # comes up. When run_duration is None (request-count mode), fall back + # to 3600s to accommodate image pulls, pod scheduling, and large + # request counts. + timeout=( + memtier.MEMTIER_RUN_DURATION.value + 30 + if memtier.MEMTIER_RUN_DURATION.value is not None + else 3600 + ), ) if condition == 'condition=Failed': raise errors.Benchmarks.RunError(f"Memtier job '{job_name}' failed.") @@ -222,9 +350,9 @@ class MemtierRunConfig(NamedTuple): pipeline: int -def _CreateRunConfigMatrix[T]( - nt: Callable[..., T], **iterables: Iterable[Any] -) -> list[T]: +def _CreateRunConfigMatrix( + nt: Callable[..., _T], **iterables: Iterable[Any] +) -> list[_T]: """Creates a run matrix for the Redis benchmark. Args: @@ -243,6 +371,38 @@ def _CreateRunConfigMatrix[T]( return run_configs +def _SwapMetadata() -> dict[str, Any]: + """Returns metadata dict describing the active swap configuration. + + Picks cloud-specific machine type and disk constants based on FLAGS.cloud + so samples carry accurate metadata whether run on GCP or AWS. 
+ """ + is_aws = FLAGS.cloud == 'AWS' + return { + 'swap_enabled': True, + 'swap_swappiness': SWAP_SWAPPINESS.value, + 'swap_machine_type': FLAGS.redis_memtier_server_machine_type or ( + _SWAP_AWS_MACHINE_TYPE if is_aws else _SWAP_MACHINE_TYPE + ), + 'swap_boot_disk_type': ( + _SWAP_AWS_BOOT_DISK_TYPE if is_aws else _SWAP_BOOT_DISK_TYPE + ), + 'swap_boot_disk_size_gb': ( + _SWAP_AWS_BOOT_DISK_SIZE if is_aws else _SWAP_BOOT_DISK_SIZE + ), + 'swap_boot_disk_iops': ( + _SWAP_AWS_BOOT_DISK_IOPS if is_aws else _SWAP_BOOT_DISK_IOPS + ), + 'swap_boot_disk_throughput': ( + _SWAP_AWS_BOOT_DISK_THROUGHPUT + if is_aws + else _SWAP_BOOT_DISK_THROUGHPUT + ), + 'swap_min_free_kbytes': _SWAP_MIN_FREE_KBYTES, + 'swap_watermark_scale_factor': _SWAP_WATERMARK_SCALE_FACTOR, + } + + def Run(_: _BenchmarkSpec) -> list[sample.Sample]: """Run the benchmark.""" samples: list[sample.Sample] = [] @@ -254,14 +414,27 @@ def Run(_: _BenchmarkSpec) -> list[sample.Sample]: pipeline=FLAGS.memtier_pipeline, ) + swap_meta = _SwapMetadata() if SWAP_ENABLED.value else {} + for run_config in run_configs: run_samples = _RunMemtier( run_config.clients, run_config.threads, run_config.pipeline ) + if swap_meta: + for s in run_samples: + s.metadata.update(swap_meta) samples.extend(run_samples) return samples def Cleanup(_: _BenchmarkSpec) -> None: - pass + """Tears down benchmark resources, including SwapDaemonSet if deployed.""" + if SWAP_ENABLED.value: + _swap_daemonset_lib.SwapDaemonSet( + name=_SWAP_DS_NAME, + namespace=_SWAP_DS_NAMESPACE, + label=_SWAP_DS_LABEL, + nodepool=_SWAP_SERVERS_NODEPOOL, + image=_SWAP_DS_IMAGE, + ).Delete() diff --git a/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py b/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py new file mode 100644 index 0000000000..4c023884cd --- /dev/null +++ b/perfkitbenchmarker/linux_benchmarks/swap_encryption_benchmark.py @@ -0,0 +1,285 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""swap_encryption_benchmark: verifies encrypted swap on GKE/EKS nodepools. + +Architecture: + BENCHMARK_CONFIG declares a 'benchmark' nodepool with swap_config. + GkeCluster._AddNodeParamsToCmd() reads nodepool_config.swap_config and + applies --system-config-from-file (linuxConfig.swapConfig + sysctl) + sets + UBUNTU_CONTAINERD + boot-disk-provisioned-iops/throughput automatically + during cluster creation. No separate nodepool lifecycle management needed. + + Prepare() deploys a privileged SwapDaemonSet on the swap-enabled nodepool + for in-pod benchmark execution (fio / stress-ng / kernel build in later PRs). + + Run() verifies swap is active and dm-crypt encryption is configured, then + reports swap device metadata as PKB samples. + + Cleanup() is empty — PKB auto-deletes spec.resources (SwapDaemonSet). 
+ +Subsequent PRs add phases: + PR3: fio microbenchmarks on raw swap device (Tier 1) + PR4: stress-ng CPU overhead + I/O interference (Tier 2) + PR5: kernel build under cgroup memory constraint (Phase 3b) +""" + +import logging +from typing import Any + +from absl import flags +from perfkitbenchmarker import benchmark_spec +from perfkitbenchmarker import configs +from perfkitbenchmarker import sample +from perfkitbenchmarker.resources.container_service import swap_daemonset + +FLAGS = flags.FLAGS + +BENCHMARK_NAME = 'swap_encryption' +BENCHMARK_CONFIG = """ +swap_encryption: + description: > + Verify dm-crypt encrypted swap on GKE/EKS. Subsequent PRs add fio, + stress-ng, and kernel build phases. + container_cluster: + cloud: GCP + type: Kubernetes + vm_count: 1 + vm_spec: + GCP: + machine_type: e2-medium + zone: us-central1-a + nodepools: + benchmark: + vm_count: 1 + vm_spec: + GCP: + machine_type: n4-highmem-32 + boot_disk_type: hyperdisk-balanced + boot_disk_size: 500 + zone: us-central1-a + swap_config: + enabled: true + swappiness: 100 + min_free_kbytes: 67584 + watermark_scale_factor: 500 + boot_disk_iops: 160000 + boot_disk_throughput: 2400 +""" + +_MACHINE_TYPE = flags.DEFINE_string( + 'swap_encryption_machine_type', + None, + 'Override machine type for the benchmark nodepool.', +) +_DISK_TYPE = flags.DEFINE_string( + 'swap_encryption_disk_type', + None, + 'Override disk type for the benchmark nodepool.', +) + +_DAEMONSET_IMAGE = flags.DEFINE_string( + 'swap_encryption_daemonset_image', + 'ubuntu:22.04', + 'Container image for the privileged benchmark DaemonSet.', +) + +_BenchmarkSpec = benchmark_spec.BenchmarkSpec +_BENCHMARK_NODEPOOL = 'benchmark' +_DEFAULT_POOL = 'default-pool' +_DS_NAME = 'pkb-swap-benchmark' +_DS_NAMESPACE = 'default' +_DS_LABEL = 'pkb-swap-benchmark' + + +def GetConfig(user_config: dict[str, Any]) -> dict[str, Any]: + """Load and return benchmark config spec.""" + config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, 
BENCHMARK_NAME) + nodepool = config['container_cluster']['nodepools'][_BENCHMARK_NODEPOOL] + if _MACHINE_TYPE.value: + for cloud in nodepool['vm_spec']: + nodepool['vm_spec'][cloud]['machine_type'] = _MACHINE_TYPE.value + if _DISK_TYPE.value: + for cloud in nodepool['vm_spec']: + nodepool['vm_spec'][cloud]['boot_disk_type'] = _DISK_TYPE.value + return config + + +def CheckPrerequisites(_) -> None: + """Verifies that benchmark setup is correct.""" + + +def Prepare(spec: _BenchmarkSpec) -> None: + """Deploys the privileged benchmark DaemonSet on the swap-enabled nodepool. + + The swap-enabled 'benchmark' nodepool is already created by GKE cluster + creation (swap_config declared in BENCHMARK_CONFIG). Prepare() deploys the + privileged DaemonSet used for in-pod command execution across all phases. + + After the DaemonSet pod is Running the dummy e2-medium default-pool is + deleted to stop its cost. + + Args: + spec: PKB BenchmarkSpec with spec.container_cluster already created. + """ + cluster = spec.container_cluster + daemonset = swap_daemonset.SwapDaemonSet( + name=_DS_NAME, + namespace=_DS_NAMESPACE, + label=_DS_LABEL, + nodepool=_BENCHMARK_NODEPOOL, + image=_DAEMONSET_IMAGE.value, + ) + daemonset.Create() + spec.resources.append(daemonset) + pod = daemonset.WaitForPod() + logging.info('[swap_encryption] Benchmark pod ready: %s', pod) + _delete_default_pool(cluster) + + +def Run(spec: _BenchmarkSpec) -> list[sample.Sample]: + """Verify swap is active and dm-crypt encryption is configured. + + Returns: + PKB samples: swap_active, swap_encrypted, swap_cipher, swap_total_kb. 
+ """ + daemonset = _get_daemonset(spec) + daemonset.WaitForPod() + daemonset.oom_events.clear() + daemonset.pod_lost.clear() + + swap_dev = _detect_swap_device(daemonset) + base_meta = _build_metadata(daemonset, swap_dev) + results: list[sample.Sample] = [] + + # ── Verify swap is active ────────────────────────────────────────────────── + try: + swap_out, _ = daemonset.PodExec('cat /proc/swaps') + active = any( + l and not l.startswith('Filename') for l in swap_out.splitlines() + ) + results.append(sample.Sample('swap_active', int(active), 'bool', base_meta)) + logging.info('[swap_encryption] swap_active=%s /proc/swaps:\n%s', active, swap_out) + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] Could not read /proc/swaps: %s', e) + + # ── Verify dm-crypt encryption ───────────────────────────────────────────── + if swap_dev: + try: + dm_out, _ = daemonset.PodExec( + f'dmsetup status {swap_dev} 2>/dev/null || echo not_encrypted' + ) + encrypted = 'crypt' in dm_out.lower() + cipher = _parse_cipher(dm_out) + meta = {**base_meta, 'dmsetup_status': dm_out.strip()[:200]} + results.append(sample.Sample('swap_encrypted', int(encrypted), 'bool', meta)) + if cipher: + results.append(sample.Sample('swap_cipher', 0, cipher, meta)) + logging.info('[swap_encryption] encrypted=%s cipher=%s', encrypted, cipher) + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] dm-crypt check failed: %s', e) + + # ── Swap size ────────────────────────────────────────────────────────────── + try: + sz_out, _ = daemonset.PodExec( + "awk '/^SwapTotal/ {print $2}' /proc/meminfo" + ) + swap_kb = int(sz_out.strip() or '0') + results.append(sample.Sample('swap_total_kb', swap_kb, 'KB', base_meta)) + logging.info( + '[swap_encryption] SwapTotal: %d KB (%.1f GiB)', + swap_kb, swap_kb / 1024 / 1024, + ) + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] Could not read SwapTotal: 
%s', e) + + if daemonset.oom_events: + results.append( + sample.Sample('oom_events', len(daemonset.oom_events), 'count', base_meta) + ) + return results + + +def Cleanup(_: _BenchmarkSpec) -> None: + """Empty — PKB auto-deletes spec.resources (SwapDaemonSet).""" + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + + +def _get_daemonset(spec: _BenchmarkSpec) -> swap_daemonset.SwapDaemonSet: + for r in spec.resources: + if isinstance(r, swap_daemonset.SwapDaemonSet): + return r + raise RuntimeError('[swap_encryption] SwapDaemonSet not found in spec.resources') + + +def _detect_swap_device(ds: swap_daemonset.SwapDaemonSet) -> str: + """Return the first active swap device name (e.g. 'dm-0') or ''.""" + try: + out, _ = ds.PodExec("awk 'NR>1 {print $1}' /proc/swaps") + dev = out.strip().split('\n')[0].strip() + return dev.split('/')[-1] if dev else '' + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] _detect_swap_device: %s', e) + return '' + + +def _build_metadata( + ds: swap_daemonset.SwapDaemonSet, swap_dev: str +) -> dict[str, Any]: + """Build base metadata dict for all samples; kernel_version is best-effort.""" + meta: dict[str, Any] = {'swap_device': swap_dev or 'unknown'} + try: + kver, _ = ds.PodExec('uname -r') + meta['kernel_version'] = kver.strip() + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] _build_metadata: %s', e) + return meta + + +def _parse_cipher(dmsetup_status: str) -> str: + """Extract cipher name from dmsetup status output.""" + parts = dmsetup_status.split() + try: + idx = parts.index('crypt') + return parts[idx + 1] if idx + 1 < len(parts) else '' + except ValueError: + return '' + + +def _delete_default_pool(cluster) -> None: + """Delete the dummy e2-medium default-pool once the benchmark pod is Running. + + GKE requires at least one nodepool at cluster creation time; the e2-medium + default-pool satisfies that requirement. 
Deleting it before the DaemonSet + pod is Running can trigger a brief API-server timeout while two concurrent + nodepool operations are in progress. + """ + try: + cmd = cluster._GcloudCommand( # pylint: disable=protected-access + 'container', 'node-pools', 'delete', _DEFAULT_POOL, + '--cluster', cluster.name, + ) + cmd.args.append('--quiet') + logging.info('[swap_encryption] Deleting default nodepool: %s', _DEFAULT_POOL) + _, stderr, rc = cmd.Issue(timeout=300, raise_on_failure=False) + if rc != 0: + logging.warning( + '[swap_encryption] Could not delete default nodepool (rc=%d): %s', + rc, stderr, + ) + else: + logging.info('[swap_encryption] Default nodepool deleted') + except Exception as e: # pylint: disable=broad-except + logging.warning('[swap_encryption] _delete_default_pool failed: %s', e) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index f943a53ff1..86d8d7142a 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -37,6 +37,7 @@ from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes_cluster from perfkitbenchmarker.resources.container_service import kubernetes_commands +from perfkitbenchmarker.resources.container_service import swap_config as swap_config_lib FLAGS = flags.FLAGS @@ -428,11 +429,15 @@ def _CreateNodePools(self): cmd = self._GcloudCommand( 'container', 'node-pools', 'create', name, '--cluster', self.name ) - self._AddNodeParamsToCmd( - nodepool, - cmd, - ) - self._IssueResourceCreationCommand(cmd) + self._AddNodeParamsToCmd(nodepool, cmd) + # If swap_config wrote a linuxConfig tempfile, clean it up after Issue(). 
+ swap_cfg = getattr(nodepool, '_gke_swap_config', None) + try: + self._IssueResourceCreationCommand(cmd) + finally: + if swap_cfg is not None: + swap_cfg.CleanupYaml() + nodepool._gke_swap_config = None # pylint: disable=protected-access self._CreateCustomComputeClass(nodepool) def _CreateCustomComputeClass( @@ -570,13 +575,30 @@ def _AddNodeParamsToCmd( ): cmd.args.append('--enable-fast-socket') - if FLAGS.gke_node_system_config is not None: + # Per-nodepool swap config takes precedence over the global flag. + if nodepool_config.swap_config is not None: + gke_swap = swap_config_lib.GkeSwapConfig.from_spec(nodepool_config.swap_config) + cmd.flags['system-config-from-file'] = gke_swap.WriteLinuxConfigYaml() + # Store on nodepool so _CreateNodePools() can clean up the tempfile. + nodepool_config._gke_swap_config = gke_swap # pylint: disable=protected-access + # dm-crypt requires UBUNTU_CONTAINERD (Ajay r3472549985). + cmd.flags['image-type'] = 'UBUNTU_CONTAINERD' + # Prevent GKE from replacing the node after swap setup is complete. + cmd.args.append('--no-enable-autorepair') + sc = nodepool_config.swap_config + if sc.boot_disk_iops and not sc.lssd: + cmd.flags['boot-disk-provisioned-iops'] = sc.boot_disk_iops + cmd.flags['boot-disk-provisioned-throughput'] = ( + gke_swap.ValidHyperdiskThroughput() + ) + elif FLAGS.gke_node_system_config is not None: + # Fall back to global flag when no per-nodepool swap config is set. 
cmd.flags['system-config-from-file'] = FLAGS.gke_node_system_config if nodepool_config.sandbox_config is not None: cmd.flags['sandbox'] = nodepool_config.sandbox_config.ToSandboxFlag() - if self.image_type: + if self.image_type and 'image-type' not in cmd.flags: cmd.flags['image-type'] = self.image_type cmd.flags['node-labels'] = f'pkb_nodepool={nodepool_config.name}' diff --git a/perfkitbenchmarker/resources/container_service/container.py b/perfkitbenchmarker/resources/container_service/container.py index 3e05a1ec2b..b652eaab32 100644 --- a/perfkitbenchmarker/resources/container_service/container.py +++ b/perfkitbenchmarker/resources/container_service/container.py @@ -187,6 +187,10 @@ def __init__( # Defined by GceVirtualMachineConfig. Used by google_kubernetes_engine # pylint: disable=g-missing-from-attributes self.sandbox_config: container_spec_lib.SandboxSpec | None = None + # Set by container_cluster._InitializeNodePool() when NodepoolSpec + # declares swap_config. Consumed by _AddNodeParamsToCmd() in the cloud + # provider to apply swap configuration during nodepool creation. 
+ self.swap_config: container_spec_lib.SwapConfigSpec | None = None self.max_local_disks: int | None self.ssd_interface: str | None self.threads_per_core: int diff --git a/perfkitbenchmarker/resources/container_service/container_cluster.py b/perfkitbenchmarker/resources/container_service/container_cluster.py index 9458662c98..ed67ff7adb 100644 --- a/perfkitbenchmarker/resources/container_service/container_cluster.py +++ b/perfkitbenchmarker/resources/container_service/container_cluster.py @@ -116,6 +116,7 @@ def _InitializeNodePool( nodepool_spec.machine_families, ) nodepool_config.sandbox_config = nodepool_spec.sandbox_config + nodepool_config.swap_config = nodepool_spec.swap_config nodepool_config.zone = zone nodepool_config.num_nodes = nodepool_spec.vm_count if nodepool_spec.min_vm_count is None: diff --git a/perfkitbenchmarker/resources/container_service/swap_config.py b/perfkitbenchmarker/resources/container_service/swap_config.py new file mode 100644 index 0000000000..b4a745b1e0 --- /dev/null +++ b/perfkitbenchmarker/resources/container_service/swap_config.py @@ -0,0 +1,323 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Swap configuration as PKB BaseResource. + +Classes: BaseSwapConfig, GkeSwapConfig, EksSwapConfig. + +These resources encapsulate cloud-specific swap configuration for GKE and EKS +nodepools. 
They are referenced via NodepoolSpec.swap_config (declared in the +benchmark BENCHMARK_CONFIG YAML) and consumed by the cloud provider's +_AddNodeParamsToCmd() during cluster/nodepool creation. + +Class hierarchy: + BaseSwapConfig(BaseResource) -- common sysctl attrs + abstract from_spec() + GkeSwapConfig(BaseSwapConfig) -- linuxConfig YAML + for --system-config-from-file + EksSwapConfig(BaseSwapConfig) -- nodeadm kubelet config + (deferred to PR #6780) + +Usage in BENCHMARK_CONFIG: + container_cluster: + nodepools: + benchmark: + vm_spec: + GCP: + machine_type: n4-highmem-32 + boot_disk_type: hyperdisk-balanced + boot_disk_size: 500 + swap_config: + enabled: true + swappiness: 100 + min_free_kbytes: 67584 + watermark_scale_factor: 500 + boot_disk_iops: 160000 + boot_disk_throughput: 2400 + +GkeCluster._AddNodeParamsToCmd() creates a GkeSwapConfig from the +SwapConfigSpec and calls WriteLinuxConfigYaml() to obtain the path for +--system-config-from-file. No separate resource.Create() call is needed +for the swap config itself -- it is applied as part of nodepool creation. +""" + +import logging +import os +import tempfile + +from perfkitbenchmarker import resource + +# GCP Hyperdisk Balanced constraint: provisioned_iops <= 256 x throughput_MiB_s. +_HYPERDISK_MAX_IOPS_PER_MBPS = 256 + + +class BaseSwapConfig(resource.BaseResource): + """Abstract base class for cloud-specific nodepool swap configuration. + + Subclasses (GkeSwapConfig, EksSwapConfig) implement the cloud-specific + method for applying swap configuration during nodepool creation. + + Common sysctl attributes (vm.swappiness, vm.min_free_kbytes, + vm.watermark_scale_factor) are shared across all cloud providers. + + _Create() and _Delete() are no-ops: the swap config is applied as a + parameter to nodepool creation, not as a standalone cloud resource. 
+ """ + + RESOURCE_TYPE = 'BaseSwapConfig' + REQUIRED_ATTRS = [] + + def __init__( + self, + swappiness: int = 100, + min_free_kbytes: int = 67584, + watermark_scale_factor: int = 500, + ) -> None: + super().__init__() + self.swappiness = swappiness + self.min_free_kbytes = min_free_kbytes + self.watermark_scale_factor = watermark_scale_factor + + @classmethod + def from_spec(cls, swap_spec) -> 'BaseSwapConfig': # pylint: disable=invalid-name + """Create a BaseSwapConfig subclass from a SwapConfigSpec. + + Subclasses must override this to instantiate with cloud-specific attrs. + """ + raise NotImplementedError( + f'{cls.__name__}.from_spec() must be implemented by subclasses.' + ) + + def _Create(self) -> None: + """No-op: swap config is applied during nodepool creation.""" + + def _Delete(self) -> None: + """No-op: cleaned up when the nodepool is deleted.""" + + +class GkeSwapConfig(BaseSwapConfig): + """GKE swap configuration for a nodepool. + + Encapsulates the linuxConfig (swapConfig + sysctl) YAML for + --system-config-from-file and optional Hyperdisk IOPS/throughput overrides. + + Consumed by GkeCluster._AddNodeParamsToCmd() when nodepool_config.swap_config + is set. + + Attributes: + swappiness: vm.swappiness sysctl value (0-200, default 100). + min_free_kbytes: vm.min_free_kbytes sysctl (default 67584, + GKE minimum >= 67584). + watermark_scale_factor: vm.watermark_scale_factor sysctl (default 500). + lssd: True if the nodepool uses local NVMe SSDs for swap device. + lssd_count: Number of local NVMe SSDs (dedicatedLocalSsdProfile.diskCount). + boot_disk_iops: Provisioned IOPS for hyperdisk-balanced (0 = not set). + boot_disk_throughput: Provisioned throughput MiB/s for hyperdisk-balanced. 
+ """ + + RESOURCE_TYPE = 'GkeSwapConfig' + REQUIRED_ATTRS = [] + + def __init__( + self, + swappiness: int = 100, + min_free_kbytes: int = 67584, + watermark_scale_factor: int = 500, + lssd: bool = False, + lssd_count: int = 0, + boot_disk_iops: int = 0, + boot_disk_throughput: int = 0, + ) -> None: + super().__init__( + swappiness=swappiness, + min_free_kbytes=min_free_kbytes, + watermark_scale_factor=watermark_scale_factor, + ) + self.lssd = lssd + self.lssd_count = lssd_count + self.boot_disk_iops = boot_disk_iops + self.boot_disk_throughput = boot_disk_throughput + self._yaml_path: str | None = None + + @classmethod + def from_spec(cls, swap_spec) -> 'GkeSwapConfig': + """Create a GkeSwapConfig from a SwapConfigSpec.""" + return cls( + swappiness=swap_spec.swappiness, + min_free_kbytes=swap_spec.min_free_kbytes, + watermark_scale_factor=swap_spec.watermark_scale_factor, + lssd=swap_spec.lssd, + lssd_count=swap_spec.lssd_count, + boot_disk_iops=swap_spec.boot_disk_iops, + boot_disk_throughput=swap_spec.boot_disk_throughput, + ) + + def _Delete(self) -> None: + """Cleans up any written YAML tempfile.""" + self._CleanupYaml() + + def WriteLinuxConfigYaml(self) -> str: + """Write the GKE linuxConfig YAML to a tempfile; return the path. + + Called by GkeCluster._AddNodeParamsToCmd() to supply + --system-config-from-file. The caller is responsible for deleting the + tempfile via CleanupYaml() after the gcloud command completes. + + Per Ajay review r3472513706: + linuxConfig.swapConfig.enabled=true automatically sets + kubeletConfig.memorySwapBehavior=LimitedSwap -- no need to set + kubeletConfig explicitly. + For LSSD machines, dedicatedLocalSsdProfile.diskCount instructs GKE to + use local NVMe as the swap device. + + Returns: + Absolute path to the written tempfile. 
+ """ + if self.lssd and self.lssd_count > 0: + swap_block = ( + ' swapConfig:\n' + ' enabled: true\n' + ' dedicatedLocalSsdProfile:\n' + f' diskCount: {self.lssd_count}\n' + ) + else: + swap_block = ' swapConfig:\n enabled: true\n' + + yaml_content = ( + 'linuxConfig:\n' + + swap_block + + ' sysctl:\n' + + f' vm.swappiness: "{self.swappiness}"\n' + + f' vm.min_free_kbytes: "{self.min_free_kbytes}"\n' + + f' vm.watermark_scale_factor: "{self.watermark_scale_factor}"\n' + ) + + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False + ) as tmp: + tmp.write(yaml_content) + tmp.flush() + self._yaml_path = tmp.name + + logging.info( + '[swap_config] Wrote linuxConfig YAML' + ' lssd=%s count=%d path=%s\n%s', + self.lssd, + self.lssd_count, + self._yaml_path, + yaml_content, + ) + return self._yaml_path + + def ValidHyperdiskThroughput(self) -> int: + """Return clamped throughput satisfying GCP Hyperdisk Balanced constraints. + + GCP Hyperdisk Balanced requires: provisioned_iops <= 256 x throughput_MiB_s. + Clamps throughput UP so a mismatched pair cannot abort nodepool creation. 
+ """ + if not self.boot_disk_iops or not self.boot_disk_throughput: + return self.boot_disk_throughput + min_tput = -(-int(self.boot_disk_iops) // _HYPERDISK_MAX_IOPS_PER_MBPS) + if self.boot_disk_throughput < min_tput: + logging.warning( + '[swap_config] boot disk throughput %d MiB/s too low for %d IOPS;' + ' clamping to minimum %d MiB/s', + self.boot_disk_throughput, + self.boot_disk_iops, + min_tput, + ) + return min_tput + return self.boot_disk_throughput + + def CleanupYaml(self) -> None: + """Delete the linuxConfig tempfile if it was written.""" + if self._yaml_path and os.path.exists(self._yaml_path): + try: + os.unlink(self._yaml_path) + logging.info( + '[swap_config] Cleaned up YAML tempfile: %s', self._yaml_path + ) + except OSError: + pass + self._yaml_path = None + + def _CleanupYaml(self) -> None: + self.CleanupYaml() + + +class EksSwapConfig(BaseSwapConfig): + """EKS swap configuration for a nodepool (stub). + + Configures kubelet LimitedSwap via nodeadm bootstrap configuration. + Full implementation deferred to PR #6780. + + Attributes: + swappiness: vm.swappiness sysctl value (inherited from BaseSwapConfig). + min_free_kbytes: vm.min_free_kbytes sysctl (inherited from BaseSwapConfig). + watermark_scale_factor: vm.watermark_scale_factor + (inherited from BaseSwapConfig). + memory_swap_behavior: kubelet memorySwapBehavior value ('LimitedSwap'). + fail_swap_on: kubelet failSwapOn setting (False to allow swap on EKS). 
+ """ + + RESOURCE_TYPE = 'EksSwapConfig' + REQUIRED_ATTRS = [] + + def __init__( + self, + swappiness: int = 100, + min_free_kbytes: int = 67584, + watermark_scale_factor: int = 500, + memory_swap_behavior: str = 'LimitedSwap', + fail_swap_on: bool = False, + ) -> None: + super().__init__( + swappiness=swappiness, + min_free_kbytes=min_free_kbytes, + watermark_scale_factor=watermark_scale_factor, + ) + self.memory_swap_behavior = memory_swap_behavior + self.fail_swap_on = fail_swap_on + + @classmethod + def from_spec(cls, swap_spec) -> 'EksSwapConfig': + """Create an EksSwapConfig from a SwapConfigSpec.""" + return cls( + swappiness=swap_spec.swappiness, + min_free_kbytes=swap_spec.min_free_kbytes, + watermark_scale_factor=swap_spec.watermark_scale_factor, + ) + + def _Create(self) -> None: + """Stub: EKS kubelet LimitedSwap via nodeadm (deferred to PR #6780).""" + logging.warning( + '[swap_config] EksSwapConfig._Create() is a stub. ' + 'EKS kubelet LimitedSwap config via nodeadm not yet implemented ' + '(deferred to PR #6780). Swap will not be enabled on EKS nodes.' + ) + + def GetNodeadmConfig(self) -> str: + """Return nodeadm bootstrap YAML for kubelet swap settings.""" + return ( + 'apiVersion: node.eks.aws/v1alpha1\n' + 'kind: NodeConfig\n' + 'spec:\n' + ' kubelet:\n' + ' config:\n' + f' memorySwapBehavior: {self.memory_swap_behavior}\n' + f' failSwapOn: {str(self.fail_swap_on).lower()}\n' + ' containerd:\n' + ' config:\n' + f' vm.swappiness: {self.swappiness}\n' + f' vm.min_free_kbytes: {self.min_free_kbytes}\n' + f' vm.watermark_scale_factor: {self.watermark_scale_factor}\n' + ) diff --git a/perfkitbenchmarker/resources/container_service/swap_daemonset.py b/perfkitbenchmarker/resources/container_service/swap_daemonset.py new file mode 100644 index 0000000000..fdc977e3f5 --- /dev/null +++ b/perfkitbenchmarker/resources/container_service/swap_daemonset.py @@ -0,0 +1,623 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""SwapDaemonSet: PKB BaseResource for the swap-encryption privileged DaemonSet. + +Manages the full lifecycle of the privileged benchmark pod used by the +swap_encryption benchmark: + + _Create() — apply the Jinja2 manifest via kubernetes_commands.ApplyManifest + and wait for the pod to reach Running + /tmp/pkb_ready. + _Delete() — run in-pod cleanup (swapoff, dmsetup remove, losetup teardown, + pkill fio/stress-ng) then kubectl delete daemonset. + PodExec() — kubectl exec wrapper with transient-reset retry, + OOM-kill (rc=137) + detection, and automatic RecoverPod() after eviction or container + restart. + WaitForPod() — polls for Running phase + sentinel; updates self.pod_name. + RecoverPod() — waits for DaemonSet to recreate / restart the container, + checking deletionTimestamp to avoid false-positive + Running state. + +Extracted from swap_encryption_benchmark.py to satisfy PKB resource pattern +(go/pkb-resources): infrastructure lifecycle belongs in BaseResource subclasses, +not in benchmark files. +""" + +import logging +import textwrap +import time +from typing import Optional + +from perfkitbenchmarker import errors +from perfkitbenchmarker import resource +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_commands + +# Transient kubectl errors that are safe to retry automatically. 
+_TRANSIENT_KUBECTL_ERRORS = ('connection reset by peer', 'websocket: close') + +# Errors indicating the container / pod is gone and needs full recovery. +_CONTAINER_GONE_KUBECTL_ERRORS = ( + 'container not found', + 'procready not received', + 'unable to upgrade connection', + 'not found', + 'deleted state', +) + + +class SwapDaemonSet(resource.BaseResource): + """PKB resource for the swap-encryption benchmark privileged DaemonSet. + + The DaemonSet runs a single privileged pod on the benchmark nodepool. + It installs measurement tools (fio, cryptsetup, mdadm, sysstat, nvme-cli), + verifies the swap device is active, then writes /tmp/pkb_ready. All + benchmark phases execute commands inside this pod via PodExec(). + + Attributes: + name: DaemonSet metadata.name (e.g. 'pkb-swap-benchmark'). + namespace: Kubernetes namespace (typically 'default'). + label: Pod label value for app= selector. + nodepool: pkb_nodepool label value pinning the DaemonSet to the + benchmark node. + image: Container image (e.g. 'ubuntu:22.04'). + pod_name: Name of the currently active pod; updated by WaitForPod / + RecoverPod on eviction. + oom_events: Pod names that triggered rc=137 OOM-kill; read by Run() + for the degradation gate. + pod_lost: Pod names that went NotFound during PodExec; read by Run() + for the degradation gate. + """ + + RESOURCE_TYPE = 'SwapDaemonSet' + REQUIRED_ATTRS = [] + + def __init__( + self, + name: str, + namespace: str, + label: str, + nodepool: str, + image: str, + ) -> None: + super().__init__() + self.name = name + self.namespace = namespace + self.label = label + self.nodepool = nodepool + self.image = image + # Active pod tracking — updated by WaitForPod / RecoverPod. + self.pod_name: Optional[str] = None + # Per-run accumulators read by Run() for the degradation gate. 
+ self.oom_events: list[str] = [] + self.pod_lost: list[str] = [] + + # ── PKB lifecycle ───────────────────────────────────────────────────────── + + def _Create(self) -> None: + """Apply the DaemonSet manifest and wait for the pod to be ready.""" + kubernetes_commands.ApplyManifest( + 'cluster/swap_encryption_daemonset.yaml.j2', + ds_name=self.name, + ds_namespace=self.namespace, + ds_label=self.label, + benchmark_nodepool=self.nodepool, + image=self.image, + ) + logging.info('[swap_encryption] Swap-infra DaemonSet applied') + pod = self.WaitForPod() + if pod is None: + raise errors.Benchmarks.PrepareException( + '[swap_encryption] DaemonSet pod did not become ready within' + ' timeout' + ) + + def _Delete(self) -> None: + """Run in-pod teardown then delete the DaemonSet. + + Runs swapoff, dmsetup remove, losetup cleanup, and pkill inside the + pod (best-effort, ignore_failure=True) before deleting the DaemonSet. + This mirrors the original Cleanup() logic so no swap state is leaked. + """ + # Try to get the pod name quickly if not set. 
+ if self.pod_name is None: + self.WaitForPod(timeout=30) + + if self.pod_name: + self.PodExec( + 'swapoff -a 2>/dev/null || true', + ignore_failure=True, + retries=0, + ) + self.PodExec( + textwrap.dedent("""\ + swapoff /dev/mapper/swap_encrypted 2>/dev/null || true + dmsetup remove --noudevrules --noudevsync \ + swap_encrypted 2>/dev/null || true + """), + ignore_failure=True, + retries=0, + ) + self.PodExec( + textwrap.dedent("""\ + for backing in \ + /var/pkb_swap_backing \ + /run/pkb_swap_backing \ + /mnt/stateful_partition/pkb_swap_backing + do + losetup -j "$backing" 2>/dev/null \ + | awk -F: '{print $1}' \ + | while read dev + do losetup -d "$dev" 2>/dev/null || true; done + rm -f "$backing" + done + """), + ignore_failure=True, + retries=0, + ) + self.PodExec( + "pkill -9 'stress-ng|fio' 2>/dev/null || true", + ignore_failure=True, + retries=0, + ) + + kubectl.RunKubectlCommand( + [ + 'delete', + 'daemonset', + self.name, + '-n', + self.namespace, + '--ignore-not-found', + ], + raise_on_failure=False, + ) + logging.info('[swap_encryption] DaemonSet deleted') + + # ── Pod lifecycle helpers ───────────────────────────────────────────────── + + def WaitForPod(self, timeout: int = 600) -> Optional[str]: + """Wait until the DaemonSet pod is Running AND /tmp/pkb_ready exists. + + Two-phase poll: + 1. Wait for status.phase == Running. + 2. kubectl exec test -f /tmp/pkb_ready. + + The DaemonSet init script writes /tmp/pkb_ready only after verifying + the swap device is active (up to 150 s) and installing all measurement + tools (~1-2 min on cold APT cache). The default 600 s covers + worst-case APT latency on a freshly-booted node. + + Args: + timeout: Maximum seconds to wait. + + Returns: + Pod name on success; None on timeout. Also updates self.pod_name. + """ + deadline = time.time() + timeout + last_phase = '' + ready_pod = None + + while time.time() < deadline: + # Step 1: wait for Running phase. 
+ if ready_pod is None: + out, _, rc = kubectl.RunKubectlCommand( + [ + 'get', + 'pods', + '-l', + f'app={self.label}', + '-n', + self.namespace, + '-o', + ( + r'jsonpath={range .items[*]}' + r'{.metadata.name}{"\t"}' + r'{.status.phase}{"\n"}{end}' + ), + ], + raise_on_failure=False, + ) + if rc == 0 and out.strip(): + ready_pod, last_phase = self._FindRunningPod( + out, last_phase) + else: + logging.info( + '[swap_encryption] Waiting for DaemonSet pod to' + ' appear...' + ) + + # Step 2: poll for /tmp/pkb_ready sentinel. + if ready_pod is not None: + _, sentinel_err, sentinel_rc = kubectl.RunKubectlCommand( + [ + 'exec', + ready_pod, + '-n', + self.namespace, + '--', + 'test', + '-f', + '/tmp/pkb_ready', + ], + raise_on_failure=False, + ) + if sentinel_rc == 0: + logging.info( + '[swap_encryption] Pod %s ready (swap device active)', + ready_pod, + ) + self.pod_name = ready_pod + return ready_pod + # Container crashed (CrashLoopBackOff / exited) — reset and + # re-check pod phase on the next iteration. + if 'container not found' in sentinel_err or ( + 'unable to upgrade connection' in sentinel_err + ): + logging.warning( + '[swap_encryption] Pod %s: container not running' + ' (%s) — will re-check pod state', + ready_pod, + sentinel_err.strip(), + ) + ready_pod = None + last_phase = '' + else: + logging.info( + '[swap_encryption] Pod %s: still installing tools...', + ready_pod, + ) + + time.sleep(15) + + logging.warning( + '[swap_encryption] Benchmark pod not ready after %ds', timeout + ) + return None + + def _FindRunningPod( + self, out: str, last_phase: str + ) -> tuple[Optional[str], str]: + """Parse 'kubectl get pods' output for a Running pod. + + Args: + out: Raw kubectl output (name\tphase per line). + last_phase: Last observed phase for change-detection logging. + + Returns: + Tuple of (pod_name_if_running_else_None, updated_last_phase). 
+ """ + for line in out.strip().splitlines(): + parts = line.split('\t') + if len(parts) != 2: + continue + pod_name, phase = parts[0].strip(), parts[1].strip() + if phase == 'Running': + logging.info( + '[swap_encryption] Pod %s is Running' + ' — waiting for sentinel...', + pod_name, + ) + return pod_name, last_phase + if phase != last_phase: + logging.info( + '[swap_encryption] Pod %s phase: %s', pod_name, phase) + last_phase = phase + if phase == 'Pending': + self._LogPodEvents(pod_name) + return None, last_phase + + def _LogPodEvents(self, pod_name: str) -> None: + """Dump recent Kubernetes events for a pod to help diagnose hangs.""" + events_out, _, _ = kubectl.RunKubectlCommand( + ['describe', 'pod', pod_name, '-n', self.namespace], + raise_on_failure=False, + ) + in_events = False + lines = [] + for line in events_out.splitlines(): + if line.startswith('Events:'): + in_events = True + if in_events: + lines.append(line) + if lines: + logging.info( + '[swap_encryption] Pod events:\n%s', '\n'.join(lines[:30]) + ) + else: + logging.info( + '[swap_encryption] kubectl describe output:\n%s', + events_out[-2000:] if len(events_out) > 2000 else events_out, + ) + + def _IsPodGone(self, pod: str) -> bool: + """Return True if the named pod no longer exists in the cluster.""" + try: + _, err, rc = kubectl.RunKubectlCommand( + [ + 'get', + 'pod', + pod, + '-n', + self.namespace, + '-o', + 'jsonpath={.metadata.name}', + ], + raise_on_failure=False, + timeout=15, + ) + return rc != 0 and 'not found' in (err or '').lower() + except Exception: # pylint: disable=broad-except + return False + + def PodExec( + self, + cmd: str, + ignore_failure: bool = False, + timeout: int = 300, + retries: int = 2, + ) -> tuple[str, str]: + """Run a shell command inside the benchmark pod via kubectl exec. + + Handles: + - Transient GKE websocket resets: automatic retry (up to retries). 
+ - OOM kill (rc=137): records to self.oom_events, calls RecoverPod, + does NOT retry the OOM-triggering command itself. + - Container/pod gone: records to self.pod_lost, calls RecoverPod, + retries the command on the recovered pod. + + Uses self.pod_name as the active pod; RecoverPod updates it on eviction. + + Args: + cmd: Shell command string passed to bash -c. + ignore_failure: When True, non-zero exit codes are logged but not + raised. + timeout: Seconds before PKB kills the kubectl exec process. Pass a + larger value for long-running jobs (fio, stress-ng, kernel build). + retries: Max automatic retries on transient websocket resets. + + Returns: + Tuple of (stdout, stderr) strings. + """ + active = self.pod_name + + for attempt in range(retries + 1): + out, err, rc = kubectl.RunKubectlCommand( + [ + 'exec', + active, + '-n', + self.namespace, + '--', + 'bash', + '-c', + cmd, + ], + raise_on_failure=False, + raise_on_timeout=False, + timeout=timeout, + ) + + # Retry transient GKE websocket resets. + is_transient = rc != 0 and any( + e in err for e in _TRANSIENT_KUBECTL_ERRORS + ) + if is_transient and attempt < retries: + logging.warning( + '[swap_encryption] kubectl exec connection reset (attempt' + ' %d/%d); retrying in 10 s', + attempt + 1, + retries + 1, + ) + time.sleep(10) + continue + + # rc=137 (SIGKILL): OOM killer terminated the container process. + # Do NOT retry — log, recover, and return so the caller can decide. + if rc == 137: + if active not in self.oom_events: + self.oom_events.append(active) + # Kubernetes takes a few seconds to update pod state after + # eviction — sleep before checking to avoid false-positive Running. 
+ logging.warning( + '[swap_encryption] rc=137 — sleeping 15 s for Kubernetes' + ' to update pod state before recovery check' + ) + time.sleep(15) + if self._IsPodGone(active): + logging.warning( + '[swap_encryption] OOM-eviction detected (rc=137, pod' + ' gone) — recovering pod name for subsequent commands' + ) + else: + logging.warning( + '[swap_encryption] Container OOM-killed (rc=137, pod' + ' still exists) — waiting for container restart' + ) + new_pod = self.RecoverPod(active) + if new_pod != active: + logging.info( + '[swap_encryption] Pod name updated: %s → %s', + active, + new_pod, + ) + self.pod_name = new_pod + active = new_pod + break # OOM cmd is never re-run on the recovered pod. + + # Container or pod gone: record loss, try RecoverPod, retry cmd. + is_container_gone = rc != 0 and any( + e in err.lower() for e in _CONTAINER_GONE_KUBECTL_ERRORS + ) + if is_container_gone: + if active and active not in self.pod_lost: + self.pod_lost.append(active) + logging.error( + '[swap_encryption] Benchmark pod %s is gone (%s) —' + ' recording run as degraded', + active, + (err or '').strip()[:160], + ) + if attempt < retries: + logging.warning( + '[swap_encryption] Container gone/restarting (attempt' + ' %d/%d) — waiting for pod to recover...', + attempt + 1, + retries + 1, + ) + new_pod = self.RecoverPod(active) + if new_pod != active: + logging.info( + '[swap_encryption] Pod name updated: %s → %s', + active, + new_pod, + ) + self.pod_name = new_pod + active = new_pod + continue + break + + if rc != 0 and not ignore_failure: + raise errors.VmUtil.IssueCommandError( + f'[swap_encryption] PodExec failed (rc={rc}): {err}' + ) + return out, err + + def RecoverPod(self, pod: str, timeout_sec: int = 600) -> str: + """Wait for the DaemonSet to recover after OOM kill or eviction. + + Handles two scenarios: + 1. Container OOM restart: same pod name, container restarting in + place (DaemonSet restartPolicy=Always). + 2. 
Pod eviction/deletion: pod is gone; DaemonSet creates a new pod + with a DIFFERENT name. + + Checks metadata.deletionTimestamp in addition to status.phase to + catch the Terminating state where phase may still read Running. + + Args: + pod: Original pod name to monitor. + timeout_sec: Maximum seconds to wait for recovery. + + Returns: + The (possibly new) pod name once Running and /tmp/pkb_ready is + present. + """ + deadline = time.time() + timeout_sec + logging.info( + '[swap_encryption] Waiting for pod %s to recover (up to %ds)...', + pod, + timeout_sec, + ) + + # Phase 1: find a Running pod that is NOT being terminated. + recovered_pod = pod + while time.time() < deadline: + # Query both phase and deletionTimestamp in a single call. + status_out, status_err, status_rc = kubectl.RunKubectlCommand( + [ + 'get', + 'pod', + pod, + '-n', + self.namespace, + '-o', + 'jsonpath={.status.phase}|{.metadata.deletionTimestamp}', + ], + raise_on_failure=False, + timeout=30, + ) + fields = status_out.strip().split('|') + phase = fields[0].strip() if fields else '' + is_terminating = len(fields) > 1 and bool(fields[1].strip()) + + # Genuine Running (not being deleted) — move to Phase 2. + if status_rc == 0 and phase == 'Running' and not is_terminating: + break + + # Pod gone or Terminating — look for a replacement by label. 
+ pod_gone_or_terminating = ( + status_rc != 0 + and 'not found' in (status_out + status_err).lower() + ) or is_terminating + if pod_gone_or_terminating: + label_out, _, label_rc = kubectl.RunKubectlCommand( + [ + 'get', + 'pods', + '-n', + self.namespace, + '-l', + f'app={self.label}', + '-o', + ( + 'jsonpath={range' + ' .items[?(@.status.phase=="Running")]}' + '{.metadata.name}{"\\n"}{end}' + ), + ], + raise_on_failure=False, + timeout=30, + ) + new_pods = [ + p.strip() + for p in label_out.strip().splitlines() + if p.strip() and p.strip() != pod + ] + if label_rc == 0 and new_pods: + recovered_pod = new_pods[0] + logging.info( + '[swap_encryption] Original pod %s gone/terminating;' + ' found replacement %s', + pod, + recovered_pod, + ) + break + + time.sleep(10) + else: + raise errors.VmUtil.IssueCommandError( + f'[swap_encryption] No Running pod found (original: {pod})' + f' within {timeout_sec}s after OOM kill / eviction' + ) + + # Phase 2: wait for init script to finish (sentinel written last). 
+ while time.time() < deadline: + ready_out, _, ready_rc = kubectl.RunKubectlCommand( + [ + 'exec', + recovered_pod, + '-n', + self.namespace, + '--', + 'bash', + '-c', + 'test -f /tmp/pkb_ready && echo READY', + ], + raise_on_failure=False, + timeout=30, + ) + if ready_rc == 0 and 'READY' in ready_out: + logging.info( + '[swap_encryption] Pod %s recovered (swap device active)', + recovered_pod, + ) + self.pod_name = recovered_pod + return recovered_pod + time.sleep(15) + + raise errors.VmUtil.IssueCommandError( + f'[swap_encryption] Pod {recovered_pod} did not become ready' + f' within {timeout_sec}s after OOM kill / eviction' + ) diff --git a/tests/linux_benchmarks/kubernetes_redis_memtier_benchmark_test.py b/tests/linux_benchmarks/kubernetes_redis_memtier_benchmark_test.py index 2fac98e49d..0699045552 100644 --- a/tests/linux_benchmarks/kubernetes_redis_memtier_benchmark_test.py +++ b/tests/linux_benchmarks/kubernetes_redis_memtier_benchmark_test.py @@ -1,17 +1,42 @@ +# Copyright 2024 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kubernetes_redis_memtier_benchmark.""" + +# Tests intentionally access private module symbols (_SWAP_*, _SwapMetadata, +# _CreateRunConfigMatrix) to verify internal behaviour without a full cluster. 
+# pylint: disable=protected-access,invalid-name + import collections import unittest +from unittest import mock -from perfkitbenchmarker.linux_benchmarks import kubernetes_redis_memtier_benchmark +from absl.testing import flagsaver +from perfkitbenchmarker import sample as sample_lib +from perfkitbenchmarker.linux_benchmarks import ( + kubernetes_redis_memtier_benchmark as benchmark +) from tests import pkb_common_test_case class KubernetesRedisMemtierBenchmarkTest( pkb_common_test_case.PkbCommonTestCase ): + """Tests for the _CreateRunConfigMatrix helper.""" def test__CreateRunConfigMatrix(self): cls = collections.namedtuple('cls', ['field_a', 'field_b', 'field_c']) - run_configs = kubernetes_redis_memtier_benchmark._CreateRunConfigMatrix( + run_configs = benchmark._CreateRunConfigMatrix( cls, field_a=[1, 2], field_b=['x', 'y'], @@ -19,7 +44,6 @@ def test__CreateRunConfigMatrix(self): ) self.assertLen(run_configs, 2**3) self.assertIsInstance(run_configs[0], cls) - print(run_configs) self.assertEqual( run_configs, [ @@ -35,5 +59,258 @@ def test__CreateRunConfigMatrix(self): ) +class GetConfigNoSwapTest(pkb_common_test_case.PkbCommonTestCase): + """GetConfig() default behaviour -- swap flag is False.""" + + def test_get_config_returns_dict(self): + self.assertIsInstance(benchmark.GetConfig({}), dict) + + def test_get_config_has_container_cluster(self): + self.assertIn('container_cluster', benchmark.GetConfig({})) + + def test_get_config_servers_nodepool_present(self): + config = benchmark.GetConfig({}) + self.assertIn('servers', config['container_cluster']['nodepools']) + + def test_get_config_clients_nodepool_present(self): + config = benchmark.GetConfig({}) + self.assertIn('clients', config['container_cluster']['nodepools']) + + def test_get_config_gcp_default_server_machine_type(self): + config = benchmark.GetConfig({}) + machine_type = ( + config['container_cluster']['nodepools']['servers'] + ['vm_spec']['GCP']['machine_type'] + ) + 
self.assertEqual(machine_type, 'c4-standard-4') + + def test_get_config_aws_default_server_machine_type(self): + config = benchmark.GetConfig({}) + machine_type = ( + config['container_cluster']['nodepools']['servers'] + ['vm_spec']['AWS']['machine_type'] + ) + self.assertEqual(machine_type, 'm5.xlarge') + + def test_get_config_no_swap_config_by_default(self): + server_np = ( + benchmark.GetConfig({})['container_cluster']['nodepools']['servers'] + ) + self.assertNotIn('swap_config', server_np) + + +class GetConfigSwapEnabledGcpTest(pkb_common_test_case.PkbCommonTestCase): + """GetConfig() GCP behaviour when --kubernetes_redis_memtier_swap_enabled.""" + + def _server_np(self): + return ( + benchmark.GetConfig({})['container_cluster']['nodepools']['servers'] + ) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_config_injected_into_servers_nodepool(self): + self.assertIn('swap_config', self._server_np()) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_config_enabled_true(self): + self.assertTrue(self._server_np()['swap_config'].get('enabled', False)) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_server_machine_type_upgraded_to_highmem(self): + machine_type = self._server_np()['vm_spec']['GCP']['machine_type'] + self.assertEqual(machine_type, benchmark._SWAP_MACHINE_TYPE) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_server_boot_disk_type_is_hyperdisk(self): + disk_type = self._server_np()['vm_spec']['GCP']['boot_disk_type'] + self.assertEqual(disk_type, benchmark._SWAP_BOOT_DISK_TYPE) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_clients_nodepool_unaffected_by_swap_toggle(self): + config = benchmark.GetConfig({}) + client_np = config['container_cluster']['nodepools']['clients'] + self.assertEqual( + client_np['vm_spec']['GCP']['machine_type'], 'c4-standard-32') + self.assertNotIn('swap_config', 
client_np) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + kubernetes_redis_memtier_swap_swappiness=60, + ) + def test_swap_config_swappiness_reflects_flag(self): + self.assertEqual(self._server_np()['swap_config']['swappiness'], 60) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_config_has_gcp_disk_iops_and_throughput(self): + swap_cfg = self._server_np()['swap_config'] + self.assertEqual(swap_cfg['boot_disk_iops'], benchmark._SWAP_BOOT_DISK_IOPS) + self.assertEqual( + swap_cfg['boot_disk_throughput'], benchmark._SWAP_BOOT_DISK_THROUGHPUT) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + redis_memtier_server_machine_type='n2-highmem-16', + ) + def test_redis_memtier_server_machine_type_wins_over_swap_default(self): + # --redis_memtier_server_machine_type takes precedence over the swap + # default machine type. + machine_type = self._server_np()['vm_spec']['GCP']['machine_type'] + self.assertEqual(machine_type, 'n2-highmem-16') + + +class GetConfigSwapEnabledAwsTest(pkb_common_test_case.PkbCommonTestCase): + """GetConfig() AWS behaviour when --kubernetes_redis_memtier_swap_enabled.""" + + def _server_np(self): + return ( + benchmark.GetConfig({})['container_cluster']['nodepools']['servers'] + ) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_aws_server_machine_type_upgraded_to_r6i(self): + machine_type = self._server_np()['vm_spec']['AWS']['machine_type'] + self.assertEqual(machine_type, benchmark._SWAP_AWS_MACHINE_TYPE) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_aws_server_boot_disk_type_is_gp3(self): + disk_type = self._server_np()['vm_spec']['AWS']['boot_disk_type'] + self.assertEqual(disk_type, benchmark._SWAP_AWS_BOOT_DISK_TYPE) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_aws_server_boot_disk_size(self): + disk_size = 
self._server_np()['vm_spec']['AWS']['boot_disk_size'] + self.assertEqual(disk_size, benchmark._SWAP_AWS_BOOT_DISK_SIZE) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_config_has_iops_and_throughput(self): + swap_cfg = self._server_np()['swap_config'] + self.assertIn('boot_disk_iops', swap_cfg) + self.assertIn('boot_disk_throughput', swap_cfg) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + redis_memtier_server_machine_type='r7i.8xlarge', + ) + def test_aws_server_machine_type_flag_wins_over_swap_default(self): + machine_type = self._server_np()['vm_spec']['AWS']['machine_type'] + self.assertEqual(machine_type, 'r7i.8xlarge') + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_gcp_entry_unaffected_by_aws_swap_constants(self): + # GCP vm_spec should still get hyperdisk-balanced, not gp3. + disk_type = self._server_np()['vm_spec']['GCP']['boot_disk_type'] + self.assertEqual(disk_type, benchmark._SWAP_BOOT_DISK_TYPE) + + +class SwapMetadataGcpTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _SwapMetadata() -- GCP (default cloud).""" + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_metadata_swap_enabled_true(self): + self.assertTrue(benchmark._SwapMetadata()['swap_enabled']) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + kubernetes_redis_memtier_swap_swappiness=80, + ) + def test_swap_metadata_swappiness_reflects_flag(self): + self.assertEqual(benchmark._SwapMetadata()['swap_swappiness'], 80) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_metadata_includes_all_expected_keys(self): + """All metadata keys required for result analysis must be present.""" + meta = benchmark._SwapMetadata() + for key in ( + 'swap_machine_type', + 'swap_boot_disk_type', + 'swap_boot_disk_size_gb', + 'swap_boot_disk_iops', + 'swap_boot_disk_throughput', + 'swap_min_free_kbytes', + 
'swap_watermark_scale_factor', + ): + self.assertIn(key, meta, msg=f'Missing key: {key}') + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_metadata_gcp_machine_type_by_default(self): + self.assertEqual( + benchmark._SwapMetadata()['swap_machine_type'], + benchmark._SWAP_MACHINE_TYPE, + ) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_swap_metadata_gcp_disk_type_by_default(self): + self.assertEqual( + benchmark._SwapMetadata()['swap_boot_disk_type'], + benchmark._SWAP_BOOT_DISK_TYPE, + ) + + +class SwapMetadataAwsTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _SwapMetadata() -- AWS.""" + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + cloud='AWS', + ) + def test_swap_metadata_aws_machine_type(self): + self.assertEqual( + benchmark._SwapMetadata()['swap_machine_type'], + benchmark._SWAP_AWS_MACHINE_TYPE, + ) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + cloud='AWS', + ) + def test_swap_metadata_aws_disk_type_is_gp3(self): + self.assertEqual( + benchmark._SwapMetadata()['swap_boot_disk_type'], + benchmark._SWAP_AWS_BOOT_DISK_TYPE, + ) + + @flagsaver.flagsaver( + kubernetes_redis_memtier_swap_enabled=True, + cloud='AWS', + ) + def test_swap_metadata_aws_iops(self): + self.assertEqual( + benchmark._SwapMetadata()['swap_boot_disk_iops'], + benchmark._SWAP_AWS_BOOT_DISK_IOPS, + ) + + +class RunSwapMetadataTest(pkb_common_test_case.PkbCommonTestCase): + """Verify that Run() attaches swap metadata when the flag is set.""" + + def _make_sample(self): + """Return a minimal Sample for use in Run() mock tests.""" + return sample_lib.Sample('throughput', 100.0, 'ops/s', {}) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=True) + def test_run_attaches_swap_metadata_to_each_sample(self): + fake_samples = [self._make_sample(), self._make_sample()] + with mock.patch.object( + benchmark, '_RunMemtier', return_value=fake_samples, + ): + 
result = benchmark.Run(mock.Mock()) + + for s in result: + self.assertIn('swap_enabled', s.metadata) + self.assertTrue(s.metadata['swap_enabled']) + self.assertIn('swap_machine_type', s.metadata) + + @flagsaver.flagsaver(kubernetes_redis_memtier_swap_enabled=False) + def test_run_no_swap_metadata_when_flag_false(self): + fake_samples = [self._make_sample()] + with mock.patch.object( + benchmark, '_RunMemtier', return_value=fake_samples, + ): + result = benchmark.Run(mock.Mock()) + + for s in result: + self.assertNotIn('swap_enabled', s.metadata) + + if __name__ == '__main__': unittest.main() diff --git a/tests/linux_benchmarks/swap_encryption_benchmark_test.py b/tests/linux_benchmarks/swap_encryption_benchmark_test.py new file mode 100644 index 0000000000..9a29939cde --- /dev/null +++ b/tests/linux_benchmarks/swap_encryption_benchmark_test.py @@ -0,0 +1,141 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for perfkitbenchmarker.linux_benchmarks.swap_encryption_benchmark.""" + +import unittest +from unittest import mock + +from perfkitbenchmarker.linux_benchmarks import swap_encryption_benchmark +from tests import pkb_common_test_case + + +class GetConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests that BENCHMARK_CONFIG is well-formed and loadable.""" + + def test_get_config_returns_dict(self): + config = swap_encryption_benchmark.GetConfig({}) + self.assertIsInstance(config, dict) + + def test_get_config_has_container_cluster(self): + # configs.LoadConfig returns the inner benchmark dict directly (no benchmark + # name wrapper), so top-level keys are 'container_cluster', 'description', etc. + config = swap_encryption_benchmark.GetConfig({}) + self.assertIn('container_cluster', config) + + def test_get_config_benchmark_nodepool_present(self): + config = swap_encryption_benchmark.GetConfig({}) + nodepools = config['container_cluster']['nodepools'] + self.assertIn( + swap_encryption_benchmark._BENCHMARK_NODEPOOL, + nodepools, + ) + + def test_get_config_swap_config_present_on_benchmark_nodepool(self): + config = swap_encryption_benchmark.GetConfig({}) + nodepool = config['container_cluster']['nodepools'][ + swap_encryption_benchmark._BENCHMARK_NODEPOOL + ] + self.assertIn('swap_config', nodepool) + self.assertTrue(nodepool['swap_config'].get('enabled', False)) + + +class ParseCipherTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _parse_cipher() output parsing.""" + + def test_parse_cipher_standard_aes_xts(self): + # Typical dmsetup status line: - crypt ... 
+ status = '0 67108864 crypt aes-xts-plain64 0 8:16 0 1 sector_size:4096' + self.assertEqual( + swap_encryption_benchmark._parse_cipher(status), 'aes-xts-plain64' + ) + + def test_parse_cipher_returns_empty_when_no_crypt_token(self): + status = '0 67108864 linear 8:16 0' + self.assertEqual(swap_encryption_benchmark._parse_cipher(status), '') + + def test_parse_cipher_returns_empty_on_empty_string(self): + self.assertEqual(swap_encryption_benchmark._parse_cipher(''), '') + + def test_parse_cipher_crypt_at_end_returns_empty(self): + # 'crypt' present but no token after it. + status = 'something crypt' + self.assertEqual(swap_encryption_benchmark._parse_cipher(status), '') + + def test_parse_cipher_not_encrypted_string(self): + # Output from the benchmark when dm-crypt not active. + status = 'not_encrypted' + self.assertEqual(swap_encryption_benchmark._parse_cipher(status), '') + + +class DetectSwapDeviceTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _detect_swap_device() with mocked PodExec.""" + + def _make_ds(self, pod_exec_output): + ds = mock.Mock() + ds.PodExec.return_value = (pod_exec_output, '') + return ds + + def test_detect_swap_device_returns_device_basename(self): + # /proc/swaps first device column (after header skip via awk NR>1). 
+ ds = self._make_ds('/dev/dm-0\n') + result = swap_encryption_benchmark._detect_swap_device(ds) + self.assertEqual(result, 'dm-0') + + def test_detect_swap_device_returns_first_device_when_multiple(self): + ds = self._make_ds('/dev/dm-0\n/dev/dm-1\n') + result = swap_encryption_benchmark._detect_swap_device(ds) + self.assertEqual(result, 'dm-0') + + def test_detect_swap_device_returns_empty_when_no_swap(self): + ds = self._make_ds('') + result = swap_encryption_benchmark._detect_swap_device(ds) + self.assertEqual(result, '') + + def test_detect_swap_device_returns_empty_on_pod_exec_exception(self): + ds = mock.Mock() + ds.PodExec.side_effect = Exception('pod not found') + result = swap_encryption_benchmark._detect_swap_device(ds) + self.assertEqual(result, '') + + +class BuildMetadataTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for _build_metadata() with mocked PodExec.""" + + def test_build_metadata_includes_swap_device(self): + ds = mock.Mock() + ds.PodExec.return_value = ('5.15.0-gke-1234\n', '') + meta = swap_encryption_benchmark._build_metadata(ds, 'dm-0') + self.assertEqual(meta['swap_device'], 'dm-0') + + def test_build_metadata_swap_device_unknown_when_empty(self): + ds = mock.Mock() + ds.PodExec.return_value = ('5.15.0\n', '') + meta = swap_encryption_benchmark._build_metadata(ds, '') + self.assertEqual(meta['swap_device'], 'unknown') + + def test_build_metadata_includes_kernel_version(self): + ds = mock.Mock() + ds.PodExec.return_value = ('5.15.0-gke-1234\n', '') + meta = swap_encryption_benchmark._build_metadata(ds, 'dm-0') + self.assertEqual(meta['kernel_version'], '5.15.0-gke-1234') + + def test_build_metadata_kernel_version_absent_on_pod_exec_exception(self): + ds = mock.Mock() + ds.PodExec.side_effect = Exception('timeout') + meta = swap_encryption_benchmark._build_metadata(ds, 'dm-0') + self.assertNotIn('kernel_version', meta) + + +if __name__ == '__main__': + unittest.main() diff --git 
a/tests/providers/gcp/google_kubernetes_engine_test.py b/tests/providers/gcp/google_kubernetes_engine_test.py index dbf8232f5e..3559f7378f 100644 --- a/tests/providers/gcp/google_kubernetes_engine_test.py +++ b/tests/providers/gcp/google_kubernetes_engine_test.py @@ -34,6 +34,7 @@ from perfkitbenchmarker.resources.container_service import container from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes_commands +from perfkitbenchmarker.resources.container_service import swap_config as swap_config_lib from tests import pkb_common_test_case FLAGS = flgs.FLAGS @@ -949,5 +950,160 @@ def testCreateWithPerNodepoolAutoscaling(self): self.assertIn('--max-nodes 10', nodepool_cmd) +class GoogleKubernetesEngineSwapConfigTestCase(PatchedObjectsTestCase): + """Tests that _AddNodeParamsToCmd wires swap_config flags correctly.""" + + @staticmethod + def _make_swap_spec( + boot_disk_iops=160000, + boot_disk_throughput=2400, + lssd=False, + lssd_count=0, + ): + """Build a ContainerClusterSpec with swap_config on the benchmark nodepool.""" + return container_spec.ContainerClusterSpec( + 'NAME', + **{ + 'cloud': 'GCP', + 'vm_spec': { + 'GCP': { + 'machine_type': 'e2-medium', + 'zone': 'us-central1-a', + }, + }, + 'nodepools': { + 'benchmark': { + 'vm_spec': { + 'GCP': { + 'machine_type': 'n4-highmem-32', + 'zone': 'us-central1-a', + }, + }, + 'swap_config': { + 'enabled': True, + 'swappiness': 100, + 'min_free_kbytes': 67584, + 'watermark_scale_factor': 500, + 'lssd': lssd, + 'lssd_count': lssd_count, + 'boot_disk_iops': boot_disk_iops, + 'boot_disk_throughput': boot_disk_throughput, + }, + }, + }, + }, + ) + + def setUp(self): + super().setUp() + # Avoid real tempfile creation in GKE command-generation tests. + # GkeSwapConfig implementation is tested separately in swap_config_test.py. 
+ self.enter_context( + mock.patch.object( + swap_config_lib.GkeSwapConfig, + 'WriteLinuxConfigYaml', + return_value='/tmp/fake_linux_config.yaml', + ) + ) + + def test_swap_config_sets_system_config_from_file_flag(self): + spec = self._make_swap_spec() + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertIsNotNone(nodepool_cmd) + self.assertIn('--system-config-from-file', nodepool_cmd) + self.assertIn('/tmp/fake_linux_config.yaml', nodepool_cmd) + + def test_swap_config_sets_ubuntu_containerd_image_type(self): + spec = self._make_swap_spec() + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertIn('UBUNTU_CONTAINERD', nodepool_cmd) + + def test_swap_config_sets_no_enable_autorepair(self): + spec = self._make_swap_spec() + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertIn('--no-enable-autorepair', nodepool_cmd) + + def test_swap_config_with_boot_disk_iops_sets_provisioned_flags(self): + spec = self._make_swap_spec(boot_disk_iops=160000, boot_disk_throughput=2400) + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertIn('--boot-disk-provisioned-iops', nodepool_cmd) + self.assertIn('--boot-disk-provisioned-throughput', nodepool_cmd) + + def test_swap_config_lssd_omits_boot_disk_provisioned_flags(self): + # When lssd=True the swap device is local NVMe, not 
the boot disk. + spec = self._make_swap_spec(lssd=True, lssd_count=2, boot_disk_iops=0) + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertNotIn('--boot-disk-provisioned-iops', nodepool_cmd) + self.assertNotIn('--boot-disk-provisioned-throughput', nodepool_cmd) + + def test_nodepool_without_swap_config_omits_all_swap_flags(self): + spec = container_spec.ContainerClusterSpec( + 'NAME', + **{ + 'cloud': 'GCP', + 'vm_spec': { + 'GCP': { + 'machine_type': 'e2-medium', + 'zone': 'us-central1-a', + }, + }, + 'nodepools': { + 'benchmark': { + 'vm_spec': { + 'GCP': { + 'machine_type': 'n4-highmem-32', + 'zone': 'us-central1-a', + }, + }, + }, + }, + }, + ) + with self.patch_critical_objects() as issue_command: + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + nodepool_cmd = issue_command.GetCommandWithSubstring( + 'node-pools create benchmark' + ) + self.assertNotIn('--system-config-from-file', nodepool_cmd) + self.assertNotIn('UBUNTU_CONTAINERD', nodepool_cmd) + self.assertNotIn('--no-enable-autorepair', nodepool_cmd) + + def test_cleanup_yaml_called_after_nodepool_create(self): + spec = self._make_swap_spec() + with mock.patch.object( + swap_config_lib.GkeSwapConfig, 'CleanupYaml' + ) as mock_cleanup: + with self.patch_critical_objects(): + cluster = google_kubernetes_engine.GkeCluster(spec) + cluster._Create() + mock_cleanup.assert_called_once() + + if __name__ == '__main__': unittest.main() diff --git a/tests/resources/container_service/swap_config_test.py b/tests/resources/container_service/swap_config_test.py new file mode 100644 index 0000000000..093d9d87c7 --- /dev/null +++ b/tests/resources/container_service/swap_config_test.py @@ -0,0 +1,260 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for perfkitbenchmarker.resources.container_service.swap_config.""" + +import os +import unittest +from unittest import mock + +from perfkitbenchmarker.resources.container_service import swap_config +from tests import pkb_common_test_case + + +class BaseSwapConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for the abstract BaseSwapConfig class.""" + + def test_default_attrs(self): + cfg = swap_config.BaseSwapConfig() + self.assertEqual(cfg.swappiness, 100) + self.assertEqual(cfg.min_free_kbytes, 67584) + self.assertEqual(cfg.watermark_scale_factor, 500) + + def test_custom_attrs(self): + cfg = swap_config.BaseSwapConfig( + swappiness=60, min_free_kbytes=400, watermark_scale_factor=200 + ) + self.assertEqual(cfg.swappiness, 60) + self.assertEqual(cfg.min_free_kbytes, 400) + self.assertEqual(cfg.watermark_scale_factor, 200) + + def test_from_spec_raises_not_implemented(self): + with self.assertRaises(NotImplementedError): + swap_config.BaseSwapConfig.from_spec(mock.Mock()) + + def test_create_is_noop(self): + cfg = swap_config.BaseSwapConfig() + cfg._Create() # Must not raise. + + def test_delete_is_noop(self): + cfg = swap_config.BaseSwapConfig() + cfg._Delete() # Must not raise. 
+ + +class GkeSwapConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for GkeSwapConfig: YAML generation, Hyperdisk clamping, lifecycle.""" + + def _make_spec(self, **kwargs): + """Return a mock SwapConfigSpec with sensible defaults.""" + spec = mock.Mock() + spec.swappiness = kwargs.get('swappiness', 100) + spec.min_free_kbytes = kwargs.get('min_free_kbytes', 67584) + spec.watermark_scale_factor = kwargs.get('watermark_scale_factor', 500) + spec.lssd = kwargs.get('lssd', False) + spec.lssd_count = kwargs.get('lssd_count', 0) + spec.boot_disk_iops = kwargs.get('boot_disk_iops', 0) + spec.boot_disk_throughput = kwargs.get('boot_disk_throughput', 0) + return spec + + # ── from_spec ───────────────────────────────────────────────────────────── + + def test_from_spec_maps_all_attrs(self): + spec = self._make_spec( + swappiness=60, + min_free_kbytes=400, + watermark_scale_factor=200, + lssd=True, + lssd_count=2, + boot_disk_iops=160000, + boot_disk_throughput=2400, + ) + cfg = swap_config.GkeSwapConfig.from_spec(spec) + self.assertEqual(cfg.swappiness, 60) + self.assertEqual(cfg.min_free_kbytes, 400) + self.assertEqual(cfg.watermark_scale_factor, 200) + self.assertTrue(cfg.lssd) + self.assertEqual(cfg.lssd_count, 2) + self.assertEqual(cfg.boot_disk_iops, 160000) + self.assertEqual(cfg.boot_disk_throughput, 2400) + + # ── WriteLinuxConfigYaml ────────────────────────────────────────────────── + + def test_write_linux_config_yaml_basic_content(self): + cfg = swap_config.GkeSwapConfig( + swappiness=80, min_free_kbytes=300, watermark_scale_factor=400 + ) + path = cfg.WriteLinuxConfigYaml() + try: + with open(path) as f: + content = f.read() + self.assertIn('linuxConfig:', content) + self.assertIn('swapConfig:', content) + self.assertIn('enabled: true', content) + self.assertIn('vm.swappiness: "80"', content) + self.assertIn('vm.min_free_kbytes: "300"', content) + self.assertIn('vm.watermark_scale_factor: "400"', content) + finally: + cfg.CleanupYaml() + + def 
test_write_linux_config_yaml_no_lssd_has_no_disk_profile(self): + cfg = swap_config.GkeSwapConfig(lssd=False) + path = cfg.WriteLinuxConfigYaml() + try: + with open(path) as f: + content = f.read() + self.assertNotIn('dedicatedLocalSsdProfile', content) + self.assertNotIn('diskCount', content) + finally: + cfg.CleanupYaml() + + def test_write_linux_config_yaml_lssd_includes_disk_profile(self): + cfg = swap_config.GkeSwapConfig(lssd=True, lssd_count=2) + path = cfg.WriteLinuxConfigYaml() + try: + with open(path) as f: + content = f.read() + self.assertIn('dedicatedLocalSsdProfile:', content) + self.assertIn('diskCount: 2', content) + finally: + cfg.CleanupYaml() + + def test_write_linux_config_yaml_returns_existing_file_path(self): + cfg = swap_config.GkeSwapConfig() + path = cfg.WriteLinuxConfigYaml() + try: + self.assertTrue(os.path.isfile(path)) + finally: + cfg.CleanupYaml() + + # ── CleanupYaml ─────────────────────────────────────────────────────────── + + def test_cleanup_yaml_removes_tempfile(self): + cfg = swap_config.GkeSwapConfig() + path = cfg.WriteLinuxConfigYaml() + self.assertTrue(os.path.exists(path)) + cfg.CleanupYaml() + self.assertFalse(os.path.exists(path)) + + def test_cleanup_yaml_noop_before_write(self): + cfg = swap_config.GkeSwapConfig() + cfg.CleanupYaml() # Must not raise. + + def test_cleanup_yaml_noop_on_second_call(self): + cfg = swap_config.GkeSwapConfig() + cfg.WriteLinuxConfigYaml() + cfg.CleanupYaml() + cfg.CleanupYaml() # Second call must not raise. + + # ── ValidHyperdiskThroughput ────────────────────────────────────────────── + + def test_valid_hyperdisk_throughput_no_clamp_needed(self): + # min_throughput = ceil(160000 / 256) = 625; 2400 > 625 → unchanged. + cfg = swap_config.GkeSwapConfig( + boot_disk_iops=160000, boot_disk_throughput=2400 + ) + self.assertEqual(cfg.ValidHyperdiskThroughput(), 2400) + + def test_valid_hyperdisk_throughput_clamps_up(self): + # min_throughput = ceil(160000 / 256) = 625; 100 < 625 → clamp to 625. 
+ cfg = swap_config.GkeSwapConfig( + boot_disk_iops=160000, boot_disk_throughput=100 + ) + self.assertEqual(cfg.ValidHyperdiskThroughput(), 625) + + def test_valid_hyperdisk_throughput_no_iops_returns_throughput(self): + # iops=0 means no constraint → return throughput unchanged. + cfg = swap_config.GkeSwapConfig(boot_disk_iops=0, boot_disk_throughput=500) + self.assertEqual(cfg.ValidHyperdiskThroughput(), 500) + + def test_valid_hyperdisk_throughput_both_zero_returns_zero(self): + cfg = swap_config.GkeSwapConfig(boot_disk_iops=0, boot_disk_throughput=0) + self.assertEqual(cfg.ValidHyperdiskThroughput(), 0) + + def test_valid_hyperdisk_throughput_exact_minimum_no_clamp(self): + # iops=256, throughput=1 → min=1; exactly at boundary → unchanged. + cfg = swap_config.GkeSwapConfig(boot_disk_iops=256, boot_disk_throughput=1) + self.assertEqual(cfg.ValidHyperdiskThroughput(), 1) + + +class EksSwapConfigTest(pkb_common_test_case.PkbCommonTestCase): + """Tests for EksSwapConfig: nodeadm YAML output and from_spec mapping.""" + + def _make_spec(self, **kwargs): + spec = mock.Mock() + spec.swappiness = kwargs.get('swappiness', 100) + spec.min_free_kbytes = kwargs.get('min_free_kbytes', 67584) + spec.watermark_scale_factor = kwargs.get('watermark_scale_factor', 500) + return spec + + # ── from_spec ───────────────────────────────────────────────────────────── + + def test_from_spec_maps_sysctl_attrs(self): + spec = self._make_spec( + swappiness=60, min_free_kbytes=400, watermark_scale_factor=200 + ) + cfg = swap_config.EksSwapConfig.from_spec(spec) + self.assertEqual(cfg.swappiness, 60) + self.assertEqual(cfg.min_free_kbytes, 400) + self.assertEqual(cfg.watermark_scale_factor, 200) + + def test_from_spec_eks_specific_attrs_use_defaults(self): + # from_spec does not accept memory_swap_behavior / fail_swap_on from spec. 
+ cfg = swap_config.EksSwapConfig.from_spec(self._make_spec()) + self.assertEqual(cfg.memory_swap_behavior, 'LimitedSwap') + self.assertFalse(cfg.fail_swap_on) + + # ── GetNodeadmConfig ────────────────────────────────────────────────────── + + def test_get_nodeadm_config_api_version(self): + cfg = swap_config.EksSwapConfig() + self.assertIn('apiVersion: node.eks.aws/v1alpha1', cfg.GetNodeadmConfig()) + + def test_get_nodeadm_config_memory_swap_behavior(self): + cfg = swap_config.EksSwapConfig() + self.assertIn('memorySwapBehavior: LimitedSwap', cfg.GetNodeadmConfig()) + + def test_get_nodeadm_config_fail_swap_on_false(self): + cfg = swap_config.EksSwapConfig(fail_swap_on=False) + self.assertIn('failSwapOn: false', cfg.GetNodeadmConfig()) + + def test_get_nodeadm_config_sysctl_keys_present(self): + cfg = swap_config.EksSwapConfig() + output = cfg.GetNodeadmConfig() + self.assertIn('vm.swappiness:', output) + self.assertIn('vm.min_free_kbytes:', output) + self.assertIn('vm.watermark_scale_factor:', output) + + def test_get_nodeadm_config_reflects_custom_sysctl_values(self): + cfg = swap_config.EksSwapConfig( + swappiness=60, min_free_kbytes=400, watermark_scale_factor=200 + ) + output = cfg.GetNodeadmConfig() + self.assertIn('vm.swappiness: 60', output) + self.assertIn('vm.min_free_kbytes: 400', output) + self.assertIn('vm.watermark_scale_factor: 200', output) + + # ── _Create stub ────────────────────────────────────────────────────────── + + def test_create_logs_deferred_warning(self): + cfg = swap_config.EksSwapConfig() + with self.assertLogs(level='WARNING') as log_ctx: + cfg._Create() + combined = ' '.join(log_ctx.output).lower() + self.assertTrue( + 'stub' in combined or 'deferred' in combined, + msg=f'Expected "stub" or "deferred" in log output: {log_ctx.output}', + ) + + +if __name__ == '__main__': + unittest.m \ No newline at end of file