diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index a0d9739080118..5c1c8b3b7d748 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -166,6 +166,7 @@ class TIRescheduleStatePayload(StrictBaseModel): ] reschedule_date: UtcDateTime end_date: UtcDateTime + rendered_map_index: str | None = None class TIRetryStatePayload(StrictBaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 2afd96806c473..1edb3ff1cbdfa 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -673,7 +673,10 @@ def _create_ti_state_update_query_and_update_state( query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind) # clear the next_method and next_kwargs so that none of the retries pick them up updated_state = TaskInstanceState.UP_FOR_RESCHEDULE - query = query.values(state=updated_state, next_method=None, next_kwargs=None) + reschedule_values: dict[str, Any] = {"state": updated_state, "next_method": None, "next_kwargs": None} + if ti_patch_payload.rendered_map_index is not None: + reschedule_values["_rendered_map_index"] = ti_patch_payload.rendered_map_index + query = query.values(**reschedule_values) else: raise ValueError(f"Unexpected Payload Type {type(ti_patch_payload)}") diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index ed43dfdbc1209..15d7b5a9d07b0 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -46,11 +46,14 @@ AddTaskAndAssetStoreEndpoints, AddTeamNameField, ) -from airflow.api_fastapi.execution_api.versions.v2026_06_30 import AddVariableKeysEndpoint +from airflow.api_fastapi.execution_api.versions.v2026_06_30 import ( + AddRenderedMapIndexToReschedulePayload, + AddVariableKeysEndpoint, +) bundle = VersionBundle( HeadVersion(), - Version("2026-06-30", AddVariableKeysEndpoint), + Version("2026-06-30", AddVariableKeysEndpoint, AddRenderedMapIndexToReschedulePayload), Version( "2026-06-16", AddRetryPolicyFields, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index 0bc300a499837..21dad504fdb4e 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -17,7 +17,9 @@ from __future__ import annotations -from cadwyn import VersionChange, endpoint +from cadwyn import VersionChange, endpoint, schema + +from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRescheduleStatePayload class AddVariableKeysEndpoint(VersionChange): @@ -26,3 +28,13 @@ class AddVariableKeysEndpoint(VersionChange): description = __doc__ instructions_to_migrate_to_previous_version = (endpoint("/variables/keys", ["GET"]).didnt_exist,) + + +class AddRenderedMapIndexToReschedulePayload(VersionChange): + """Add the ``rendered_map_index`` field to TIRescheduleStatePayload.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(TIRescheduleStatePayload).field("rendered_map_index").didnt_exist, + ) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 89b698d21fbe0..d8e8c3aa3005b 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -270,6 +270,7 @@ class TIRescheduleStatePayload(BaseModel): state: Annotated[Literal["up_for_reschedule"] | None, Field(title="State")] = "up_for_reschedule" reschedule_date: Annotated[AwareDatetime, Field(title="Reschedule Date")] end_date: Annotated[AwareDatetime, Field(title="End Date")] + rendered_map_index: Annotated[str | None, Field(title="Rendered Map Index")] = None class TIRetryStatePayload(BaseModel): diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json index 094c3f94d9ee9..6d49f2bc65bd5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json +++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json @@ -3211,6 +3211,18 @@ "title": "End Date", "type": "string" }, + "rendered_map_index": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Rendered Map Index" + }, "type": { "const": "RescheduleTask", "default": "RescheduleTask", diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 0005c36bc7e9b..3dc8406500989 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1708,6 +1708,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: self._rendered_map_index = msg.rendered_map_index self._send_terminal_state_msg(msg) elif isinstance(msg, RescheduleTask): + self._rendered_map_index = msg.rendered_map_index self._send_terminal_state_msg(msg) elif isinstance(msg, SkipDownstreamTasks): self.client.task_instances.skip_downstream_tasks(self.id, msg) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 22ac90405e027..028ae74bbc779 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1484,7 +1484,9 @@ def _on_term(signum, frame): log.info("::group::Post Execute") log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( - reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc) + reschedule_date=reschedule.reschedule_date, + end_date=datetime.now(tz=timezone.utc), + rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.UP_FOR_RESCHEDULE except (AirflowFailException, AirflowSensorTimeout) as e: diff --git a/task-sdk/tests/task_sdk/bases/test_sensor.py b/task-sdk/tests/task_sdk/bases/test_sensor.py index 5e7f588a1551a..84861f797fcab 100644 --- a/task-sdk/tests/task_sdk/bases/test_sensor.py +++ b/task-sdk/tests/task_sdk/bases/test_sensor.py @@ -310,6 +310,26 @@ def test_ok_with_custom_reschedule_exception(self, make_sensor, run_task): state, _, _ = run_task(sensor) assert state == TaskInstanceState.SUCCESS + def test_reschedule_includes_rendered_map_index(self, run_task, make_sensor, time_machine): + """Test that RescheduleTask message includes rendered_map_index when map_index_template is set.""" + sensor = make_sensor( + return_value=None, + poke_interval=10, + timeout=25, + mode="reschedule", + map_index_template="{{ task.task_id }}", + ) + sensor.poke = Mock(return_value=False) + + date1 = timezone.utcnow() + time_machine.move_to(date1, tick=False) + + state, msg, _ = run_task(task=sensor) + + assert state == TaskInstanceState.UP_FOR_RESCHEDULE + assert isinstance(msg, RescheduleTask) + assert msg.rendered_map_index == SENSOR_OP + def test_sensor_with_invalid_poke_interval(self): negative_poke_interval = -10 non_number_poke_interval = "abcd"