Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 385 additions & 0 deletions perfkitbenchmarker/providers/azure/azure_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import base64
import json
import time
import logging
from typing import Any, List

Expand Down Expand Up @@ -573,6 +574,390 @@ def ApplyFusePVC(self, pvc_name: str) -> None:
)
logging.info('Successfully applied BlobFuse PVC.')

def _BuildNodePoolAddCmd(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_flags: list[str],
    no_wait: bool = False,
) -> list[str]:
  """Assembles the `az aks nodepool add` argument list.

  Both CreateNodePool and CreateNodePoolAsync build their command here, so
  the fixed part of the argument list lives in exactly one place.

  Args:
    nodepool_config: Configuration of the pool to add; its name is used for
      the Azure pool name and for the pkb_nodepool label.
    node_flags: Extra flags appended verbatim at the end of the command
      (callers pass the output of _GetNodeFlags, optionally with a version
      override).
    no_wait: When True, '--no-wait' is inserted ahead of node_flags.

  Returns:
    The full az CLI command as a list of strings.
  """
  pool_name = nodepool_config.name
  verb = [azure.AZURE_PATH, 'aks', 'nodepool', 'add']
  target = [
      '--cluster-name',
      self.name,
      '--name',
      _AzureNodePoolName(pool_name),
      '--labels',
      f'pkb_nodepool={pool_name}',
  ]
  wait_flag = ['--no-wait'] if no_wait else []
  return verb + target + wait_flag + node_flags

def CreateNodePool(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_version: str | None = None,
) -> None:
  """Adds one named node pool to the cluster (command issued without --no-wait).

  Args:
    nodepool_config: Configuration of the pool to add.
    node_version: Optional Kubernetes version for the pool. When truthy, it
      replaces every --kubernetes-version entry returned by _GetNodeFlags.

  Raises:
    errors.Resource.CreationError: If the az command exits non-zero; the
      exception message is the command's stderr.
  """
  version_flag = '--kubernetes-version'
  flags = self._GetNodeFlags(nodepool_config)
  if node_version:
    # Drop each existing flag/value pair so the caller-supplied version is
    # the only one left on the command line.
    while version_flag in flags:
      pos = flags.index(version_flag)
      del flags[pos]  # The flag itself.
      del flags[pos]  # Its value, which has shifted into the same slot.
    flags.extend([version_flag, node_version])
  add_cmd = self._BuildNodePoolAddCmd(nodepool_config, flags)
  _, err_text, exit_code = vm_util.IssueCommand(
      add_cmd, timeout=1800, raise_on_failure=False
  )
  if exit_code:
    raise errors.Resource.CreationError(err_text)

def DeleteNodePool(self, name: str) -> None:
  """Removes the node pool called `name` from this cluster.

  Args:
    name: PKB node pool name; it is converted with _AzureNodePoolName before
      being passed to the CLI.
  """
  verb = [azure.AZURE_PATH, 'aks', 'nodepool', 'delete']
  target = ['--cluster-name', self.name, '--name', _AzureNodePoolName(name)]
  delete_cmd = verb + target + self.resource_group.args
  # NOTE(review): the delete is dispatched through _RunCreateClusterCmd, whose
  # body is not visible here — confirm its error handling suits deletes.
  self._RunCreateClusterCmd(delete_cmd)

def UpgradeNodePool(self, name: str, target_version: str) -> None:
  """Runs `az aks nodepool upgrade` for one pool (no --no-wait).

  Args:
    name: PKB node pool name; converted with _AzureNodePoolName for the CLI.
    target_version: Value passed to --kubernetes-version.
  """
  verb = [azure.AZURE_PATH, 'aks', 'nodepool', 'upgrade']
  target = ['--cluster-name', self.name, '--name', _AzureNodePoolName(name)]
  version = ['--kubernetes-version', target_version]
  upgrade_cmd = verb + target + version + self.resource_group.args
  # Failure handling is left to IssueCommand's defaults.
  vm_util.IssueCommand(upgrade_cmd, timeout=1800)

