From 6871ed284b7f27a79099ae4c85e8328cf51e7eca Mon Sep 17 00:00:00 2001 From: Karsh Vashi Date: Mon, 1 Jun 2026 23:17:32 +0100 Subject: [PATCH 1/2] propogate verify and botocore_config in redshift cluster triggers --- .../amazon/aws/operators/redshift_cluster.py | 15 +++ .../amazon/aws/triggers/redshift_cluster.py | 60 +++++++++- .../aws/triggers/test_redshift_cluster.py | 107 ++++++++++++++++++ 3 files changed, 177 insertions(+), 5 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py index c050987df1d27..ea0d1272db8c6 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -378,6 +378,9 @@ def execute(self, context: Context): waiter_delay=self.poll_interval, waiter_max_attempts=self.max_attempt, aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + botocore_config=self.botocore_config, ), method_name="execute_complete", ) @@ -497,6 +500,9 @@ def execute(self, context: Context) -> Any: waiter_delay=self.poll_interval, waiter_max_attempts=self.max_attempt, aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + botocore_config=self.botocore_config, ), method_name="execute_complete", # timeout is set to ensure that if a trigger dies, the timeout does not restart @@ -668,6 +674,9 @@ def execute(self, context: Context): waiter_delay=self.poll_interval, waiter_max_attempts=self.max_attempts, aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + botocore_config=self.botocore_config, ), method_name="execute_complete", # timeout is set to ensure that if a trigger dies, the timeout does not restart @@ -775,6 +784,9 @@ def execute(self, context: Context): waiter_delay=self.poll_interval, waiter_max_attempts=self.max_attempts, aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + botocore_config=self.botocore_config, ), method_name="execute_complete", # timeout is set to ensure that if a trigger dies, the timeout does not restart @@ -901,6 +913,9 @@ def execute(self, context: Context): waiter_delay=self.poll_interval, waiter_max_attempts=self.max_attempts, aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + botocore_config=self.botocore_config, ), method_name="execute_complete", ) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py index ad299595bf823..788a4ddad1771 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py @@ -43,10 +43,13 @@ class RedshiftCreateClusterTrigger(AwsBaseWaiterTrigger): def __init__( self, + *, cluster_identifier: str, aws_conn_id: str | None = "aws_default", + region_name: str | None = None, waiter_delay: int = 15, waiter_max_attempts: int = 999999, + **kwargs, ): super().__init__( serialized_fields={"cluster_identifier": cluster_identifier}, @@ -59,10 +62,17 @@ def __init__( waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts, aws_conn_id=aws_conn_id, + region_name=region_name, + **kwargs, ) def hook(self) -> AwsGenericHook: - return RedshiftHook(aws_conn_id=self.aws_conn_id) + return RedshiftHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger): @@ -80,10 +90,13 @@ class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger): def __init__( self, + *, cluster_identifier: str, aws_conn_id: str | None = "aws_default", + region_name: str | None = None, waiter_delay: int = 15, waiter_max_attempts: int = 999999, + **kwargs, ): super().__init__( serialized_fields={"cluster_identifier": cluster_identifier}, @@ -96,10 +109,17 @@ def __init__( waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts, aws_conn_id=aws_conn_id, + region_name=region_name, + **kwargs, ) def hook(self) -> AwsGenericHook: - return RedshiftHook(aws_conn_id=self.aws_conn_id) + return RedshiftHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) class RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger): @@ -117,10 +137,13 @@ class RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger): def __init__( self, + *, cluster_identifier: str, aws_conn_id: str | None = "aws_default", + region_name: str | None = None, waiter_delay: int = 15, waiter_max_attempts: int = 999999, + **kwargs, ): super().__init__( serialized_fields={"cluster_identifier": cluster_identifier}, @@ -133,10 +156,17 @@ def __init__( waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts, aws_conn_id=aws_conn_id, + region_name=region_name, + **kwargs, ) def hook(self) -> AwsGenericHook: - return RedshiftHook(aws_conn_id=self.aws_conn_id) + return RedshiftHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger): @@ -154,10 +184,13 @@ class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger): def __init__( self, + *, cluster_identifier: str, aws_conn_id: str | None = "aws_default", + region_name: str | None = None, waiter_delay: int = 15, waiter_max_attempts: int = 999999, + **kwargs, ): super().__init__( serialized_fields={"cluster_identifier": cluster_identifier}, @@ -170,10 +203,17 @@ def __init__( waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts, aws_conn_id=aws_conn_id, + region_name=region_name, + **kwargs, ) def hook(self) -> AwsGenericHook: - return RedshiftHook(aws_conn_id=self.aws_conn_id) + return RedshiftHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger): @@ -188,10 +228,13 @@ class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger): def __init__( self, + *, cluster_identifier: str, aws_conn_id: str | None = "aws_default", + region_name: str | None = None, waiter_delay: int = 30, waiter_max_attempts: int = 30, + **kwargs, ): super().__init__( serialized_fields={"cluster_identifier": cluster_identifier}, @@ -204,10 +247,17 @@ def __init__( waiter_delay=waiter_delay, waiter_max_attempts=waiter_max_attempts, aws_conn_id=aws_conn_id, + region_name=region_name, + **kwargs, ) def hook(self) -> AwsGenericHook: - return RedshiftHook(aws_conn_id=self.aws_conn_id) + return RedshiftHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) class RedshiftClusterTrigger(BaseTrigger): diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py index 75055fe5ad6fe..aa2f8d71940be 100644 --- a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py @@ -24,6 +24,11 @@ from airflow.providers.amazon.aws.triggers.redshift_cluster import ( RedshiftClusterTrigger, + RedshiftCreateClusterSnapshotTrigger, + RedshiftCreateClusterTrigger, + RedshiftDeleteClusterTrigger, + RedshiftPauseClusterTrigger, + RedshiftResumeClusterTrigger, ) from airflow.triggers.base import TriggerEvent @@ -115,3 +120,105 @@ async def test_redshift_cluster_sensor_trigger_exception(self, mock_cluster_stat # so we validate for length of task to be 1 assert len(task) == 1 assert TriggerEvent({"status": "error", "message": "Test exception"}) in task + + +WAITER_TRIGGER_PARAMS = [ + pytest.param( + RedshiftCreateClusterTrigger, + 15, + 999999, + id="RedshiftCreateClusterTrigger", + ), + pytest.param( + RedshiftPauseClusterTrigger, + 15, + 999999, + id="RedshiftPauseClusterTrigger", + ), + pytest.param( + RedshiftCreateClusterSnapshotTrigger, + 15, + 999999, + id="RedshiftCreateClusterSnapshotTrigger", + ), + pytest.param( + RedshiftResumeClusterTrigger, + 15, + 999999, + id="RedshiftResumeClusterTrigger", + ), + pytest.param( + RedshiftDeleteClusterTrigger, + 30, + 30, + id="RedshiftDeleteClusterTrigger", + ), +] + + +class TestRedshiftWaiterTriggers: + """Tests for the five Redshift triggers that inherit from ``AwsBaseWaiterTrigger``.""" + + @pytest.mark.parametrize( + ("trigger_cls", "default_delay", "default_max_attempts"), + WAITER_TRIGGER_PARAMS, + ) + def test_serialization(self, trigger_cls, default_delay, default_max_attempts): + trigger = trigger_cls( + cluster_identifier="test_cluster", + aws_conn_id="aws_default", + region_name="us-east-1", + ) + + classpath, kwargs = trigger.serialize() + assert classpath == f"airflow.providers.amazon.aws.triggers.redshift_cluster.{trigger_cls.__name__}" + assert kwargs == { + "cluster_identifier": "test_cluster", + "waiter_delay": default_delay, + "waiter_max_attempts": default_max_attempts, + "aws_conn_id": "aws_default", + "region_name": "us-east-1", + } + + @pytest.mark.parametrize( + ("trigger_cls", "default_delay", "default_max_attempts"), + WAITER_TRIGGER_PARAMS, + ) + def test_serialization_with_verify_and_botocore_config( + self, trigger_cls, default_delay, default_max_attempts + ): + trigger = trigger_cls( + cluster_identifier="test_cluster", + aws_conn_id="aws_default", + verify=False, + botocore_config={"connect_timeout": 30}, + ) + + _, kwargs = trigger.serialize() + assert kwargs["verify"] is False + assert kwargs["botocore_config"] == {"connect_timeout": 30} + + @pytest.mark.parametrize( + ("trigger_cls", "default_delay", "default_max_attempts"), + WAITER_TRIGGER_PARAMS, + ) + @mock.patch("airflow.providers.amazon.aws.triggers.redshift_cluster.RedshiftHook") + def test_hook_propagates_verify_and_botocore_config( + self, mock_hook_cls, trigger_cls, default_delay, default_max_attempts + ): + trigger = trigger_cls( + cluster_identifier="test_cluster", + aws_conn_id="test_conn", + region_name="eu-west-1", + verify="/path/to/ca-bundle.crt", + botocore_config={"read_timeout": 60}, + ) + + trigger.hook() + + mock_hook_cls.assert_called_once_with( + aws_conn_id="test_conn", + region_name="eu-west-1", + verify="/path/to/ca-bundle.crt", + config={"read_timeout": 60}, + ) From d80652225ec679083e4af4f3cca9046b47aade63 Mon Sep 17 00:00:00 2001 From: Karsh Vashi Date: Tue, 2 Jun 2026 00:50:39 +0100 Subject: [PATCH 2/2] Document new trigger params and assert region_name pruning --- .../amazon/aws/triggers/redshift_cluster.py | 15 +++++++++++++++ .../amazon/aws/triggers/test_redshift_cluster.py | 1 + 2 files changed, 16 insertions(+) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py index 788a4ddad1771..5e6576d39310e 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/triggers/redshift_cluster.py @@ -39,6 +39,9 @@ class RedshiftCreateClusterTrigger(AwsBaseWaiterTrigger): :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: The AWS region where the cluster is. Used to build the hook. + :param verify: Whether or not to verify SSL certificates. Used to build the hook. + :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook. """ def __init__( @@ -86,6 +89,9 @@ class RedshiftPauseClusterTrigger(AwsBaseWaiterTrigger): :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: The AWS region where the cluster is. Used to build the hook. + :param verify: Whether or not to verify SSL certificates. Used to build the hook. + :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook. """ def __init__( @@ -133,6 +139,9 @@ class RedshiftCreateClusterSnapshotTrigger(AwsBaseWaiterTrigger): :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: The AWS region where the cluster is. Used to build the hook. + :param verify: Whether or not to verify SSL certificates. Used to build the hook. + :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook. """ def __init__( @@ -180,6 +189,9 @@ class RedshiftResumeClusterTrigger(AwsBaseWaiterTrigger): :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: The AWS region where the cluster is. Used to build the hook. + :param verify: Whether or not to verify SSL certificates. Used to build the hook. + :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook. """ def __init__( @@ -224,6 +236,9 @@ class RedshiftDeleteClusterTrigger(AwsBaseWaiterTrigger): :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. :param waiter_delay: The amount of time in seconds to wait between attempts. + :param region_name: The AWS region where the cluster is. Used to build the hook. + :param verify: Whether or not to verify SSL certificates. Used to build the hook. + :param botocore_config: Configuration dictionary for the botocore client. Used to build the hook. """ def __init__( diff --git a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py index aa2f8d71940be..d551308a72458 100644 --- a/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/triggers/test_redshift_cluster.py @@ -197,6 +197,7 @@ def test_serialization_with_verify_and_botocore_config( _, kwargs = trigger.serialize() assert kwargs["verify"] is False assert kwargs["botocore_config"] == {"connect_timeout": 30} + assert "region_name" not in kwargs @pytest.mark.parametrize( ("trigger_cls", "default_delay", "default_max_attempts"),