Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/67873.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix ``none_failed_min_one_success`` trigger rule evaluation when no upstream task succeeds, including mapped tasks whose upstream instances are all ``removed``.
Comment thread
nailo2c marked this conversation as resolved.
Outdated
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
new_state = TaskInstanceState.UPSTREAM_FAILED
elif skipped == upstream:
new_state = TaskInstanceState.SKIPPED
elif upstream_done and success == 0:
new_state = TaskInstanceState.UPSTREAM_FAILED
elif trigger_rule == TR.NONE_SKIPPED:
if skipped:
new_state = TaskInstanceState.SKIPPED
Expand Down Expand Up @@ -547,6 +549,14 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS and success <= 0:
yield self._failing_status(
reason=(
f"Task's trigger rule '{trigger_rule_str}' requires at least one upstream task "
f"success, but none were found. upstream_states={upstream_states}, "
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif trigger_rule == TR.NONE_SKIPPED:
if not upstream_done or (skipped > 0):
yield self._failing_status(
Expand Down
58 changes: 55 additions & 3 deletions airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,18 @@ def test_none_failed_min_one_success_tr_success(self, session, get_task_instance
)
_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)

@pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)])
@pytest.mark.parametrize(
("flag_upstream_failed", "expected_ti_state", "expected_reason"),
[
(True, SKIPPED, "requires at least one upstream task success"),
(False, None, "requires at least one upstream task success"),
],
)
def test_none_failed_min_one_success_tr_skipped(
self, session, get_task_instance, flag_upstream_failed, expected_ti_state
self, session, get_task_instance, flag_upstream_failed, expected_ti_state, expected_reason
):
"""
None failed min one success trigger rule success with all skipped
None failed min one success trigger rule with all skipped upstreams
"""
ti = get_task_instance(
TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
Expand All @@ -642,6 +648,7 @@ def test_none_failed_min_one_success_tr_skipped(
session=session,
flag_upstream_failed=flag_upstream_failed,
expected_ti_state=expected_ti_state,
expected_reason=expected_reason,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -1509,6 +1516,51 @@ def test_mapped_task_upstream_removed_with_none_failed_trigger_rules(

_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)

@pytest.mark.flaky(reruns=5)
@pytest.mark.parametrize(
("flag_upstream_failed", "expected_ti_state"),
[(True, UPSTREAM_FAILED), (False, None)],
)
def test_mapped_task_upstream_all_removed_with_none_failed_min_one_success_trigger_rule(
self,
monkeypatch,
session,
get_mapped_task_dagrun,
flag_upstream_failed,
expected_ti_state,
):
"""
Test NONE_FAILED_MIN_ONE_SUCCESS trigger rule with all mapped upstream tasks removed.
"""
dr, task, _ = get_mapped_task_dagrun(
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
state=REMOVED,
)

# ti with removed upstream ti
ti = dr.get_task_instance(task_id="do_something_else", map_index=3, session=session)
ti.task = task

upstream_states = _UpstreamTIStates(
success=0,
skipped=0,
failed=0,
removed=5,
upstream_failed=0,
done=5,
skipped_setup=0,
success_setup=0,
)
monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states)

_test_trigger_rule(
ti=ti,
session=session,
flag_upstream_failed=flag_upstream_failed,
expected_reason="requires at least one upstream task success",
expected_ti_state=expected_ti_state,
)


def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
from airflow.sdk import task, task_group
Expand Down
Loading