Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,7 @@ impl CatalogState {
updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
}
ConnectionDetails::Csr(_)
| ConnectionDetails::GlueSchemaRegistry(_)
| ConnectionDetails::Postgres(_)
| ConnectionDetails::MySql(_)
| ConnectionDetails::SqlServer(_)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2772,6 +2772,7 @@ impl ConnectionResolver for CatalogState {
Kafka(conn) => Kafka(conn.into_inline_connection(self)),
Postgres(conn) => Postgres(conn.into_inline_connection(self)),
Csr(conn) => Csr(conn.into_inline_connection(self)),
GlueSchemaRegistry(conn) => GlueSchemaRegistry(conn.into_inline_connection(self)),
Ssh(conn) => Ssh(conn),
Aws(conn) => Aws(conn),
AwsPrivatelink(conn) => AwsPrivatelink(conn),
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,7 @@ impl Coordinator {
new_aws_privatelink_connections += 1
}
ConnectionDetails::Csr(_)
| ConnectionDetails::GlueSchemaRegistry(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_)
| ConnectionDetails::IcebergCatalog(_) => {}
Expand Down Expand Up @@ -1308,6 +1309,7 @@ impl Coordinator {
ConnectionDetails::SqlServer(_) => current_sql_server_connections += 1,
ConnectionDetails::Kafka(_) => current_kafka_connections += 1,
ConnectionDetails::Csr(_)
| ConnectionDetails::GlueSchemaRegistry(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_)
| ConnectionDetails::IcebergCatalog(_) => {}
Expand Down
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ Fullname
Function
Fusion
Generator
Glue
Grant
Greatest
Group
Expand Down
8 changes: 8 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ pub enum ConnectionOptionName {
PublicKey1,
PublicKey2,
Region,
Registry,
SaslMechanisms,
SaslPassword,
SaslUsername,
Expand Down Expand Up @@ -826,6 +827,7 @@ impl AstDisplay for ConnectionOptionName {
ConnectionOptionName::PublicKey1 => "PUBLIC KEY 1",
ConnectionOptionName::PublicKey2 => "PUBLIC KEY 2",
ConnectionOptionName::Region => "REGION",
ConnectionOptionName::Registry => "REGISTRY",
ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
Expand Down Expand Up @@ -875,6 +877,7 @@ impl WithOptionName for ConnectionOptionName {
| ConnectionOptionName::PublicKey1
| ConnectionOptionName::PublicKey2
| ConnectionOptionName::Region
| ConnectionOptionName::Registry
| ConnectionOptionName::AssumeRoleArn
| ConnectionOptionName::AssumeRoleSessionName
| ConnectionOptionName::SaslMechanisms
Expand Down Expand Up @@ -911,6 +914,7 @@ impl_display_t!(ConnectionOption);
pub enum CreateConnectionType {
Aws,
AwsPrivatelink,
GlueSchemaRegistry,
Kafka,
Csr,
Postgres,
Expand All @@ -928,6 +932,7 @@ impl CreateConnectionType {
Self::Postgres => "postgres",
Self::Aws => "aws",
Self::AwsPrivatelink => "aws-privatelink",
Self::GlueSchemaRegistry => "glue-schema-registry",
Self::Ssh => "ssh-tunnel",
Self::MySql => "mysql",
Self::SqlServer => "sql-server",
Expand All @@ -954,6 +959,9 @@ impl AstDisplay for CreateConnectionType {
Self::AwsPrivatelink => {
f.write_str("AWS PRIVATELINK");
}
Self::GlueSchemaRegistry => {
f.write_str("AWS GLUE SCHEMA REGISTRY");
}
Self::Ssh => {
f.write_str("SSH TUNNEL");
}
Expand Down
5 changes: 5 additions & 0 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2462,6 +2462,9 @@ impl<'a> Parser<'a> {
AWS => {
if self.parse_keyword(PRIVATELINK) {
CreateConnectionType::AwsPrivatelink
} else if self.parse_keyword(GLUE) {
self.expect_keywords(&[SCHEMA, REGISTRY])?;
CreateConnectionType::GlueSchemaRegistry
} else {
CreateConnectionType::Aws
}
Expand Down Expand Up @@ -2792,6 +2795,7 @@ impl<'a> Parser<'a> {
PUBLIC,
PROGRESS,
REGION,
REGISTRY,
ROLE,
SASL,
SCOPE,
Expand Down Expand Up @@ -2862,6 +2866,7 @@ impl<'a> Parser<'a> {
ConnectionOptionName::SecurityProtocol
}
REGION => ConnectionOptionName::Region,
REGISTRY => ConnectionOptionName::Registry,
SASL => match self.expect_one_of_keywords(&[MECHANISMS, PASSWORD, USERNAME])? {
MECHANISMS => ConnectionOptionName::SaslMechanisms,
PASSWORD => ConnectionOptionName::SaslPassword,
Expand Down
23 changes: 22 additions & 1 deletion src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,27 @@ CREATE CONNECTION icebergcatalog TO ICEBERG CATALOG (CATALOG TYPE = 's3tablesres
=>
CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("icebergcatalog")]), connection_type: IcebergCatalog, if_not_exists: false, values: [ConnectionOption { name: CatalogType, value: Some(Value(String("s3tablesrest"))) }, ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Warehouse, value: Some(Value(String("wh"))) }], with_options: [] })

parse-statement
CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry')
----
CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry')
=>
CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [] })

parse-statement
CREATE CONNECTION glueconn FOR AWS GLUE SCHEMA REGISTRY AWS CONNECTION awsconn, REGISTRY 'my-registry'
----
CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry')
=>
CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [] })

parse-statement
CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') WITH (VALIDATE = FALSE)
----
CREATE CONNECTION glueconn TO AWS GLUE SCHEMA REGISTRY (AWS CONNECTION = awsconn, REGISTRY = 'my-registry') WITH (VALIDATE = false)
=>
CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("glueconn")]), connection_type: GlueSchemaRegistry, if_not_exists: false, values: [ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Registry, value: Some(Value(String("my-registry"))) }], with_options: [CreateConnectionOption { name: Validate, value: Some(Value(Boolean(false))) }] })

