From 9dcd72088ead529969e88dccfef85b9080ad8b46 Mon Sep 17 00:00:00 2001
From: Dheeraj Turaga
Date: Sat, 25 Apr 2026 09:29:05 -0500
Subject: [PATCH] Fix Edge worker reporting crashed tasks as SUCCESS
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

_run_job_via_supervisor returned 1 on a caught exception, but
multiprocessing.Process ignores the target's return value — the exit
code was always 0, so Job.is_success was True and crashed tasks were
reported to the central server as SUCCESS despite the "Task execution
failed" traceback in the log.

Use sys.exit(1) so the subprocess exits with code 1 and
fetch_and_run_job takes the FAILED branch.
---
 providers/edge3/src/airflow/providers/edge3/cli/worker.py | 2 +-
 providers/edge3/tests/unit/edge3/cli/test_worker.py | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index f5396127219eb..80d120a99441f 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -291,7 +291,7 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
         except Exception as e:
             logger.exception("Task execution failed")
             results_queue.put(e)
-            return 1
+            sys.exit(1)
 
     def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
         # Improvement: Use frozen GC to prevent child process from copying unnecessary memory
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 62565cdf98cac..d62380352e91b 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -231,9 +231,10 @@ async def test_supervise_launch_fail(
         mock_supervise.side_effect = Exception("Supervise failed")
         edge_job = worker_with_job.jobs.pop().edge_job
         q = mock.MagicMock()
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+        with pytest.raises(SystemExit) as exc_info:
+            worker_with_job._run_job_via_supervisor(edge_job.command, q)
 
-        assert result == 1
+        assert exc_info.value.code == 1
         q.put.assert_called_once()
 
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")