def UpdateCluster(self) -> None:
  """Issues a cluster-scoped `az aks update` by rewriting a timestamp tag.

  The command targets the cluster itself rather than a node pool. The
  k8s-mgmt-ts tag is set to the current unix time in whole seconds, so the
  value differs between calls made in different seconds.
  """
  verb = [azure.AZURE_PATH, 'aks', 'update', '--name', self.name]
  stamp = int(time.time())
  tag_args = ['--tags', f'k8s-mgmt-ts={stamp}']
  update_cmd = verb + tag_args + self.resource_group.args
  # Failure handling is left to IssueCommand's defaults.
  vm_util.IssueCommand(update_cmd, timeout=1800)

# ---- Async variants (return opaque handles) -------------------------------

def CreateNodePoolAsync(
    self,
    nodepool_config: container.BaseNodePoolConfig,
    node_version: str | None = None,
) -> str:
  """Starts a node pool create with --no-wait and returns its wait handle.

  Args:
    nodepool_config: Configuration of the pool to add.
    node_version: Optional Kubernetes version for the pool. When truthy, it
      replaces every --kubernetes-version entry returned by _GetNodeFlags.

  Returns:
    The handle 'np_succeeded:<azure pool name>', the form WaitForOperation
    accepts for "wait until provisioningState is Succeeded".

  Raises:
    errors.Resource.CreationError: If the az command fails with stderr that
      contains none of the transient error markers.
    errors.Resource.RetryableCreationError: Raised for transient failures;
      vm_util.Retry is configured to retry these (presumably re-raising once
      its retry budget is spent — confirm against vm_util.Retry).
  """
  version_flag = '--kubernetes-version'
  flags = self._GetNodeFlags(nodepool_config)
  if node_version:
    # Drop each existing flag/value pair so the caller-supplied version is
    # the only one left on the command line.
    while version_flag in flags:
      pos = flags.index(version_flag)
      del flags[pos]  # The flag itself.
      del flags[pos]  # Its value, which has shifted into the same slot.
    flags.extend([version_flag, node_version])
  add_cmd = self._BuildNodePoolAddCmd(nodepool_config, flags, no_wait=True)

  # Error codes treated as "cluster is temporarily busy or at its pool
  # limit": the request is resubmitted instead of failing immediately.
  transient_markers = (
      'OperationNotAllowed',
      'ConflictingOperationInProgress',
      'MaxAgentPoolCountReached',
  )

  @vm_util.Retry(
      retryable_exceptions=(errors.Resource.RetryableCreationError,),
      max_retries=5,
      poll_interval=30,
      log_errors=True,
  )
  def IssueWithRetry():
    """Submits the create request once, classifying any failure."""
    # 600 s: per the original author's note, AKS can take more than 300 s to
    # accept a --no-wait request under concurrent load.
    _, err_text, exit_code = vm_util.IssueCommand(
        add_cmd, timeout=600, raise_on_failure=False
    )
    if not exit_code:
      return
    if any(marker in err_text for marker in transient_markers):
      raise errors.Resource.RetryableCreationError(err_text)
    raise errors.Resource.CreationError(err_text)

  IssueWithRetry()
  return f'np_succeeded:{_AzureNodePoolName(nodepool_config.name)}'

def UpgradeNodePoolAsync(self, name: str, target_version: str) -> str:
  """Starts a node pool upgrade with --no-wait and returns its wait handle.

  Args:
    name: PKB node pool name; converted with _AzureNodePoolName for the CLI.
    target_version: Value passed to --kubernetes-version.

  Returns:
    The handle 'np_succeeded:<azure pool name>'.

  Raises:
    errors.Resource.CreationError: If the az command exits non-zero; the
      message is the command's stderr.
  """
  azure_name = _AzureNodePoolName(name)
  verb = [azure.AZURE_PATH, 'aks', 'nodepool', 'upgrade']
  target = ['--cluster-name', self.name, '--name', azure_name]
  options = ['--kubernetes-version', target_version, '--no-wait']
  upgrade_cmd = verb + target + options + self.resource_group.args
  # 600 s: per the original author's note, Azure can take more than 300 s to
  # accept a --no-wait upgrade under concurrent load.
  _, err_text, exit_code = vm_util.IssueCommand(
      upgrade_cmd, timeout=600, raise_on_failure=False
  )
  if exit_code:
    raise errors.Resource.CreationError(err_text)
  return f'np_succeeded:{azure_name}'

