Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions airflow-core/src/airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,12 @@ def sync(self) -> None:
self._check_workers()

def _read_results(self):
while not self.result_queue.empty():
key, state, exc = self.result_queue.get()

self.change_state(key, state)
try:
while not self.result_queue.empty():
key, state, exc = self.result_queue.get()
self.change_state(key, state)
except (OSError, EOFError):
self.log.exception("Error reading from result queue")
Comment thread
SakshamKapoor2911 marked this conversation as resolved.

def end(self) -> None:
"""End the executor."""
Expand All @@ -257,10 +259,23 @@ def end(self) -> None:
# Send the shutdown message once for each alive worker
if proc.is_alive():
self.activity_queue.put(None)

Comment thread
SakshamKapoor2911 marked this conversation as resolved.
Outdated
# To prevent deadlock, we should consume results from result_queue while waiting for processes to join.
# Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit,
# and an unbounded proc.join() will hang the scheduler indefinitely.

for proc in self.workers.values():
if proc.is_alive():
proc.join()
try:
while proc.is_alive():
self._read_results()
proc.join(timeout=0.05)
except (KeyboardInterrupt, SystemExit):
self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.")
for p in self.workers.values():
if p.is_alive():
p.terminate()
raise
proc.close()
Comment thread
SakshamKapoor2911 marked this conversation as resolved.
Outdated

# Process any extra results before closing
Expand All @@ -270,7 +285,11 @@ def end(self) -> None:
self.result_queue.close()

def terminate(self):
"""Terminate the executor is not doing anything."""
"""Forcefully terminate all worker processes under control of the executor."""
self.log.info("Terminating all LocalExecutor worker processes.")
for proc in self.workers.values():
if proc.is_alive():
proc.terminate()
Comment thread
SakshamKapoor2911 marked this conversation as resolved.

def _process_workloads(self, workload_list):
for workload in workload_list:
Expand Down