diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py
index 74ff0889b732b..67d4393fe6053 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -25,6 +25,7 @@
 
 from __future__ import annotations
 
+import contextlib
 import ctypes
 import multiprocessing
 import multiprocessing.sharedctypes
@@ -238,10 +239,12 @@ def sync(self) -> None:
         self._check_workers()
 
     def _read_results(self):
-        while not self.result_queue.empty():
-            key, state, exc = self.result_queue.get()
-
-            self.change_state(key, state)
+        try:
+            while not self.result_queue.empty():
+                key, state, exc = self.result_queue.get()
+                self.change_state(key, state)
+        except (OSError, EOFError):
+            self.log.exception("Error reading from result queue")
 
     def end(self) -> None:
         """End the executor."""
@@ -258,19 +261,40 @@ def end(self) -> None:
             if proc.is_alive():
                 self.activity_queue.put(None)
 
-        for proc in self.workers.values():
-            if proc.is_alive():
-                proc.join()
-            proc.close()
+        # To prevent deadlock, we should consume results from result_queue while waiting for processes to join.
+        # Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit,
+        # and an unbounded proc.join() will hang the scheduler indefinitely.
+        try:
+            for proc in self.workers.values():
+                if proc.is_alive():
+                    while proc.is_alive():
+                        self._read_results()
+                        proc.join(timeout=0.05)
+        except (KeyboardInterrupt, SystemExit):
+            self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.")
+            for p in self.workers.values():
+                if p.is_alive():
+                    p.terminate()
+                    p.join(timeout=0.2)
+            raise
+        finally:
+            # Process any extra results before closing
+            self._read_results()
 
-        # Process any extra results before closing
-        self._read_results()
+            for proc in self.workers.values():
+                with contextlib.suppress(ValueError):
+                    proc.close()
 
-        self.activity_queue.close()
-        self.result_queue.close()
+            self.activity_queue.close()
+            self.result_queue.close()
 
     def terminate(self):
-        """Terminate the executor is not doing anything."""
+        """Terminate all worker processes under control of the executor forcefully."""
+        self.log.info("Terminating all LocalExecutor worker processes.")
+        for proc in self.workers.values():
+            if proc.is_alive():
+                proc.terminate()
+                proc.join(timeout=0.2)
 
     def _process_workloads(self, workload_list):
         for workload in workload_list: