diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index a98751de590a1..a03e77dc4d45b 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -114,9 +114,9 @@ from inspect import Parameter from kubernetes.client import models as k8s # noqa: TC004 + from kubernetes.client.api_client import ApiClient # noqa: TC004 from airflow.models.expandinput import SchedulerExpandInput - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator # noqa: TC004 from airflow.sdk import BaseOperatorLink from airflow.sdk.definitions._internal.node import DAGNode as SDKDAGNode from airflow.sdk.types import Operator as SdkOperator @@ -494,7 +494,7 @@ def serialize( and _has_kubernetes(attempt_import=True) and isinstance(var, k8s.V1Pod) ): - json_pod = PodGenerator.serialize_pod(var) + json_pod = ApiClient().sanitize_for_serialization(var) return cls._encode(json_pod, type_=DAT.POD) elif isinstance(var, OutletEventAccessors): return cls._encode( @@ -650,9 +650,9 @@ def deserialize(cls, encoded_var: Any) -> Any: if not _has_kubernetes(attempt_import=True): raise RuntimeError( "Cannot deserialize POD objects without kubernetes libraries. " - "Please install the cncf.kubernetes provider." + "Please install the `kubernetes` package." ) - pod = PodGenerator.deserialize_model_dict(var) + pod = ApiClient()._ApiClient__deserialize_model(var, k8s.V1Pod) return pod elif type_ == DAT.TIMEDELTA: return datetime.timedelta(seconds=var) @@ -2185,11 +2185,10 @@ def _has_kubernetes(attempt_import: bool = False) -> bool: # Loading kube modules is expensive, so delay it until the last moment try: from kubernetes.client import models as k8s - - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator + from kubernetes.client.api_client import ApiClient globals()["k8s"] = k8s - globals()["PodGenerator"] = PodGenerator + globals()["ApiClient"] = ApiClient return True except ImportError: return False diff --git a/airflow-core/src/airflow/utils/sqlalchemy.py b/airflow-core/src/airflow/utils/sqlalchemy.py index a767c65fad918..c62161fabccd4 100644 --- a/airflow-core/src/airflow/utils/sqlalchemy.py +++ b/airflow-core/src/airflow/utils/sqlalchemy.py @@ -251,11 +251,11 @@ def ensure_pod_is_valid_after_unpickling(pod: V1Pod) -> V1Pod | None: if not isinstance(pod, V1Pod): return None try: - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator + from kubernetes.client.api_client import ApiClient # now we actually reserialize / deserialize the pod pod_dict = sanitize_for_serialization(pod) - return PodGenerator.deserialize_model_dict(pod_dict) + return ApiClient()._ApiClient__deserialize_model(pod_dict, V1Pod) except Exception: return None diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index c6d1fe0be80e5..b3e34ca76fc65 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -1302,7 +1302,7 @@ def test_has_kubernetes_uses_existing_import(self): def test_serialize_v1pod_attempts_import_before_serializing(self, monkeypatch): """Regression test: V1Pod serialization must call _has_kubernetes(attempt_import=True).""" k8s = pytest.importorskip("kubernetes.client.models") - from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator + from kubernetes.client.api_client import ApiClient calls = [] @@ -1312,7 +1312,7 @@ def fake_has_kubernetes(*, attempt_import=False): monkeypatch.setattr(serialized_objects, "_has_kubernetes", fake_has_kubernetes) monkeypatch.setattr(serialized_objects, "k8s", k8s, raising=False) - monkeypatch.setattr(serialized_objects, "PodGenerator", PodGenerator, raising=False) + monkeypatch.setattr(serialized_objects, "ApiClient", ApiClient, raising=False) pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod")) result = BaseSerialization.serialize(pod) @@ -1321,6 +1321,37 @@ def fake_has_kubernetes(*, attempt_import=False): assert result.get(Encoding.TYPE) == DAT.POD, "V1Pod should have type DAT.POD" assert True in calls + def test_v1pod_serde_without_cncf_kubernetes_provider(self, monkeypatch): + """V1Pod ser/deser must work when the cncf.kubernetes provider is not installed. + + Regression test for the K8s executor ``pod_override`` getting stringified during DAG + serialization on deployments that install ``kubernetes`` but not + ``apache-airflow-providers-cncf-kubernetes``. + """ + k8s = pytest.importorskip("kubernetes.client.models") + + # Simulate the cncf.kubernetes provider being unimportable. Setting a module to ``None`` + # in ``sys.modules`` makes importing it raise ``ModuleNotFoundError``. We must block the + # exact module the old code imported (``...pod_generator``) and not just the parent + # package: if the submodule is already cached in ``sys.modules`` from an earlier test, + # ``from ...pod_generator import PodGenerator`` resolves the cached leaf without ever + # touching the parent, so blocking only the parent would not exercise the regression. + monkeypatch.setitem(sys.modules, "airflow.providers.cncf.kubernetes", None) + monkeypatch.setitem(sys.modules, "airflow.providers.cncf.kubernetes.pod_generator", None) + _has_kubernetes.cache_clear() + + pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod")) + encoded = BaseSerialization.serialize(pod) + + assert isinstance(encoded, dict), "V1Pod should serialize to a dict, not a string" + assert encoded[Encoding.TYPE] == DAT.POD + + decoded = BaseSerialization.deserialize(encoded) + assert isinstance(decoded, k8s.V1Pod) + assert decoded.metadata.name == "test-pod" + + _has_kubernetes.cache_clear() + @pytest.mark.db_test def test_serialized_dag_getitem_returns_task(dag_maker):