Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions airflow-core/src/airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@
from inspect import Parameter

from kubernetes.client import models as k8s # noqa: TC004
from kubernetes.client.api_client import ApiClient # noqa: TC004

from airflow.models.expandinput import SchedulerExpandInput
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator # noqa: TC004
from airflow.sdk import BaseOperatorLink
from airflow.sdk.definitions._internal.node import DAGNode as SDKDAGNode
from airflow.sdk.types import Operator as SdkOperator
Expand Down Expand Up @@ -494,7 +494,7 @@ def serialize(
and _has_kubernetes(attempt_import=True)
and isinstance(var, k8s.V1Pod)
):
json_pod = PodGenerator.serialize_pod(var)
json_pod = ApiClient().sanitize_for_serialization(var)
return cls._encode(json_pod, type_=DAT.POD)
elif isinstance(var, OutletEventAccessors):
return cls._encode(
Expand Down Expand Up @@ -650,9 +650,9 @@ def deserialize(cls, encoded_var: Any) -> Any:
if not _has_kubernetes(attempt_import=True):
raise RuntimeError(
"Cannot deserialize POD objects without kubernetes libraries. "
"Please install the cncf.kubernetes provider."
"Please install the `kubernetes` package."
)
pod = PodGenerator.deserialize_model_dict(var)
pod = ApiClient()._ApiClient__deserialize_model(var, k8s.V1Pod)
return pod
elif type_ == DAT.TIMEDELTA:
return datetime.timedelta(seconds=var)
Expand Down Expand Up @@ -2185,11 +2185,10 @@ def _has_kubernetes(attempt_import: bool = False) -> bool:
# Loading kube modules is expensive, so delay it until the last moment
try:
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from kubernetes.client.api_client import ApiClient

globals()["k8s"] = k8s
globals()["PodGenerator"] = PodGenerator
globals()["ApiClient"] = ApiClient
return True
except ImportError:
return False
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None:
if not isinstance(pod, V1Pod):
return None
try:
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from kubernetes.client.api_client import ApiClient

# now we actually reserialize / deserialize the pod
pod_dict = sanitize_for_serialization(pod)
return PodGenerator.deserialize_model_dict(pod_dict)
return ApiClient()._ApiClient__deserialize_model(pod_dict, V1Pod)
except Exception:
return None

Expand Down
35 changes: 33 additions & 2 deletions airflow-core/tests/unit/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ def test_has_kubernetes_uses_existing_import(self):
def test_serialize_v1pod_attempts_import_before_serializing(self, monkeypatch):
"""Regression test: V1Pod serialization must call _has_kubernetes(attempt_import=True)."""
k8s = pytest.importorskip("kubernetes.client.models")
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from kubernetes.client.api_client import ApiClient

calls = []

Expand All @@ -1312,7 +1312,7 @@ def fake_has_kubernetes(*, attempt_import=False):

monkeypatch.setattr(serialized_objects, "_has_kubernetes", fake_has_kubernetes)
monkeypatch.setattr(serialized_objects, "k8s", k8s, raising=False)
monkeypatch.setattr(serialized_objects, "PodGenerator", PodGenerator, raising=False)
monkeypatch.setattr(serialized_objects, "ApiClient", ApiClient, raising=False)

pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
result = BaseSerialization.serialize(pod)
Expand All @@ -1321,6 +1321,37 @@ def fake_has_kubernetes(*, attempt_import=False):
assert result.get(Encoding.TYPE) == DAT.POD, "V1Pod should have type DAT.POD"
assert True in calls

def test_v1pod_serde_without_cncf_kubernetes_provider(self, monkeypatch):
"""V1Pod ser/deser must work when the cncf.kubernetes provider is not installed.

Regression test for the K8s executor ``pod_override`` getting stringified during DAG
serialization on deployments that install ``kubernetes`` but not
``apache-airflow-providers-cncf-kubernetes``.
"""
k8s = pytest.importorskip("kubernetes.client.models")

# Simulate the cncf.kubernetes provider being unimportable. Setting a module to ``None``
# in ``sys.modules`` makes importing it raise ``ModuleNotFoundError``. We must block the
# exact module the old code imported (``...pod_generator``) and not just the parent
# package: if the submodule is already cached in ``sys.modules`` from an earlier test,
# ``from ...pod_generator import PodGenerator`` resolves the cached leaf without ever
# touching the parent, so blocking only the parent would not exercise the regression.
monkeypatch.setitem(sys.modules, "airflow.providers.cncf.kubernetes", None)
monkeypatch.setitem(sys.modules, "airflow.providers.cncf.kubernetes.pod_generator", None)
_has_kubernetes.cache_clear()

pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
encoded = BaseSerialization.serialize(pod)

assert isinstance(encoded, dict), "V1Pod should serialize to a dict, not a string"
assert encoded[Encoding.TYPE] == DAT.POD

decoded = BaseSerialization.deserialize(encoded)
assert isinstance(decoded, k8s.V1Pod)
assert decoded.metadata.name == "test-pod"

_has_kubernetes.cache_clear()


@pytest.mark.db_test
def test_serialized_dag_getitem_returns_task(dag_maker):
Expand Down
Loading