Skip to content

fix: drain result_queue in LocalExecutor to prevent process join dead…#67881

Open
SakshamKapoor2911 wants to merge 2 commits into
apache:mainfrom
SakshamKapoor2911:fix-local-executor-deadlock
Open

fix: drain result_queue in LocalExecutor to prevent process join dead…#67881
SakshamKapoor2911 wants to merge 2 commits into
apache:mainfrom
SakshamKapoor2911:fix-local-executor-deadlock

Conversation

@SakshamKapoor2911
Copy link
Copy Markdown

closes: #67870

Description

This PR fixes a deadlock in the LocalExecutor during executor shutdown (end()).

Currently, the executor calls proc.join() on active worker processes. However, if a worker has written enough results to the result_queue to fill the OS-level pipe buffer (typically 64KB), the worker process blocks indefinitely on its put() call.
Because the parent scheduler process is blocked on the unbounded proc.join() and not reading from result_queue, a classic multiprocessing deadlock occurs. The scheduler hangs indefinitely, stopping heartbeats and preventing systemd/Kubernetes from restarting the process (as it never exits).

Changes

  1. Draining during Join: Updated the shutdown loop in LocalExecutor.end() to continuously drain the result_queue using _read_results() while waiting for worker processes to join with a small timeout.
  2. Robust Queue Reading: Wrapped the queue drainage loop in _read_results() with a try/except (OSError, EOFError) block to gracefully handle cases where pipes are already broken or closed during task exit.
  3. Forced Terminate: Implemented the LocalExecutor.terminate() method to forcefully kill any remaining workers when a hard stop is requested.

Context

This contribution was created as part of the required capstone of the CodePath AI301 course, where students learn how to make responsible and effective use of generative AI tools for open-source software contributions.


Was generative AI tooling used to co-author this PR?
  • Yes (Antigravity IDE powered by Gemini)

Generated-by: Antigravity IDE (Gemini) following the guidelines


Copilot AI review requested due to automatic review settings June 2, 2026 00:24
@boring-cyborg boring-cyborg Bot added the area:Executors-core LocalExecutor & SequentialExecutor label Jun 2, 2026
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented Jun 2, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Fixes a potential deadlock in LocalExecutor shutdown where workers blocked on writing to a full result_queue would never exit, hanging the scheduler. Also makes terminate() actually terminate worker processes.

Changes:

  • Drain result_queue while waiting for workers to join in end(), with KeyboardInterrupt/SystemExit fallback that force-terminates workers.
  • Harden _read_results against OSError/EOFError from a closed/broken queue.
  • Implement terminate() to forcefully terminate all worker processes (previously a no-op).

Comment thread airflow-core/src/airflow/executors/local_executor.py Outdated
Comment thread airflow-core/src/airflow/executors/local_executor.py Outdated
Comment thread airflow-core/src/airflow/executors/local_executor.py
Comment thread airflow-core/src/airflow/executors/local_executor.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor

Projects

None yet

2 participants