def DeleteNodePoolAsync(self, name: str) -> str:
  """Starts a node pool delete with --no-wait and returns its wait handle.

  A failure whose stderr says the pool was not found is treated as success:
  a pool that is already absent is the desired end state of a delete.

  Args:
    name: PKB node pool name; converted with _AzureNodePoolName for the CLI.

  Returns:
    The handle 'np_gone:<azure pool name>'.

  Raises:
    errors.Resource.CreationError: If the az command exits non-zero for any
      reason other than the pool not being found.
  """
  azure_name = _AzureNodePoolName(name)
  handle = f'np_gone:{azure_name}'
  verb = [azure.AZURE_PATH, 'aks', 'nodepool', 'delete']
  target = ['--cluster-name', self.name, '--name', azure_name, '--no-wait']
  delete_cmd = verb + target + self.resource_group.args
  _, err_text, exit_code = vm_util.IssueCommand(
      delete_cmd, timeout=600, raise_on_failure=False
  )
  if not exit_code:
    return handle

  # Two spellings are checked: the 'NotFound' error-code form and a
  # case-insensitive "not found" phrase.
  already_gone = 'NotFound' in err_text or 'not found' in err_text.lower()
  if not already_gone:
    raise errors.Resource.CreationError(err_text)
  logging.info(
      '[AKS] DeleteNodePoolAsync: %s already gone — treating as success',
      azure_name,
  )
  return handle

def UpdateClusterAsync(self) -> str:
  """Starts a cluster tag update with --no-wait and returns its wait handle.

  The k8s-mgmt-ts tag is set to the current unix time in whole seconds. The
  update is cluster-scoped rather than node-pool-scoped.

  Returns:
    The literal handle 'cluster_succeeded'.

  Raises:
    errors.Resource.CreationError: If the az command exits non-zero; the
      message is the command's stderr.
  """
  verb = [azure.AZURE_PATH, 'aks', 'update', '--name', self.name]
  stamp = int(time.time())
  options = ['--tags', f'k8s-mgmt-ts={stamp}', '--no-wait']
  update_cmd = verb + options + self.resource_group.args
  _, err_text, exit_code = vm_util.IssueCommand(
      update_cmd, timeout=300, raise_on_failure=False
  )
  if exit_code:
    raise errors.Resource.CreationError(err_text)
  return 'cluster_succeeded'

def ResolveNodePoolVersions(self) -> tuple[str, str]:
  """Returns (initial, target) AKS node pool versions as 'major.minor'.

  The versions are derived from self.cluster_version (falling back to
  self.k8s_version) rather than by querying the cluster:

    target  = the cluster's own major.minor (N)
    initial = the adjacent minor below it (N-1)

  A leading 'v' and any patch component are ignored, so 'v1.30.4' yields
  ('1.29', '1.30').

  Returns:
    A (initial, target) tuple of 'major.minor' strings.

  Raises:
    ValueError: If no version is configured, if the version string has no
      numeric major and minor components, or if the minor is 0 (there is no
      lower minor within the same major, and 'X.-1' is not a valid version).
  """
  cluster_ver = self.cluster_version or self.k8s_version
  if not cluster_ver:
    raise ValueError(
        'Cannot resolve node pool versions: cluster version is not set.'
    )
  parts = cluster_ver.strip().lstrip('v').split('.')
  try:
    major, minor = int(parts[0]), int(parts[1])
  except (IndexError, ValueError) as e:
    raise ValueError(
        f'Cannot parse major.minor from cluster version {cluster_ver!r}'
    ) from e
  if minor < 1:
    # Without this guard the initial version would be e.g. '2.-1'.
    raise ValueError(
        f'Cluster version {cluster_ver!r} has no lower adjacent minor version'
    )
  target = f'{major}.{minor}'
  initial = f'{major}.{minor - 1}'
  logging.info(
      '[AKS] ResolveNodePoolVersions: cluster=%s initial=%s target=%s',
      cluster_ver,
      initial,
      target,
  )
  return initial, target