parse-statement
CREATE CONNECTION pgconn FOR postgres HOST foo, PORT 1234, SSL CERTIFICATE AUTHORITY 'foo', SSH TUNNEL tun, DATABASE 'db', PASSWORD 'pw', SSL CERTIFICATE 'cert', SSL KEY 'key', SSL MODE 'mode', USER 'postgres'
----
Expand Down Expand Up @@ -2820,7 +2841,7 @@ CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("my
parse-statement
CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope)
----
error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis
error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or REGISTRY or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis
CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope)
^

Expand Down
9 changes: 7 additions & 2 deletions src/sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ use mz_ssh_util::keys::SshKeyPair;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::{
AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
AwsPrivatelinkConnection, CsrConnection, GlueSchemaRegistryConnection,
IcebergCatalogConnection, KafkaConnection, MySqlConnection, PostgresConnection,
SqlServerConnectionDetails, SshConnection,
};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
Expand Down Expand Up @@ -1668,6 +1669,7 @@ pub struct Connection {
pub enum ConnectionDetails {
Kafka(KafkaConnection<ReferencedConnection>),
Csr(CsrConnection<ReferencedConnection>),
GlueSchemaRegistry(GlueSchemaRegistryConnection<ReferencedConnection>),
Postgres(PostgresConnection<ReferencedConnection>),
Ssh {
connection: SshConnection,
Expand All @@ -1688,6 +1690,9 @@ impl ConnectionDetails {
mz_storage_types::connections::Connection::Kafka(c.clone())
}
ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
ConnectionDetails::GlueSchemaRegistry(c) => {
mz_storage_types::connections::Connection::GlueSchemaRegistry(c.clone())
}
ConnectionDetails::Postgres(c) => {
mz_storage_types::connections::Connection::Postgres(c.clone())
}
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7066,6 +7066,7 @@ pub fn plan_alter_connection(
Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
Connection::Kafka(_) => CreateConnectionType::Kafka,
Connection::Csr(_) => CreateConnectionType::Csr,
Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry,
Connection::Postgres(_) => CreateConnectionType::Postgres,
Connection::Ssh(_) => CreateConnectionType::Ssh,
Connection::MySql(_) => CreateConnectionType::MySql,
Expand Down
28 changes: 24 additions & 4 deletions src/sql/src/plan/statement/ddl/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::connections::{
AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection,
CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType,
KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection,
MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog,
SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel,
CsrConnectionHttpAuth, GlueSchemaRegistryConnection, IcebergCatalogConnection,
IcebergCatalogImpl, IcebergCatalogType, KafkaConnection, KafkaSaslConfig, KafkaTlsConfig,
KafkaTopicOptions, MySqlConnection, MySqlSslMode, PostgresConnection, RestIcebergCatalog,
S3TablesRestIcebergCatalog, SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity,
Tunnel,
};

use crate::names::Aug;
Expand Down Expand Up @@ -66,6 +67,7 @@ generate_extracted_config!(
(PublicKey1, String),
(PublicKey2, String),
(Region, String),
(Registry, String),
(SaslMechanisms, String),
(SaslPassword, with_options::Secret),
(SaslUsername, StringOrSecret),
Expand Down Expand Up @@ -115,6 +117,7 @@ pub(super) fn validate_options_per_connection_type(
]
.as_slice(),
CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName],
CreateConnectionType::GlueSchemaRegistry => &[AwsConnection, Registry],
CreateConnectionType::Csr => &[
AwsPrivatelink,
Password,
Expand Down Expand Up @@ -400,6 +403,23 @@ impl ConnectionOptionExtracted {
tunnel,
})
}
CreateConnectionType::GlueSchemaRegistry => {
scx.require_feature_flag(&vars::ENABLE_GLUE_SCHEMA_REGISTRY)?;

let aws_connection = get_aws_connection_reference(scx, &self)?
.ok_or_else(|| sql_err!("AWS CONNECTION option is required"))?;
let registry_name = self
.registry
.ok_or_else(|| sql_err!("REGISTRY option is required"))?;
if registry_name.is_empty() {
sql_bail!("invalid CONNECTION: REGISTRY must not be empty");
}

ConnectionDetails::GlueSchemaRegistry(GlueSchemaRegistryConnection {
aws_connection,
registry_name,
})
}
CreateConnectionType::Postgres => {
let cert = self.ssl_certificate;
let key = self.ssl_key.map(|secret| secret.into());
Expand Down
6 changes: 6 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,12 @@ feature_flags!(
default: false,
enable_for_item_parsing: true,
},
{
name: enable_glue_schema_registry,
desc: "CREATE CONNECTION ... TO AWS GLUE SCHEMA REGISTRY",
default: false,
enable_for_item_parsing: true,
},
{
name: enable_alter_set_cluster,
desc: "ALTER ... SET CLUSTER syntax",
Expand Down
Loading
Loading