diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 1a044fdde2e29..3ecb6396f2c99 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -32,7 +32,7 @@ from http import HTTPStatus from multiprocessing import Process from pathlib import Path -from typing import IO, TYPE_CHECKING +from typing import IO, TYPE_CHECKING, NoReturn import anyio from aiofiles import open as aio_open @@ -422,7 +422,7 @@ def _get_state(self) -> EdgeWorkerState: return EdgeWorkerState.MAINTENANCE_MODE return EdgeWorkerState.IDLE - def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Path) -> int: + def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Path) -> NoReturn: """Run a task by calling the supervisor directly (executes inside a forked child process).""" _reset_parent_signal_state() @@ -435,7 +435,7 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa if AIRFLOW_V_3_3_PLUS: from airflow.executors.base_executor import BaseExecutor - BaseExecutor.run_workload( + exit_code = BaseExecutor.run_workload( workload=workload, server=self._execution_api_server_url, ) @@ -448,7 +448,7 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa f"dag_id={ti.dag_id} task_id={ti.task_id} run_id={ti.run_id} map_index={ti.map_index} " f"try_number={ti.try_number}" ) - supervise( + exit_code = supervise( # This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this. # Same like in airflow/executors/local_executor.py:_execute_workload() ti=ti, # type: ignore[arg-type] @@ -458,12 +458,17 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa server=self._execution_api_server_url, log_path=workload.log_path, ) - return 0 except Exception: logger.exception("Task execution failed") with suppress(Exception): error_file_path.write_text(traceback.format_exc()) - return 1 + exit_code = 1 + + # Exit explicitly so the real exit code propagates to the parent process. + # the child would always exit 0 without this, so a failed supervisor + # (non-zero ``exit_code``, e.g. when ``run_workload`` reports a task failure without raising) + # would be misreported as success by the parent's ``Job.is_success`` check. + sys.exit(exit_code) def _launch_job_subprocess(self, workload: ExecuteTypeBody) -> tuple[subprocess.Popen, Path]: """Launch workload via a fresh Python interpreter (subprocess.Popen).""" @@ -700,6 +705,7 @@ async def fetch_and_run_job(self) -> None: break await self._push_logs_in_chunks(job) + logger.info("The code is changed: %s", job.edge_job.identifier) if job.is_success: logger.info("Job completed: %s", job.edge_job.identifier) await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS) diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 23a247d8a0857..bbbf2690faf4a 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -363,12 +363,15 @@ async def test_supervise_launch_pre_3_3( worker_with_job: EdgeWorker, tmp_path: Path, ): + mock_supervise.return_value = 0 worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution" edge_job = worker_with_job.jobs.pop().edge_job error_file_path = tmp_path / "fork-error.log" - result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) + with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info: + worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) - assert result == 0 + # The child process must exit 0 on success so the parent's Job.is_success check passes. + assert exc_info.value.code == 0 assert not error_file_path.exists() # no error written on success @patch("airflow.executors.base_executor.BaseExecutor.run_workload") @@ -382,12 +385,15 @@ async def test_supervise_launch( worker_with_job: EdgeWorker, tmp_path: Path, ): + mock_run_workload.return_value = 0 worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution" edge_job = worker_with_job.jobs.pop().edge_job error_file_path = tmp_path / "fork-error.log" - result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) + with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info: + worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) - assert result == 0 + # The child process must exit 0 on success so the parent's Job.is_success check passes. + assert exc_info.value.code == 0 assert not error_file_path.exists() # no error written on success @patch("airflow.sdk.execution_time.supervisor.supervise") @@ -403,9 +409,10 @@ async def test_supervise_launch_fail_pre_3_3( worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution" edge_job = worker_with_job.jobs.pop().edge_job error_file_path = tmp_path / "fork-error.log" - result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) + with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info: + worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) - assert result == 1 + assert exc_info.value.code == 1 assert error_file_path.exists() assert "Supervise failed" in error_file_path.read_text() @@ -424,12 +431,52 @@ async def test_supervise_launch_fail( worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution" edge_job = worker_with_job.jobs.pop().edge_job error_file_path = tmp_path / "fork-error.log" - result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) + with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info: + worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path) - assert result == 1 + assert exc_info.value.code == 1 assert error_file_path.exists() assert "Supervise failed" in error_file_path.read_text() + @patch("airflow.executors.base_executor.BaseExecutor.run_workload") + @pytest.mark.skipif(not hasattr(os, "fork"), reason="Requires the fork start method") + @pytest.mark.skipif( + not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where BaseExecutor.run_workload is used" + ) + def test_fork_child_exits_nonzero_when_supervisor_raises( + self, + mock_run_workload, + worker_with_job: EdgeWorker, + tmp_path: Path, + ): + """ + A supervisor exception must make the forked child terminate with a non-zero exit code so the + parent's Job.is_success reports the failure. + """ + import multiprocessing + + mock_run_workload.side_effect = RuntimeError("supervisor crashed") + worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution" + edge_job = worker_with_job.jobs.pop().edge_job + error_file_path = tmp_path / "fork-error.log" + + # Use the fork context explicitly so the child inherits the patched run_workload in memory. + process = multiprocessing.get_context("fork").Process( + target=worker_with_job._run_job_via_supervisor, + kwargs={"workload": edge_job.command, "error_file_path": error_file_path}, + ) + process.start() + process.join(timeout=30) + + # With ``return 1`` this would have been 0; ``sys.exit(1)`` propagates the non-zero code. + assert process.exitcode == 1 + # And the parent's success check therefore reports failure instead of a false success. + job = Job(edge_job=edge_job, process=process, logfile=tmp_path / "file.log") # type: ignore[arg-type] + assert job.is_success is False + # Confirm the non-zero exit came from the supervisor failure path (not an unrelated early error). + assert error_file_path.exists() + assert "supervisor crashed" in error_file_path.read_text() + @patch("airflow.providers.edge3.cli.worker.jobs_fetch") @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job") @pytest.mark.asyncio @@ -1148,14 +1195,15 @@ def test_run_job_via_supervisor_resets_signals_before_supervise(self, tmp_path): mock.patch("os.setpgrp", side_effect=lambda: order("setpgrp")), mock.patch( "airflow.executors.base_executor.BaseExecutor.run_workload", - side_effect=lambda **_: order("supervise"), + side_effect=lambda **_: (order("supervise"), 0)[1], ), ): - rc = worker._run_job_via_supervisor( - workload=self._make_workload(), - error_file_path=tmp_path / "fork-error.log", - ) - assert rc == 0 + with pytest.raises(SystemExit) as exc_info: + worker._run_job_via_supervisor( + workload=self._make_workload(), + error_file_path=tmp_path / "fork-error.log", + ) + assert exc_info.value.code == 0 assert [c.args[0] for c in order.call_args_list] == ["reset", "setpgrp", "supervise"] def test_shutdown_handler_is_idempotent(self, worker_with_one_job):