def _WaitForProvisioningState(
    self,
    cmd: list[str],
    resource_desc: str,
    not_found_is_success: bool = False,
    retryable_exception_type=errors.Resource.RetryableCreationError,
) -> None:
  """Polls an AKS resource until it reaches a terminal provisioning state.

  Runs cmd on each poll (120 s per-call timeout, 5 s between polls, 3600 s
  overall) and interprets the result as follows:

    stdout == 'Succeeded'        -> return normally.
    stdout == 'Failed'/'Canceled' -> raise errors.Resource.CreationError.
      Azure documents both as terminal states, so polling further could only
      end in the overall timeout.
    non-zero exit, not found     -> return if not_found_is_success, else
      raise errors.Resource.CreationError.
    non-zero exit, other error   -> raise retryable_exception_type (retried).
    any other stdout             -> raise retryable_exception_type (retried).

  Args:
    cmd: The az CLI command to run on each poll. For the state comparison to
      match, it should print only the provisioning state (e.g. --query
      provisioningState --output tsv).
    resource_desc: Human-readable label for error messages.
    not_found_is_success: True when the desired end state is "gone" (delete).
    retryable_exception_type: Exception class raised for in-progress states
      and retried by vm_util.Retry; defaults to RetryableCreationError, pass
      RetryableDeletionError for delete waits.

  Raises:
    errors.Resource.CreationError: On a terminal failure state, or when the
      resource is not found and not_found_is_success is False.
  """
  terminal_failure_states = ('Failed', 'Canceled')

  @vm_util.Retry(
      poll_interval=5,
      fuzz=0,
      timeout=3600,
      retryable_exceptions=(retryable_exception_type,),
  )
  def _Poll():
    out, err, rc = vm_util.IssueCommand(
        cmd, raise_on_failure=False, timeout=120
    )
    if rc:
      err_text = err or ''
      is_not_found = 'NotFound' in err_text or 'not found' in err_text.lower()
      if is_not_found and not_found_is_success:
        return
      if is_not_found:
        raise errors.Resource.CreationError(
            f'{resource_desc} not found: {err}'
        )
      # Any other CLI failure is assumed transient and polled again.
      raise retryable_exception_type(err)
    status = (out or '').strip()
    if status == 'Succeeded':
      return
    if status in terminal_failure_states:
      raise errors.Resource.CreationError(
          f'{resource_desc} ended in {status}'
      )
    raise retryable_exception_type(f'{resource_desc} state={status}')

  _Poll()

def WaitForOperation(self, op_handle: str) -> None:
  """Blocks until the operation behind op_handle reaches its end state.

  Args:
    op_handle: Opaque string returned by the *Async methods. One of:

      'np_succeeded:<azure-nodepool-name>' (e.g. 'np_succeeded:pkbma001')
        Wait for the node pool's provisioningState to be Succeeded.

      'np_gone:<azure-nodepool-name>' (e.g. 'np_gone:pkbma001')
        Wait for the node pool to disappear (not found counts as success).

      'cluster_succeeded'
        Wait for the cluster's provisioningState to be Succeeded.

  Raises:
    ValueError: If the part of op_handle before ':' is not a known kind.
  """
  kind, _, name = op_handle.partition(':')
  if kind not in ('np_succeeded', 'np_gone', 'cluster_succeeded'):
    raise ValueError(f'Unknown AKS op handle: {op_handle!r}')

  # Makes the show command print only the provisioning state.
  state_query = ['--query', 'provisioningState', '--output', 'tsv']

  if kind == 'cluster_succeeded':
    cluster_show = [azure.AZURE_PATH, 'aks', 'show', '--name', self.name]
    self._WaitForProvisioningState(
        cmd=cluster_show + state_query + self.resource_group.args,
        resource_desc='cluster',
    )
    return

  pool_show = [
      azure.AZURE_PATH,
      'aks',
      'nodepool',
      'show',
      '--cluster-name',
      self.name,
      '--name',
      name,
  ]
  pool_desc = f'nodepool {name}'

  if kind == 'np_succeeded':
    self._WaitForProvisioningState(
        cmd=pool_show + state_query + self.resource_group.args,
        resource_desc=pool_desc,
    )
    return

  # kind == 'np_gone'. No state query is added here, so the poll's stdout is
  # presumably the full resource description and never a bare state; the wait
  # therefore ends only when the show command reports not found.
  self._WaitForProvisioningState(
      cmd=pool_show + self.resource_group.args,
      resource_desc=pool_desc,
      not_found_is_success=True,
      retryable_exception_type=errors.Resource.RetryableDeletionError,
  )


class AksAutomaticCluster(AksCluster):
"""Class representing an AKS Automatic cluster, which has managed node pools.
Expand Down
Loading