diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py index 5ddba214dd356..aba5f0a4adf5b 100644 --- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py @@ -427,6 +427,8 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]: new_state = TaskInstanceState.UPSTREAM_FAILED elif skipped == upstream: new_state = TaskInstanceState.SKIPPED + elif upstream_done and success == 0: + new_state = TaskInstanceState.UPSTREAM_FAILED elif trigger_rule == TR.NONE_SKIPPED: if skipped: new_state = TaskInstanceState.SKIPPED @@ -547,6 +549,14 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]: f"upstream_task_ids={task.upstream_task_ids}" ) ) + elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS and success <= 0: + yield self._failing_status( + reason=( + f"Task's trigger rule '{trigger_rule_str}' requires at least one upstream task " + f"success, but none were found. upstream_states={upstream_states}, " + f"upstream_task_ids={task.upstream_task_ids}" + ) + ) elif trigger_rule == TR.NONE_SKIPPED: if not upstream_done or (skipped > 0): yield self._failing_status( diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py index f0820eb5b4673..bd4a576f9f17a 100644 --- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py +++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py @@ -620,12 +620,18 @@ def test_none_failed_min_one_success_tr_success(self, session, get_task_instance ) _test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed) - @pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)]) + @pytest.mark.parametrize( + ("flag_upstream_failed", "expected_ti_state", "expected_reason"), + [ + (True, SKIPPED, "requires at least one upstream task success"), + (False, None, "requires at least one upstream task success"), + ], + ) def test_none_failed_min_one_success_tr_skipped( - self, session, get_task_instance, flag_upstream_failed, expected_ti_state + self, session, get_task_instance, flag_upstream_failed, expected_ti_state, expected_reason ): """ - None failed min one success trigger rule success with all skipped + None failed min one success trigger rule with all skipped upstreams """ ti = get_task_instance( TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, @@ -642,6 +648,7 @@ def test_none_failed_min_one_success_tr_skipped( session=session, flag_upstream_failed=flag_upstream_failed, expected_ti_state=expected_ti_state, + expected_reason=expected_reason, ) @pytest.mark.parametrize( @@ -1509,6 +1516,51 @@ def test_mapped_task_upstream_removed_with_none_failed_trigger_rules( _test_trigger_rule(ti=ti, session=session, flag_upstream_failed=flag_upstream_failed) + @pytest.mark.flaky(reruns=5) + @pytest.mark.parametrize( + ("flag_upstream_failed", "expected_ti_state"), + [(True, UPSTREAM_FAILED), (False, None)], + ) + def test_mapped_task_upstream_all_removed_with_none_failed_min_one_success_trigger_rule( + self, + monkeypatch, + session, + get_mapped_task_dagrun, + flag_upstream_failed, + expected_ti_state, + ): + """ + Test NONE_FAILED_MIN_ONE_SUCCESS trigger rule with all mapped upstream tasks removed. + """ + dr, task, _ = get_mapped_task_dagrun( + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + state=REMOVED, + ) + + # ti with removed upstream ti + ti = dr.get_task_instance(task_id="do_something_else", map_index=3, session=session) + ti.task = task + + upstream_states = _UpstreamTIStates( + success=0, + skipped=0, + failed=0, + removed=5, + upstream_failed=0, + done=5, + skipped_setup=0, + success_setup=0, + ) + monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states) + + _test_trigger_rule( + ti=ti, + session=session, + flag_upstream_failed=flag_upstream_failed, + expected_reason="requires at least one upstream task success", + expected_ti_state=expected_ti_state, + ) + def test_upstream_in_mapped_group_triggers_only_relevant(dag_maker, session): from airflow.sdk import task, task_group