From fe8708e17ebbaac3e82c46e869a6677a532321b2 Mon Sep 17 00:00:00 2001 From: Saksham Kapoor Date: Mon, 1 Jun 2026 20:12:03 -0400 Subject: [PATCH 1/3] fix: drain result_queue in LocalExecutor to prevent process join deadlock --- .../src/airflow/executors/local_executor.py | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 74ff0889b732b..6e2707db21a05 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -238,10 +238,12 @@ def sync(self) -> None: self._check_workers() def _read_results(self): - while not self.result_queue.empty(): - key, state, exc = self.result_queue.get() - - self.change_state(key, state) + try: + while not self.result_queue.empty(): + key, state, exc = self.result_queue.get() + self.change_state(key, state) + except (OSError, EOFError): + self.log.exception("Error reading from result queue") def end(self) -> None: """End the executor.""" @@ -257,10 +259,23 @@ def end(self) -> None: # Send the shutdown message once for each alive worker if proc.is_alive(): self.activity_queue.put(None) + + # To prevent deadlock, we should consume results from result_queue while waiting for processes to join. + # Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit, + # and an unbounded proc.join() will hang the scheduler indefinitely. for proc in self.workers.values(): if proc.is_alive(): - proc.join() + try: + while proc.is_alive(): + self._read_results() + proc.join(timeout=0.05) + except (KeyboardInterrupt, SystemExit): + self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.") + for p in self.workers.values(): + if p.is_alive(): + p.terminate() + raise proc.close() # Process any extra results before closing @@ -270,7 +285,11 @@ def end(self) -> None: self.result_queue.close() def terminate(self): - """Terminate the executor is not doing anything.""" + """Forcefully terminate all worker processes under control of the executor.""" + self.log.info("Terminating all LocalExecutor worker processes.") + for proc in self.workers.values(): + if proc.is_alive(): + proc.terminate() def _process_workloads(self, workload_list): for workload in workload_list: From 5bdc31aa4449596a25ea6d89f8f3f335601fcabc Mon Sep 17 00:00:00 2001 From: Saksham Kapoor Date: Mon, 1 Jun 2026 20:30:32 -0400 Subject: [PATCH 2/3] fix: resolve simplequeue API mismatch, zombie processes, and close queues in finally block --- .../src/airflow/executors/local_executor.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 6e2707db21a05..77d13ac710f04 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -259,30 +259,36 @@ def end(self) -> None: # Send the shutdown message once for each alive worker if proc.is_alive(): self.activity_queue.put(None) - + # To prevent deadlock, we should consume results from result_queue while waiting for processes to join. # Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit, # and an unbounded proc.join() will hang the scheduler indefinitely. - - for proc in self.workers.values(): - if proc.is_alive(): - try: + try: + for proc in self.workers.values(): + if proc.is_alive(): while proc.is_alive(): self._read_results() proc.join(timeout=0.05) - except (KeyboardInterrupt, SystemExit): - self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.") - for p in self.workers.values(): - if p.is_alive(): - p.terminate() - raise - proc.close() - - # Process any extra results before closing - self._read_results() + except (KeyboardInterrupt, SystemExit): + self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.") + for p in self.workers.values(): + if p.is_alive(): + p.terminate() + p.join(timeout=0.2) + raise + finally: + # Process any extra results before closing + self._read_results() + + for proc in self.workers.values(): + try: + proc.close() + except ValueError: + # Raised if the process is still running/alive + pass - self.activity_queue.close() - self.result_queue.close() + self.activity_queue.close() + self.result_queue.close() def terminate(self): """Forcefully terminate all worker processes under control of the executor.""" @@ -290,6 +296,7 @@ def terminate(self): for proc in self.workers.values(): if proc.is_alive(): proc.terminate() + proc.join(timeout=0.2) def _process_workloads(self, workload_list): for workload in workload_list: From 33cd755cde7cb27ec308f14d57cd3f7321b25658 Mon Sep 17 00:00:00 2001 From: Saksham Kapoor Date: Wed, 3 Jun 2026 21:47:15 -0400 Subject: [PATCH 3/3] style: fix ruff linting violations (SIM105 & D401) in local_executor.py --- airflow-core/src/airflow/executors/local_executor.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index 77d13ac710f04..67d4393fe6053 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -25,6 +25,7 @@ from __future__ import annotations +import contextlib import ctypes import multiprocessing import multiprocessing.sharedctypes @@ -281,17 +282,14 @@ def end(self) -> None: self._read_results() for proc in self.workers.values(): - try: + with contextlib.suppress(ValueError): proc.close() - except ValueError: - # Raised if the process is still running/alive - pass self.activity_queue.close() self.result_queue.close() def terminate(self): - """Forcefully terminate all worker processes under control of the executor.""" + """Terminate all worker processes under control of the executor forcefully.""" self.log.info("Terminating all LocalExecutor worker processes.") for proc in self.workers.values(): if proc.is_alive():