Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
from typing import IO, TYPE_CHECKING
from typing import IO, TYPE_CHECKING, NoReturn

import anyio
from aiofiles import open as aio_open
Expand Down Expand Up @@ -422,7 +422,7 @@ def _get_state(self) -> EdgeWorkerState:
return EdgeWorkerState.MAINTENANCE_MODE
return EdgeWorkerState.IDLE

def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Path) -> int:
def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Path) -> NoReturn:
"""Run a task by calling the supervisor directly (executes inside a forked child process)."""
_reset_parent_signal_state()

Expand All @@ -435,7 +435,7 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa
if AIRFLOW_V_3_3_PLUS:
from airflow.executors.base_executor import BaseExecutor

BaseExecutor.run_workload(
exit_code = BaseExecutor.run_workload(
workload=workload,
server=self._execution_api_server_url,
)
Expand All @@ -448,7 +448,7 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa
f"dag_id={ti.dag_id} task_id={ti.task_id} run_id={ti.run_id} map_index={ti.map_index} "
f"try_number={ti.try_number}"
)
supervise(
exit_code = supervise(
# This is the "wrong" ti type, but it duck types the same. TODO: Create a protocol for this.
# Same like in airflow/executors/local_executor.py:_execute_workload()
ti=ti, # type: ignore[arg-type]
Expand All @@ -458,12 +458,17 @@ def _run_job_via_supervisor(self, workload: ExecuteTypeBody, error_file_path: Pa
server=self._execution_api_server_url,
log_path=workload.log_path,
)
return 0
except Exception:
logger.exception("Task execution failed")
with suppress(Exception):
error_file_path.write_text(traceback.format_exc())
return 1
exit_code = 1

# Exit explicitly so the real exit code propagates to the parent process.
# the child would always exit 0 without this, so a failed supervisor
# (non-zero ``exit_code``, e.g. when ``run_workload`` reports a task failure without raising)
# would be misreported as success by the parent's ``Job.is_success`` check.
sys.exit(exit_code)

def _launch_job_subprocess(self, workload: ExecuteTypeBody) -> tuple[subprocess.Popen, Path]:
"""Launch workload via a fresh Python interpreter (subprocess.Popen)."""
Expand Down Expand Up @@ -700,6 +705,7 @@ async def fetch_and_run_job(self) -> None:
break
await self._push_logs_in_chunks(job)

logger.info("The code is changed: %s", job.edge_job.identifier)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
Expand Down
76 changes: 62 additions & 14 deletions providers/edge3/tests/unit/edge3/cli/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,15 @@ async def test_supervise_launch_pre_3_3(
worker_with_job: EdgeWorker,
tmp_path: Path,
):
mock_supervise.return_value = 0
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)

assert result == 0
# The child process must exit 0 on success so the parent's Job.is_success check passes.
assert exc_info.value.code == 0
assert not error_file_path.exists() # no error written on success

@patch("airflow.executors.base_executor.BaseExecutor.run_workload")
Expand All @@ -382,12 +385,15 @@ async def test_supervise_launch(
worker_with_job: EdgeWorker,
tmp_path: Path,
):
mock_run_workload.return_value = 0
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)

assert result == 0
# The child process must exit 0 on success so the parent's Job.is_success check passes.
assert exc_info.value.code == 0
assert not error_file_path.exists() # no error written on success

@patch("airflow.sdk.execution_time.supervisor.supervise")
Expand All @@ -403,9 +409,10 @@ async def test_supervise_launch_fail_pre_3_3(
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)

assert result == 1
assert exc_info.value.code == 1
assert error_file_path.exists()
assert "Supervise failed" in error_file_path.read_text()

Expand All @@ -424,12 +431,52 @@ async def test_supervise_launch_fail(
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"
result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)

assert result == 1
assert exc_info.value.code == 1
assert error_file_path.exists()
assert "Supervise failed" in error_file_path.read_text()

@patch("airflow.executors.base_executor.BaseExecutor.run_workload")
@pytest.mark.skipif(not hasattr(os, "fork"), reason="Requires the fork start method")
@pytest.mark.skipif(
not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where BaseExecutor.run_workload is used"
)
def test_fork_child_exits_nonzero_when_supervisor_raises(
self,
mock_run_workload,
worker_with_job: EdgeWorker,
tmp_path: Path,
):
"""
A supervisor exception must make the forked child terminate with a non-zero exit code so the
parent's Job.is_success reports the failure.
"""
import multiprocessing

mock_run_workload.side_effect = RuntimeError("supervisor crashed")
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
error_file_path = tmp_path / "fork-error.log"

# Use the fork context explicitly so the child inherits the patched run_workload in memory.
process = multiprocessing.get_context("fork").Process(
target=worker_with_job._run_job_via_supervisor,
kwargs={"workload": edge_job.command, "error_file_path": error_file_path},
)
process.start()
process.join(timeout=30)

# With ``return 1`` this would have been 0; ``sys.exit(1)`` propagates the non-zero code.
assert process.exitcode == 1
# And the parent's success check therefore reports failure instead of a false success.
job = Job(edge_job=edge_job, process=process, logfile=tmp_path / "file.log") # type: ignore[arg-type]
assert job.is_success is False
# Confirm the non-zero exit came from the supervisor failure path (not an unrelated early error).
assert error_file_path.exists()
assert "supervisor crashed" in error_file_path.read_text()

@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
@pytest.mark.asyncio
Expand Down Expand Up @@ -1148,14 +1195,15 @@ def test_run_job_via_supervisor_resets_signals_before_supervise(self, tmp_path):
mock.patch("os.setpgrp", side_effect=lambda: order("setpgrp")),
mock.patch(
"airflow.executors.base_executor.BaseExecutor.run_workload",
side_effect=lambda **_: order("supervise"),
side_effect=lambda **_: (order("supervise"), 0)[1],
),
):
rc = worker._run_job_via_supervisor(
workload=self._make_workload(),
error_file_path=tmp_path / "fork-error.log",
)
assert rc == 0
with pytest.raises(SystemExit) as exc_info:
worker._run_job_via_supervisor(
workload=self._make_workload(),
error_file_path=tmp_path / "fork-error.log",
)
assert exc_info.value.code == 0
assert [c.args[0] for c in order.call_args_list] == ["reset", "setpgrp", "supervise"]

def test_shutdown_handler_is_idempotent(self, worker_with_one_job):
Expand Down
Loading