diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 8ff4e061a8b5e..1a08a6c909029 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -938,6 +938,7 @@ impl CatalogState { updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff)); } ConnectionDetails::Csr(_) + | ConnectionDetails::GlueSchemaRegistry(_) | ConnectionDetails::Postgres(_) | ConnectionDetails::MySql(_) | ConnectionDetails::SqlServer(_) diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 62189e33882bf..b61a7cc26f9d7 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -2772,6 +2772,7 @@ impl ConnectionResolver for CatalogState { Kafka(conn) => Kafka(conn.into_inline_connection(self)), Postgres(conn) => Postgres(conn.into_inline_connection(self)), Csr(conn) => Csr(conn.into_inline_connection(self)), + GlueSchemaRegistry(conn) => GlueSchemaRegistry(conn.into_inline_connection(self)), Ssh(conn) => Ssh(conn), Aws(conn) => Aws(conn), AwsPrivatelink(conn) => AwsPrivatelink(conn), diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 0c82e8f690d76..a5afcd2f6f7ab 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1133,6 +1133,7 @@ impl Coordinator { new_aws_privatelink_connections += 1 } ConnectionDetails::Csr(_) + | ConnectionDetails::GlueSchemaRegistry(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) | ConnectionDetails::IcebergCatalog(_) => {} @@ -1308,6 +1309,7 @@ impl Coordinator { ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1, ConnectionDetails::Kafka(_) => current_kafka_connections += 1, ConnectionDetails::Csr(_) + | ConnectionDetails::GlueSchemaRegistry(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) | ConnectionDetails::IcebergCatalog(_) => {} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index b5832107b0644..cba3cb9c37f52 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -201,6 +201,7 @@ Fullname Function Fusion Generator +Glue Grant Greatest Group diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 527e82a153366..48b5b39330608 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -785,6 +785,7 @@ pub enum ConnectionOptionName { PublicKey1, PublicKey2, Region, + Registry, SaslMechanisms, SaslPassword, SaslUsername, @@ -826,6 +827,7 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1", ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2", ConnectionOptionName::Region => "REGION", + ConnectionOptionName::Registry => "REGISTRY", ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN", ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME", ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS", @@ -875,6 +877,7 @@ impl WithOptionName for ConnectionOptionName { | ConnectionOptionName::PublicKey1 | ConnectionOptionName::PublicKey2 | ConnectionOptionName::Region + | ConnectionOptionName::Registry | ConnectionOptionName::AssumeRoleArn | ConnectionOptionName::AssumeRoleSessionName | ConnectionOptionName::SaslMechanisms @@ -911,6 +914,7 @@ impl_display_t!(ConnectionOption); pub enum CreateConnectionType { Aws, AwsPrivatelink, + GlueSchemaRegistry, Kafka, Csr, Postgres, @@ -928,6 +932,7 @@ impl CreateConnectionType { Self::Postgres => "postgres", Self::Aws => "aws", Self::AwsPrivatelink => "aws-privatelink", + Self::GlueSchemaRegistry => "glue-schema-registry", Self::Ssh => "ssh-tunnel", Self::MySql => "mysql", Self::SqlServer => "sql-server", @@ -954,6 +959,9 @@ impl AstDisplay for CreateConnectionType { Self::AwsPrivatelink => { f.write_str("AWS PRIVATELINK"); } + Self::GlueSchemaRegistry => { + f.write_str("AWS GLUE SCHEMA REGISTRY"); + } Self::Ssh => { f.write_str("SSH TUNNEL"); } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 419220ef1da4d..df62b0aecbdbc 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2462,6 +2462,9 @@ impl<'a> Parser<'a> { AWS => { if self.parse_keyword(PRIVATELINK) { CreateConnectionType::AwsPrivatelink + } else if self.parse_keyword(GLUE) { + self.expect_keywords(&[SCHEMA, REGISTRY])?; + CreateConnectionType::GlueSchemaRegistry } else { CreateConnectionType::Aws } @@ -2792,6 +2795,7 @@ impl<'a> Parser<'a> { PUBLIC, PROGRESS, REGION, + REGISTRY, ROLE, SASL, SCOPE, @@ -2862,6 +2866,7 @@ impl<'a> Parser<'a> { ConnectionOptionName::SecurityProtocol } REGION => ConnectionOptionName::Region, + REGISTRY => ConnectionOptionName::Registry, SASL => match self.expect_one_of_keywords(&[MECHANISMS, PASSWORD, USERNAME])? { MECHANISMS => ConnectionOptionName::SaslMechanisms, PASSWORD => ConnectionOptionName::SaslPassword, diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 465ca01b1bb2d..44591b16393b8 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -589,6 +589,27 @@ CREATE CONNECTION icebergcatalog TO ICEBERG CATALOG (CATALOG TYPE = 's3tablesres => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("icebergcatalog")]), connection_type: IcebergCatalog, if_not_exists: false, values: [ConnectionOption { name: CatalogType, value: Some(Value(String("s3tablesrest"))) }, ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Warehouse, value: Some(Value(String("wh"))) }], with_options: [] }) +parse-statement +CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') +---- +CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [] }) + +parse-statement +CREATE CONNECTION glueconn FOR AWS GLUE SCHEMA REGISTRY AWS CONNECTION awsconn, REGISTRY 'my-registry' +---- +CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [] }) + +parse-statement +CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') WITH (VALIDATE = FALSE) +---- +CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') WITH (VALIDATE = false) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [CreateConnectionOption { name: Validate, value: Some(Value(Boolean(false))) }] }) + parse-statement CREATE CONNECTION pgconn FOR postgres HOST foo, PORT 1234, SSL CERTIFICATE AUTHORITY 'foo', SSH TUNNEL tun, DATABASE 'db', PASSWORD 'pw', SSL CERTIFICATE 'cert', SSL KEY 'key', SSL MODE 'mode', USER 'postgres' ---- @@ -2820,7 +2841,7 @@ CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("my parse-statement CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope) ---- -error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis +error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or REGISTRY or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope) ^ diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 6a885020b7486..46383a9bb60a3 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -59,8 +59,9 @@ use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::AwsConnection; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::{ - AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection, - MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection, + AwsPrivatelinkConnection, CsrConnection, GlueSchemaRegistryConnection, + IcebergCatalogConnection, KafkaConnection, MySqlConnection, PostgresConnection, + SqlServerConnectionDetails, SshConnection, }; use mz_storage_types::instances::StorageInstanceId; use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection}; @@ -1668,6 +1669,7 @@ pub struct Connection { pub enum ConnectionDetails { Kafka(KafkaConnection), Csr(CsrConnection), + GlueSchemaRegistry(GlueSchemaRegistryConnection), Postgres(PostgresConnection), Ssh { connection: SshConnection, @@ -1688,6 +1690,9 @@ impl ConnectionDetails { mz_storage_types::connections::Connection::Kafka(c.clone()) } ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()), + ConnectionDetails::GlueSchemaRegistry(c) => { + mz_storage_types::connections::Connection::GlueSchemaRegistry(c.clone()) + } ConnectionDetails::Postgres(c) => { mz_storage_types::connections::Connection::Postgres(c.clone()) } diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index f82614cc14296..d1a3cb3cc143c 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -7066,6 +7066,7 @@ pub fn plan_alter_connection( Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink, Connection::Kafka(_) => CreateConnectionType::Kafka, Connection::Csr(_) => CreateConnectionType::Csr, + Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry, Connection::Postgres(_) => CreateConnectionType::Postgres, Connection::Ssh(_) => CreateConnectionType::Ssh, Connection::MySql(_) => CreateConnectionType::MySql, diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index d27d7e74a3101..7024ded55391c 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -33,10 +33,11 @@ use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection, - CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, - KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, - MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, - SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel, + CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogConnection, + IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, + KafkaTopicOptions, MySqlConnection, MySqlSslMode, PostgresConnection, RestIcebergCatalog, + S3TablesRestIcebergCatalog, SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, + Tunnel, }; use crate::names::Aug; @@ -66,6 +67,7 @@ generate_extracted_config!( (PublicKey1, String), (PublicKey2, String), (Region, String), + (Registry, String), (SaslMechanisms, String), (SaslPassword, with_options::Secret), (SaslUsername, StringOrSecret), @@ -115,6 +117,7 @@ pub(super) fn validate_options_per_connection_type( ] .as_slice(), CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName], + CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry], CreateConnectionType::Csr => &[ AwsPrivatelink, Password, @@ -400,6 +403,23 @@ impl ConnectionOptionExtracted { tunnel, }) } + CreateConnectionType::GlueSchemaRegistry => { + scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?; + + let aws_connection = get_aws_connection_reference(scx, &self)? + .ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?; + let registry_name = self + .registry + .ok_or_else(|| sql_err!("REGISTRY option is required"))?; + if registry_name.is_empty() { + sql_bail!("invalid CONNECTION: REGISTRY must not be empty"); + } + + ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection { + aws_connection, + registry_name, + }) + } CreateConnectionType::Postgres => { let cert = self.ssl_certificate; let key = self.ssl_key.map(|secret| secret.into()); diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index a08c06f8526e0..abd5bd6709ff0 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1934,6 +1934,12 @@ feature_flags!( default: false, enable_for_item_parsing: true, }, + { + name: enable_glue_schema_registry, + desc: "CREATE CONNECTION ... TO AWS GLUE SCHEMA REGISTRY", + default: false, + enable_for_item_parsing: true, + }, { name: enable_alter_set_cluster, desc: "ALTER ... SET CLUSTER syntax", diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index cd6b5f602ebef..1f049f76d0dcc 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -254,6 +254,7 @@ impl ConnectionContext { pub enum Connection { Kafka(KafkaConnection), Csr(CsrConnection), + GlueSchemaRegistry(GlueSchemaRegistryConnection), Postgres(PostgresConnection), Ssh(SshConnection), Aws(AwsConnection), @@ -270,6 +271,9 @@ impl IntoInlineConnection match self { Connection::Kafka(kafka) => Connection::Kafka(kafka.into_inline_connection(r)), Connection::Csr(csr) => Connection::Csr(csr.into_inline_connection(r)), + Connection::GlueSchemaRegistry(glue) => { + Connection::GlueSchemaRegistry(glue.into_inline_connection(r)) + } Connection::Postgres(pg) => Connection::Postgres(pg.into_inline_connection(r)), Connection::Ssh(ssh) => Connection::Ssh(ssh), Connection::Aws(aws) => Connection::Aws(aws), @@ -291,6 +295,7 @@ impl Connection { match self { Connection::Kafka(conn) => conn.validate_by_default(), Connection::Csr(conn) => conn.validate_by_default(), + Connection::GlueSchemaRegistry(conn) => conn.validate_by_default(), Connection::Postgres(conn) => conn.validate_by_default(), Connection::Ssh(conn) => conn.validate_by_default(), Connection::Aws(conn) => conn.validate_by_default(), @@ -312,6 +317,9 @@ impl Connection { match self { Connection::Kafka(conn) => conn.validate(id, storage_configuration).await?, Connection::Csr(conn) => conn.validate(id, storage_configuration).await?, + Connection::GlueSchemaRegistry(conn) => { + conn.validate(id, storage_configuration).await? + } Connection::Postgres(conn) => { conn.validate(id, storage_configuration).await?; } @@ -378,6 +386,15 @@ impl Connection { } } + pub fn unwrap_glue_schema_registry( + self, + ) -> ::GlueSchemaRegistry { + match self { + Self::GlueSchemaRegistry(conn) => conn, + o => unreachable!("{o:?} is not an AWS Glue Schema Registry connection"), + } + } + pub fn unwrap_iceberg_catalog(self) -> ::IcebergCatalog { match self { Self::IcebergCatalog(conn) => conn, @@ -1585,6 +1602,92 @@ impl AlterCompatible for CsrConnection { } } +/// A connection to an AWS Glue Schema Registry. +/// +/// AWS credentials, region, and endpoint are inherited from the referenced +/// [`AwsConnection`]; this struct only carries the per-registry settings. +/// +/// NOTE: Stage 1 of the GSR rollout. The client crate +/// (`mz-aws-glue-schema-registry`) does not exist yet; `validate` is a +/// no-op until Stage 3 retrofits a real `GetRegistry` ping. The connection +/// can be created and inspected, but cannot yet be attached to a source or +/// sink. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct GlueSchemaRegistryConnection { + /// The referenced AWS connection that supplies credentials, region, and + /// (optional) endpoint. + pub aws_connection: AwsConnectionReference, + /// The Glue Schema Registry name within the AWS account/region. + pub registry_name: String, +} + +impl IntoInlineConnection + for GlueSchemaRegistryConnection +{ + fn into_inline_connection(self, r: R) -> GlueSchemaRegistryConnection { + let GlueSchemaRegistryConnection { + aws_connection, + registry_name, + } = self; + GlueSchemaRegistryConnection { + aws_connection: aws_connection.into_inline_connection(&r), + registry_name, + } + } +} + +impl GlueSchemaRegistryConnection { + fn validate_by_default(&self) -> bool { + // Stage 1 of the AWS Glue Schema Registry rollout: the client crate + // does not exist yet, and the no-op `validate` below succeeds + // unconditionally. Default-validating preserves the API contract so + // that Stage 3's real `GetRegistry` ping slots in without + // behavioral change. + true + } +} + +impl GlueSchemaRegistryConnection { + async fn validate( + &self, + _id: CatalogItemId, + _storage_configuration: &StorageConfiguration, + ) -> Result<(), anyhow::Error> { + // Stage 1: no-op. Real validation arrives in Stage 3 when the + // Glue client crate exists. Until then a `CREATE CONNECTION` + // succeeds even against a registry that doesn't exist; the + // failure will surface on first use (which is itself gated until + // source/sink integration lands in Stages 4/5). + std::future::ready(Ok(())).await + } +} + +impl AlterCompatible for GlueSchemaRegistryConnection { + fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> { + let GlueSchemaRegistryConnection { + registry_name, + // The referenced AWS connection itself may be swapped; matches + // the permissive policy of MySqlConnection / SqlServerConnection. + aws_connection: _, + } = self; + + let compatibility_checks = [(registry_name == &other.registry_name, "registry_name")]; + + for (compatible, field) in compatibility_checks { + if !compatible { + tracing::warn!( + "GlueSchemaRegistryConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}", + self, + other + ); + + return Err(AlterError { id }); + } + } + Ok(()) + } +} + /// A TLS key pair used for client identity. #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct TlsIdentity { diff --git a/src/storage-types/src/connections/inline.rs b/src/storage-types/src/connections/inline.rs index eca7992c5680f..b49cfed80456e 100644 --- a/src/storage-types/src/connections/inline.rs +++ b/src/storage-types/src/connections/inline.rs @@ -93,6 +93,14 @@ pub trait ConnectionAccess: Clone + Debug + Eq + PartialEq + Serialize + 'static + Serialize + for<'a> Deserialize<'a> + AlterCompatible; + type GlueSchemaRegistry: Clone + + Debug + + Eq + + PartialEq + + Hash + + Serialize + + for<'a> Deserialize<'a> + + AlterCompatible; type MySql: Clone + Debug + Eq @@ -131,6 +139,7 @@ impl ConnectionAccess for ReferencedConnection { type Aws = CatalogItemId; type Ssh = CatalogItemId; type Csr = CatalogItemId; + type GlueSchemaRegistry = CatalogItemId; type MySql = CatalogItemId; type SqlServer = CatalogItemId; type IcebergCatalog = CatalogItemId; @@ -146,6 +155,7 @@ impl ConnectionAccess for InlinedConnection { type Aws = super::aws::AwsConnection; type Ssh = super::SshConnection; type Csr = super::CsrConnection; + type GlueSchemaRegistry = super::GlueSchemaRegistryConnection; type MySql = super::MySqlConnection; type SqlServer = super::SqlServerConnectionDetails; type IcebergCatalog = super::IcebergCatalogConnection; diff --git a/test/sqllogictest/glue_schema_registry.slt b/test/sqllogictest/glue_schema_registry.slt new file mode 100644 index 0000000000000..ad14a41985840 --- /dev/null +++ b/test/sqllogictest/glue_schema_registry.slt @@ -0,0 +1,128 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Stage 1 tests for AWS Glue Schema Registry connections. +# +# Validates parsing, planning, catalog round-trip, ALTER compatibility, and +# the `enable_glue_schema_registry` feature flag. No client crate exists in +# this stage; `validate()` is a no-op (Stage 3 retrofits real validation). +# +# See doc/developer/design/20260512_aws_glue_schema_registry.md. + +reset-server + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_connection_validation_syntax TO true; +---- +COMPLETE 0 + +# The feature flag is off by default; enable it for the happy-path tests. +# A dedicated flag-off section toggles it back below. +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_glue_schema_registry TO true; +---- +COMPLETE 0 + +# Prerequisite: an AWS connection to reference. +statement ok +CREATE CONNECTION aws_conn TO AWS ( + ASSUME ROLE ARN 'arn:aws:iam::123456789000:role/my-role' +) WITH (VALIDATE = false) + +# Happy path: GSR connection with both required options. +statement ok +CREATE CONNECTION glue_conn TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = aws_conn, + REGISTRY = 'my-registry' +) + +# SHOW CREATE CONNECTION round-trip (pretty-printed multiline form). +query T multiline +SELECT create_sql FROM (SHOW CREATE CONNECTION glue_conn) +---- +CREATE CONNECTION materialize.public.glue_conn +TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = materialize.public.aws_conn, REGISTRY = 'my-registry'); +EOF + +# mz_connections lists the connection with the expected type string. +query TT +SELECT name, type FROM mz_connections WHERE name = 'glue_conn' +---- +glue_conn +glue-schema-registry + +# DROP works. +statement ok +DROP CONNECTION glue_conn + +# Missing REGISTRY rejected at plan time. +statement error REGISTRY option is required +CREATE CONNECTION bad_glue TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = aws_conn) + +# Missing AWS CONNECTION rejected at plan time. +statement error AWS CONNECTION option is required +CREATE CONNECTION bad_glue TO AWS GLUE SCHEMA REGISTRY (REGISTRY = 'my-registry') + +# Empty registry name rejected. +statement error REGISTRY must not be empty +CREATE CONNECTION bad_glue TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = aws_conn, + REGISTRY = '' +) + +# Wrong connection type for AWS CONNECTION rejected. +statement ok +CREATE CONNECTION csr_dummy TO CONFLUENT SCHEMA REGISTRY (URL 'http://localhost:8081') WITH (VALIDATE = false) + +statement error is not an AWS connection +CREATE CONNECTION bad_glue TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = csr_dummy, + REGISTRY = 'my-registry' +) + +statement ok +DROP CONNECTION csr_dummy + +# Unsupported option rejected (PORT belongs to other connection types). +statement error AWS GLUE SCHEMA REGISTRY connections do not support PORT values +CREATE CONNECTION bad_glue TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = aws_conn, + REGISTRY = 'my-registry', + PORT = 8080 +) + +# Feature flag gating. +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_glue_schema_registry TO false; +---- +COMPLETE 0 + +statement error CREATE CONNECTION \.\.\. TO AWS GLUE SCHEMA REGISTRY is not available +CREATE CONNECTION gated_glue TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = aws_conn, + REGISTRY = 'my-registry' +) + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_glue_schema_registry TO true; +---- +COMPLETE 0 + +# Re-enable: works again. +statement ok +CREATE CONNECTION glue_conn TO AWS GLUE SCHEMA REGISTRY ( + AWS CONNECTION = aws_conn, + REGISTRY = 'my-registry' +) + +statement ok +DROP CONNECTION glue_conn + +statement ok +DROP CONNECTION aws_conn