Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions airflow-core/src/airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from __future__ import annotations

import contextlib
import ctypes
import multiprocessing
import multiprocessing.sharedctypes
Expand Down Expand Up @@ -238,10 +239,12 @@ def sync(self) -> None:
self._check_workers()

def _read_results(self):
while not self.result_queue.empty():
key, state, exc = self.result_queue.get()

self.change_state(key, state)
try:
while not self.result_queue.empty():
key, state, exc = self.result_queue.get()
self.change_state(key, state)
except (OSError, EOFError):
self.log.exception("Error reading from result queue")
Comment thread
SakshamKapoor2911 marked this conversation as resolved.

def end(self) -> None:
"""End the executor."""
Expand All @@ -258,19 +261,40 @@ def end(self) -> None:
if proc.is_alive():
self.activity_queue.put(None)

for proc in self.workers.values():
if proc.is_alive():
proc.join()
proc.close()
# To prevent deadlock, we should consume results from result_queue while waiting for processes to join.
# Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit,
# and an unbounded proc.join() will hang the scheduler indefinitely.
try:
for proc in self.workers.values():
if proc.is_alive():
while proc.is_alive():
self._read_results()
proc.join(timeout=0.05)
except (KeyboardInterrupt, SystemExit):
self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.")
for p in self.workers.values():
if p.is_alive():
p.terminate()
p.join(timeout=0.2)
raise
finally:
# Process any extra results before closing
self._read_results()

# Process any extra results before closing
self._read_results()
for proc in self.workers.values():
with contextlib.suppress(ValueError):
proc.close()

self.activity_queue.close()
self.result_queue.close()
self.activity_queue.close()
self.result_queue.close()

def terminate(self):
"""Terminate the executor is not doing anything."""
"""Terminate all worker processes under control of the executor forcefully."""
self.log.info("Terminating all LocalExecutor worker processes.")
for proc in self.workers.values():
if proc.is_alive():
proc.terminate()
Comment thread
SakshamKapoor2911 marked this conversation as resolved.
proc.join(timeout=0.2)

def _process_workloads(self, workload_list):
for workload in workload_list:
Expand Down