Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
new_state = TaskInstanceState.UPSTREAM_FAILED
elif skipped == upstream:
new_state = TaskInstanceState.SKIPPED
elif upstream_done and success == 0:
new_state = TaskInstanceState.UPSTREAM_FAILED
elif trigger_rule == TR.NONE_SKIPPED:
if skipped:
new_state = TaskInstanceState.SKIPPED
Expand Down Expand Up @@ -547,6 +549,14 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS and success <= 0:
yield self._failing_status(
reason=(
f"Task's trigger rule '{trigger_rule_str}' requires at least one upstream task "
f"success, but none were found. upstream_states={upstream_states}, "
f"upstream_task_ids={task.upstream_task_ids}"
)
)
elif trigger_rule == TR.NONE_SKIPPED:
if not upstream_done or (skipped > 0):
yield self._failing_status(
Expand Down
58 changes: 55 additions & 3 deletions airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,12 +620,18 @@ def test_none_failed_min_one_success_tr_success(self, session, get_task_instance
)
_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)

@pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)])
@pytest.mark.parametrize(
("flag_upstream_failed", "expected_ti_state", "expected_reason"),
[
(True, SKIPPED, "requires at least one upstream task success"),
(False, None, "requires at least one upstream task success"),
],
)
def test_none_failed_min_one_success_tr_skipped(
self, session, get_task_instance, flag_upstream_failed, expected_ti_state
self, session, get_task_instance, flag_upstream_failed, expected_ti_state, expected_reason
):
"""
None failed min one success trigger rule success with all skipped
None failed min one success trigger rule with all skipped upstreams
"""
ti = get_task_instance(
TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
Expand All @@ -642,6 +648,7 @@ def test_none_failed_min_one_success_tr_skipped(
session=session,
flag_upstream_failed=flag_upstream_failed,
expected_ti_state=expected_ti_state,
expected_reason=expected_reason,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -1509,6 +1516,51 @@ def test_mapped_task_upstream_removed_with_none_failed_trigger_rules(

_test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed)

@pytest.mark.flaky(reruns=5)
@pytest.mark.parametrize(
("flag_upstream_failed", "expected_ti_state"),
[(True, UPSTREAM_FAILED), (False, None)],
)
def test_mapped_task_upstream_all_removed_with_none_failed_min_one_success_trigger_rule(
self,
monkeypatch,
session,
get_mapped_task_dagrun,
flag_upstream_failed,
expected_ti_state,
):
"""
Test NONE_FAILED_MIN_ONE_SUCCESS trigger rule with all mapped upstream tasks removed.
"""
dr, task, _ = get_mapped_task_dagrun(
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
state=REMOVED,
)

# ti with removed upstream ti
ti = dr.get_task_instance(task_id="do_something_else", map_index=3, session=session)
ti.task = task

upstream_states = _UpstreamTIStates(
success=0,
skipped=0,
failed=0,
removed=5,
upstream_failed=0,
done=5,
skipped_setup=0,
success_setup=0,
)
monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states)

_test_trigger_rule(
ti=ti,
session=session,
flag_upstream_failed=flag_upstream_failed,
expected_reason="requires at least one upstream task success",
expected_ti_state=expected_ti_state,
)


def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session):
from airflow.sdk import task, task_group
Expand Down
Loading