diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a75937528f..d768b15e583 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti - Names containing any of these characters: /\:*?"<>|#+` ### New Features +- Added GCS principal attribution for vended credentials (the GCP counterpart of AWS STS session tags). When the `GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE`, `GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER`, and `GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE` feature flags are set (plus a `gcpServiceAccount` on the storage config), credential vending chains a catalog-signed JWT through a Workload Identity Federation token exchange and tenant service-account impersonation, so the Polaris principal appears in GCS Data Access audit logs (`serviceAccountDelegationInfo.principalSubject`) for any client. `GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_ID` sets the JWT `kid` for JWKS key rotation. Attribution activates automatically once configured and is keyed per-principal in the credential cache; when unconfigured, GCP vending behaviour is unchanged. - Added `SESSION_NAME_FIELDS_IN_SUBSCOPED_CREDENTIAL` feature flag for AWS credential vending. Operators can now configure an ordered list of fields (`realm`, `catalog`, `namespace`, `table`, `principal`) to compose structured STS role session names (e.g. `p-acme-hr_catalog-employee-etl_writer`). Session names are sanitized and proportionally truncated to the AWS 64-character limit. When unset, existing `INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL` behaviour is preserved. - Added `hostUsers` support in Helm chart. - Added documentation for BigQuery Metastore Catalog federation. Build with `-PNonRESTCatalogs=BIGQUERY` to include the BigQueryMetastoreCatalog federation extension. See `site/content/in-dev/unreleased/federation/bigquery-metastore-federation.md`. 
diff --git a/polaris-core/build.gradle.kts b/polaris-core/build.gradle.kts
index f516a7826ec..f35e0a58ff3 100644
--- a/polaris-core/build.gradle.kts
+++ b/polaris-core/build.gradle.kts
@@ -70,6 +70,9 @@ dependencies {
   implementation(platform(libs.google.cloud.storage.bom))
   implementation("com.google.cloud:google-cloud-storage")
   implementation(libs.google.cloud.iamcredentials)
+  // Signs short-lived attribution JWTs for GCS principal attribution via Workload Identity
+  // Federation (see GcpFederatedCredentialsExchanger).
+  implementation(libs.auth0.jwt)
 
   testCompileOnly(project(":polaris-immutables"))
   testAnnotationProcessor(project(":polaris-immutables", configuration = "processor"))
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 31db1d987e9..4c0e42be933 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -209,6 +209,68 @@ public static void enforceFeatureEnabledOrThrow(
           .defaultValue(List.of())
           .buildFeatureConfiguration();
 
+  // ---------------------------------------------------------------------------
+  // GCS principal attribution via Workload Identity Federation
+  //
+  // GCP downscoped credentials have no session-tag mechanism (unlike AWS STS), and custom audit
+  // headers only reach GCS audit logs if the client forwards them. To attribute GCS data access
+  // to the Polaris principal for ANY client, credential vending can chain
+  // catalog-signed JWT -> STS token exchange -> per-catalog service-account impersonation, so the
+  // principal appears in serviceAccountDelegationInfo of every GCS Data Access audit log entry.
+  //
+  // Attribution activates automatically once the audience, issuer, and signing key file are all
+  // set (no on/off flag); it additionally requires a gcpServiceAccount on the storage config.
+  // ---------------------------------------------------------------------------
+
+  /** Workload Identity Pool provider resource name; empty disables GCS principal attribution. */
+  public static final FeatureConfiguration<String> GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE =
+      PolarisConfiguration.<String>builder()
+          .key("GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE")
+          .description(
+              "Full resource name of the Workload Identity Pool provider used for GCS principal\n"
+                  + "attribution, e.g.\n"
+                  + "//iam.googleapis.com/projects/<project-number>/locations/global/workloadIdentityPools/<pool-id>/providers/<provider-id>.\n"
+                  + "Used as both the attribution JWT 'aud' claim and the STS token-exchange audience.\n"
+                  + "Empty (default) disables principal attribution.")
+          .defaultValue("")
+          .buildFeatureConfiguration();
+
+  /** Issuer ({@code iss}) of attribution JWTs; empty disables GCS principal attribution. */
+  public static final FeatureConfiguration<String> GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER =
+      PolarisConfiguration.<String>builder()
+          .key("GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER")
+          .description(
+              "Issuer (iss claim) of catalog-minted GCS attribution JWTs; must match the issuer\n"
+                  + "configured on the Workload Identity Pool OIDC provider. The provider verifies\n"
+                  + "signatures against its uploaded JWKS, so no public discovery endpoint is required.\n"
+                  + "Empty (default) disables principal attribution.")
+          .defaultValue("")
+          .buildFeatureConfiguration();
+
+  /** Path to the PKCS#8 PEM RSA signing key; empty disables GCS principal attribution. */
+  public static final FeatureConfiguration<String> GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE =
+      PolarisConfiguration.<String>builder()
+          .key("GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE")
+          .description(
+              "Filesystem path to the PKCS#8 PEM RSA private key used to sign GCS attribution JWTs\n"
+                  + "(RS256). The corresponding public key must be published in the Workload Identity\n"
+                  + "Pool provider's uploaded JWKS. Empty (default) disables principal attribution.")
+          .defaultValue("")
+          .buildFeatureConfiguration();
+
+  /** Optional JWT {@code kid} header value for attribution JWTs; empty omits the header. */
+  public static final FeatureConfiguration<String> GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_ID =
+      PolarisConfiguration.<String>builder()
+          .key("GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_ID")
+          .description(
+              "Key ID (kid) written into the header of GCS attribution JWTs so the Workload Identity\n"
+                  + "Pool provider can select the right public key from its JWKS during key rotation\n"
+                  + "(when the JWKS holds both the old and new keys). Must match the kid of the JWKS\n"
+                  + "entry for the configured signing key. Empty omits the header (only safe with a\n"
+                  + "single-key JWKS).")
+          .defaultValue("")
+          .buildFeatureConfiguration();
+
   public static final FeatureConfiguration ALLOW_SETTING_S3_ENDPOINTS =
       PolarisConfiguration.builder()
           .key("ALLOW_SETTING_S3_ENDPOINTS")
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionParams.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionParams.java
new file mode 100644
index 00000000000..7277727d6c3
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionParams.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/**
+ * Pre-computed GCS principal attribution configuration parameters, evaluated at cache key build
+ * time. Present in the cache key only when attribution is fully configured and a principal is
+ * available; absent otherwise.
+ */
+@PolarisImmutable
+public interface GcpAttributionParams {
+
+  @Value.Parameter(order = 1)
+  String tokenIssuer();
+
+  @Value.Parameter(order = 2)
+  String wifAudience();
+
+  @Value.Parameter(order = 3)
+  String signingKeyFile();
+
+  @Value.Parameter(order = 4)
+  String signingKeyId();
+
+  static GcpAttributionParams of(
+      String tokenIssuer, String wifAudience, String signingKeyFile, String signingKeyId) {
+    return ImmutableGcpAttributionParams.of(tokenIssuer, wifAudience, signingKeyFile, signingKeyId);
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilder.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilder.java
new file mode 100644
index 00000000000..4982bca1b79
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+/**
+ * Builds the {@code sub} claim for GCS principal-attribution JWTs as {@code <realm>/<principal>},
+ * within GCP's 127-byte {@code google.subject} limit.
+ *
+ * <p>The budget mirrors the AWS session-name builder: the separator is reserved first, then each
+ * field receives an equal share of the remainder, and budget unused by a short field flows to the
+ * other. Lengths are measured in UTF-8 bytes and truncation only cuts on code-point boundaries, so
+ * non-ASCII names can neither overshoot the limit nor end in half of a surrogate pair. ISO control
+ * characters and the {@code /} separator are stripped from each field so the subject stays
+ * unambiguously parseable, and the {@code unknown} placeholder substitutes null/empty fields so
+ * the subject shape stays stable.
+ */
+public final class GcpAttributionSubjectBuilder {
+
+  /**
+   * GCP limit for the {@code google.subject} attribute of a federated identity, in UTF-8 bytes.
+   */
+  public static final int MAX_SUBJECT_LENGTH = 127;
+
+  static final String SEPARATOR = "/";
+
+  static final String VALUE_UNKNOWN = "unknown";
+
+  private GcpAttributionSubjectBuilder() {}
+
+  /**
+   * Builds the attribution subject {@code <realm>/<principal>}, guaranteed to encode to at most
+   * {@value #MAX_SUBJECT_LENGTH} UTF-8 bytes.
+   *
+   * @param realm the realm identifier (gets first-half budget priority)
+   * @param principalName the Polaris principal name
+   * @return the subject string
+   */
+  public static String buildSubject(String realm, String principalName) {
+    String cleanRealm = sanitize(realm);
+    String cleanPrincipal = sanitize(principalName);
+
+    int budget = MAX_SUBJECT_LENGTH - utf8Length(SEPARATOR);
+
+    // Provisionally cap the realm at half the budget so a long realm cannot starve the
+    // principal; the principal then takes what it needs from everything that is left.
+    String realmHalf = truncateToUtf8Bytes(cleanRealm, budget / 2);
+    String principalPart = truncateToUtf8Bytes(cleanPrincipal, budget - utf8Length(realmHalf));
+
+    // Carry-forward: if the principal left budget unused, the realm may take more than its
+    // initial half-share.
+    String realmPart = truncateToUtf8Bytes(cleanRealm, budget - utf8Length(principalPart));
+
+    return realmPart + SEPARATOR + principalPart;
+  }
+
+  /**
+   * Strips ISO control characters and the separator from {@code value}; null, empty, or
+   * fully-stripped input becomes {@link #VALUE_UNKNOWN}.
+   */
+  private static String sanitize(String value) {
+    if (value == null || value.isEmpty()) {
+      return VALUE_UNKNOWN;
+    }
+    StringBuilder cleaned = new StringBuilder(value.length());
+    for (int i = 0; i < value.length(); i++) {
+      char c = value.charAt(i);
+      // Drop control chars and the separator itself so the subject stays unambiguously
+      // <realm>/<principal>: a value containing '/' would otherwise let an audit-log consumer
+      // mis-split it (e.g. principal "a/b" read as realm "a", principal "b").
+      if (!Character.isISOControl(c) && c != '/') {
+        cleaned.append(c);
+      }
+    }
+    return cleaned.length() == 0 ? VALUE_UNKNOWN : cleaned.toString();
+  }
+
+  /**
+   * Returns the longest prefix of {@code value} that encodes to at most {@code maxBytes} UTF-8
+   * bytes without cutting through a code point.
+   */
+  private static String truncateToUtf8Bytes(String value, int maxBytes) {
+    int usedBytes = 0;
+    int end = 0;
+    while (end < value.length()) {
+      int codePoint = value.codePointAt(end);
+      int width = utf8Width(codePoint);
+      if (usedBytes + width > maxBytes) {
+        break;
+      }
+      usedBytes += width;
+      end += Character.charCount(codePoint);
+    }
+    return value.substring(0, end);
+  }
+
+  /** Returns the number of bytes {@code value} occupies when encoded as UTF-8. */
+  private static int utf8Length(String value) {
+    int bytes = 0;
+    for (int i = 0; i < value.length(); ) {
+      int codePoint = value.codePointAt(i);
+      bytes += utf8Width(codePoint);
+      i += Character.charCount(codePoint);
+    }
+    return bytes;
+  }
+
+  /**
+   * Returns the UTF-8 width of one code point. An unpaired surrogate is counted as three bytes,
+   * which is at least what Java's UTF-8 encoder emits for it, so the budget stays conservative.
+   */
+  private static int utf8Width(int codePoint) {
+    if (codePoint < 0x80) {
+      return 1;
+    }
+    if (codePoint < 0x800) {
+      return 2;
+    }
+    if (codePoint < 0x10000) {
+      return 3;
+    }
+    return 4;
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
index 8ed4a843d9a..668f84ffc2b 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
@@ -33,6 +33,7 @@
 import com.google.protobuf.Duration;
 import com.google.protobuf.Timestamp;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Date;
@@ -46,6 +47,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.CachingStorageIntegration;
 import org.apache.polaris.core.storage.CredentialVendingContext;
@@ -176,6 +178,28 @@ private GcpStorageCredentialCacheKey buildCacheKey(
       @NonNull Set writeLocations,
       @NonNull Optional refreshEndpoint,
       @NonNull CredentialVendingContext context) {
+    // Principal attribution makes the vended token per-principal, so the principal must
+    // participate in cache identity; otherwise it is left empty to preserve cross-principal cache
+    // reuse. Attribution requires a service account to impersonate and a principal to attribute.
+    String principalName = "";
+    Optional<GcpAttributionParams> attributionParams = Optional.empty();
+    if (principalAttributionConfigured(realmConfig())
+        && storageConfig().getGcpServiceAccount() != null) {
+      principalName = context.principalName().orElse("");
+      if (!principalName.isEmpty()) {
+        attributionParams =
+            Optional.of(
+                GcpAttributionParams.of(
+                    realmConfig()
+                        .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER),
+                    realmConfig()
+                        .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE),
+                    realmConfig()
+                        .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE),
+                    realmConfig()
+                        .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_ID)));
+      }
+    }
     return GcpStorageCredentialCacheKey.of(
         context.realm().orElse(""),
         storageConfig(),
@@ -183,10 +207,28 @@ private GcpStorageCredentialCacheKey buildCacheKey(
         listLocations,
         writeLocations,
         refreshEndpoint,
+        principalName,
         sourceCredentials,
         transportFactory,
         realmConfig(),
-        credentialOps);
+        credentialOps,
+        attributionParams);
+  }
+
+  /**
+   * Returns true when GCS principal attribution is fully configured (WIF audience, token issuer,
+   * and signing key file all set). There is intentionally no separate on/off flag.
+   */
+  private static boolean principalAttributionConfigured(RealmConfig realmConfig) {
+    return !realmConfig
+            .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE)
+            .isEmpty()
+        && !realmConfig
+            .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER)
+            .isEmpty()
+        && !realmConfig
+            .getConfig(FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE)
+            .isEmpty();
   }
 
   /** Mint a fresh {@link StorageAccessConfig} for the given GCP cache key. */
@@ -206,7 +248,7 @@ static StorageAccessConfig compute(GcpStorageCredentialCacheKey key) {
     }
 
     GoogleCredentials credentialsToDownscope =
-        getBaseCredentials(gcpStorageConfig, sourceCredentials, credentialOps);
+        baseCredentialsForVending(key, gcpStorageConfig, sourceCredentials, credentialOps);
 
     CredentialAccessBoundary accessBoundary =
         generateAccessBoundaryRules(readLocations, listLocations, writeLocations);
@@ -246,6 +288,40 @@ static StorageAccessConfig compute(GcpStorageCredentialCacheKey key) {
     return accessConfig.build();
   }
 
+  /**
+   * Returns the credential to be used as the source for downscoping.
+   *
+   * <p>When GCS principal attribution is configured and a principal is present (so the cache key
+   * carries pre-computed {@link GcpAttributionParams}), the impersonation source is a federated
+   * identity whose subject is {@code <realm>/<principal>}, which surfaces the principal in {@code
+   * serviceAccountDelegationInfo} of GCS Data Access audit logs. Otherwise this is the standard
+   * path: impersonate the configured service account from the ambient source credentials, or use
+   * those credentials directly.
+   */
+  private static GoogleCredentials baseCredentialsForVending(
+      GcpStorageCredentialCacheKey key,
+      GcpStorageConfigurationInfo storageConfig,
+      GoogleCredentials sourceCredentials,
+      GcpCredentialOps credentialOps) {
+    Optional<GcpAttributionParams> attributionParams = key.attributionParams();
+    if (attributionParams.isPresent()) {
+      GcpAttributionParams params = attributionParams.get();
+      String subject =
+          GcpAttributionSubjectBuilder.buildSubject(key.realmId(), key.principalName());
+      GcpFederatedCredentialsExchanger exchanger =
+          new GcpFederatedCredentialsExchanger(
+              params.tokenIssuer(),
+              params.wifAudience(),
+              Path.of(params.signingKeyFile()),
+              params.signingKeyId(),
+              key.transportFactory());
+      GoogleCredentials federated = exchanger.federatedCredentials(subject, key.realmId());
+      return createImpersonatedCredentials(
+          federated, storageConfig.getGcpServiceAccount(), credentialOps);
+    }
+    return getBaseCredentials(storageConfig, sourceCredentials, credentialOps);
+  }
+
   /**
    * Returns the credential to be used as the source for downscoping. If a specific service account
    * is configured, it impersonates that account first.
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java
new file mode 100644
index 00000000000..b7ec99ae8c0
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.GeneralSecurityException;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries {@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm} claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps {@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm {@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+  static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+  static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
+  static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
+
+  /** Attribution JWTs are single-purpose and short-lived. */
+  static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+  /**
+   * JVM-wide cache of parsed signing keys, keyed by file path. The key file is a stable pod-mounted
+   * secret; parsing it (disk read + {@link KeyFactory}) once per path amortizes across vends rather
+   * than re-reading on every credential-cache miss. Key rotation is delivered by a process restart
+   * (the secret is mounted at startup), which clears this cache.
+   */
+  private static final ConcurrentHashMap<Path, RSAPrivateKey> SIGNING_KEY_CACHE =
+      new ConcurrentHashMap<>();
+
+  private final String issuer;
+  private final String wifAudience;
+  private final Path signingKeyPath;
+  private final String signingKeyId;
+  private final HttpTransportFactory transportFactory;
+
+  public GcpFederatedCredentialsExchanger(
+      String issuer,
+      String wifAudience,
+      Path signingKeyPath,
+      String signingKeyId,
+      HttpTransportFactory transportFactory) {
+    this.issuer = issuer;
+    this.wifAudience = wifAudience;
+    this.signingKeyPath = signingKeyPath;
+    this.signingKeyId = signingKeyId;
+    this.transportFactory = transportFactory;
+  }
+
+  /**
+   * Builds a federated credential whose subject is {@code <realm>/<principal>}.
+   *
+   * @param subject the attribution subject, {@code <realm>/<principal>} (see {@link
+   *     GcpAttributionSubjectBuilder})
+   * @param realm the realm identifier, emitted as the {@code realm} claim for {@code
+   *     attribute.realm} mapping
+   * @return federated credentials suitable as the source for tenant-SA impersonation
+   */
+  public GoogleCredentials federatedCredentials(String subject, String realm) {
+    return IdentityPoolCredentials.newBuilder()
+        .setHttpTransportFactory(transportFactory)
+        .setAudience(wifAudience)
+        .setSubjectTokenType(SUBJECT_TOKEN_TYPE)
+        .setTokenUrl(STS_TOKEN_URL)
+        .setScopes(List.of(CLOUD_PLATFORM_SCOPE))
+        .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm))
+        .build();
+  }
+
+  /**
+   * Mints a fresh RS256-signed attribution JWT carrying {@code iss}, {@code sub}, {@code aud}, a
+   * {@code realm} claim, {@code iat}, an {@code exp} of {@link #JWT_LIFETIME} from now, and a random
+   * {@code jti}.
+   *
+   * @throws IOException if the signing key cannot be read or parsed
+   */
+  @VisibleForTesting
+  String mintAttributionJwt(String subject, String realm) throws IOException {
+    Instant now = Instant.now();
+    JWTCreator.Builder builder =
+        JWT.create()
+            .withIssuer(issuer)
+            .withSubject(subject)
+            .withAudience(wifAudience)
+            .withClaim("realm", realm)
+            .withIssuedAt(Date.from(now))
+            .withExpiresAt(Date.from(now.plus(JWT_LIFETIME)))
+            .withJWTId(UUID.randomUUID().toString());
+    // Set the kid header so the WIF provider can pick the right public key from its JWKS during
+    // rotation (when the JWKS holds both the old and new keys). Omitted only for a single-key JWKS.
+    if (signingKeyId != null && !signingKeyId.isEmpty()) {
+      builder.withKeyId(signingKeyId);
+    }
+    return builder.sign(Algorithm.RSA256(null, loadSigningKey()));
+  }
+
+  /**
+   * Returns the signing key for {@link #signingKeyPath}, parsing the file on first use and reusing
+   * the instance held in {@link #SIGNING_KEY_CACHE} afterwards.
+   */
+  private RSAPrivateKey loadSigningKey() throws IOException {
+    RSAPrivateKey cached = SIGNING_KEY_CACHE.get(signingKeyPath);
+    if (cached != null) {
+      return cached;
+    }
+    RSAPrivateKey parsed = readPkcs8PrivateKey(signingKeyPath);
+    // Another thread may have parsed and published the same file concurrently; hand out whichever
+    // instance actually landed in the cache so every caller shares one key object.
+    RSAPrivateKey raced = SIGNING_KEY_CACHE.putIfAbsent(signingKeyPath, parsed);
+    return raced != null ? raced : parsed;
+  }
+
+  /**
+   * Reads an RSA private key from a PKCS#8 PEM file.
+   *
+   * @throws IOException if the file cannot be read, or its content is not valid Base64 or not a
+   *     PKCS#8-encoded RSA private key
+   */
+  @VisibleForTesting
+  static RSAPrivateKey readPkcs8PrivateKey(Path pemPath) throws IOException {
+    String pem = Files.readString(pemPath);
+    String base64 =
+        pem.replaceAll("-----BEGIN [A-Z ]+-----", "")
+            .replaceAll("-----END [A-Z ]+-----", "")
+            .replaceAll("\\s", "");
+    try {
+      byte[] der = Base64.getDecoder().decode(base64);
+      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+      return (RSAPrivateKey) keyFactory.generatePrivate(new PKCS8EncodedKeySpec(der));
+    } catch (IllegalArgumentException | GeneralSecurityException e) {
+      // Base64 decoding raises IllegalArgumentException; key-factory lookup and key-spec parsing
+      // raise GeneralSecurityException subclasses. Anything else is a bug and should propagate.
+      throw new IOException(
+          "Unable to read PKCS#8 RSA private key for GCS attribution from " + pemPath, e);
+    }
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKey.java
index 8691c7d2572..d8abca63761 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKey.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKey.java
@@ -29,8 +29,14 @@
 import org.immutables.value.Value;
 
 /**
- * Cache key for vended GCP credentials. GCP downscoped credentials do not support session tags, so
- * principal and credential vending context are never included.
+ * Cache key for vended GCP credentials.
+ *
+ * <p>By default GCP downscoped credentials are principal-independent, so {@link #principalName()}
+ * is empty and credentials are shared across principals. When GCS principal attribution via
+ * Workload Identity Federation is enabled (see {@code
+ * FeatureConfiguration.GCS_PRINCIPAL_ATTRIBUTION_*}), the vended token is derived from a
+ * per-principal federated identity, so the principal name is included here to ensure one
+ * principal's attributed credentials are never served to another.
  */
 @PolarisImmutable
 public interface GcpStorageCredentialCacheKey extends StorageCredentialCacheKey {
@@ -55,25 +61,43 @@ public interface GcpStorageCredentialCacheKey extends StorageCredentialCacheKey
   @Value.Parameter(order = 6)
   Optional refreshCredentialsEndpoint();
 
+  /**
+   * The requesting principal name, included in the cache key only when GCS principal attribution is
+   * enabled (otherwise empty). When attribution is active the vended credential carries this
+   * principal's identity, so it must participate in cache identity to avoid serving one principal's
+   * attributed credentials to another.
+   */
+  @Value.Parameter(order = 7)
+  String principalName();
+
   // ---- aux: app-scoped invariants, excluded from equals/hashCode ----
 
-  @Value.Parameter(order = 7)
+  @Value.Parameter(order = 8)
   @Value.Auxiliary
   GoogleCredentials sourceCredentials();
 
-  @Value.Parameter(order = 8)
+  @Value.Parameter(order = 9)
   @Value.Auxiliary
   HttpTransportFactory transportFactory();
 
   @Override
-  @Value.Parameter(order = 9)
+  @Value.Parameter(order = 10)
   @Value.Auxiliary
   RealmConfig realmConfig();
 
-  @Value.Parameter(order = 10)
+  @Value.Parameter(order = 11)
   @Value.Auxiliary
   GcpCredentialOps credentialOps();
 
+  /**
+   * Pre-computed attribution config parameters, present only when GCS principal attribution is
+   * fully configured and a principal is available. Computed at cache key build time so {@code
+   * compute()} can use these values directly without re-reading realm config.
+   */
+  @Value.Parameter(order = 12)
+  @Value.Auxiliary
+  Optional<GcpAttributionParams> attributionParams();
+
   @Override
   default StorageAccessConfig load() {
     return GcpCredentialsStorageIntegration.compute(this);
@@ -86,10 +110,12 @@ static GcpStorageCredentialCacheKey of(
       Set allowedListLocations,
       Set allowedWriteLocations,
       Optional refreshCredentialsEndpoint,
+      String principalName,
       GoogleCredentials sourceCredentials,
       HttpTransportFactory transportFactory,
       RealmConfig realmConfig,
-      GcpCredentialOps credentialOps) {
+      GcpCredentialOps credentialOps,
+      Optional<GcpAttributionParams> attributionParams) {
     return ImmutableGcpStorageCredentialCacheKey.of(
         realmId,
         storageConfig,
@@ -97,9 +123,11 @@ static GcpStorageCredentialCacheKey of(
         allowedListLocations,
         allowedWriteLocations,
         refreshCredentialsEndpoint,
+        principalName,
         sourceCredentials,
         transportFactory,
         realmConfig,
-        credentialOps);
+        credentialOps,
+        attributionParams);
   }
 }
diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilderTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilderTest.java
new file mode 100644
index 00000000000..17b09e91ed3
--- /dev/null
+++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilderTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+class GcpAttributionSubjectBuilderTest {
+
+  @Test
+  void simpleSubject() {
+    assertThat(GcpAttributionSubjectBuilder.buildSubject("tenant1", "etl_writer"))
+        .isEqualTo("tenant1/etl_writer");
+  }
+
+  @Test
+  void neverExceedsGcpLimit() {
+    assertThat(GcpAttributionSubjectBuilder.buildSubject("r".repeat(500), "p".repeat(500)))
+        .hasSize(GcpAttributionSubjectBuilder.MAX_SUBJECT_LENGTH)
+        .contains("/");
+  }
+
+  @Test
+  void shortRealmGivesPrincipalMoreBudget() {
+    String subject = GcpAttributionSubjectBuilder.buildSubject("t1", "p".repeat(500));
+    // "t1/" takes 3 of the 127 chars, so the principal is cut to the remaining 124.
+    assertThat(subject).isEqualTo("t1/" + "p".repeat(124));
+  }
+
+  @Test
+  void shortPrincipalGivesRealmMoreBudget() {
+    String subject = GcpAttributionSubjectBuilder.buildSubject("r".repeat(500), "me");
+    assertThat(subject).isEqualTo("r".repeat(124) + "/me");
+  }
+
+  @Test
+  void bothLongSplitBudgetEvenly() {
+    String subject = GcpAttributionSubjectBuilder.buildSubject("r".repeat(500), "p".repeat(500));
+    // Budget is 127 - 1 (separator) = 126: realm gets floor(126/2) = 63, principal the other 63.
+    assertThat(subject).isEqualTo("r".repeat(63) + "/" + "p".repeat(63));
+  }
+
+  @Test
+  void nullAndEmptyBecomeUnknown() {
+    assertThat(GcpAttributionSubjectBuilder.buildSubject(null, "p")).isEqualTo("unknown/p");
+    assertThat(GcpAttributionSubjectBuilder.buildSubject("r", "")).isEqualTo("r/unknown");
+    assertThat(GcpAttributionSubjectBuilder.buildSubject(null, null)).isEqualTo("unknown/unknown");
+  }
+
+  @Test
+  void controlCharsAndSeparatorStripped() {
+    assertThat(GcpAttributionSubjectBuilder.buildSubject("ten\r\nant", "etl\twriter"))
+        .isEqualTo("tenant/etlwriter");
+    // Every '/' inside a field is dropped, so the subject keeps exactly one separator.
+    String subject = GcpAttributionSubjectBuilder.buildSubject("tenant1", "a/b/c");
+    assertThat(subject).isEqualTo("tenant1/abc");
+    assertThat(subject.chars().filter(c -> c == '/').count()).isEqualTo(1);
+  }
+}
diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchangerTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchangerTest.java
new file mode 100644
index 00000000000..933d82da161
--- /dev/null
+++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchangerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.gcp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.auth0.jwt.JWT; +import com.auth0.jwt.interfaces.DecodedJWT; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.IdentityPoolCredentials; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.util.Base64; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** Tests for GcpFederatedCredentialsExchanger using a freshly generated RSA key written to a JUnit temp directory. */ class GcpFederatedCredentialsExchangerTest { + + private static final String ISSUER = "https://catalog.example.org/attribution"; + private static final String AUDIENCE = + "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/polaris/providers/catalog"; + private static final String KEY_ID = "attribution-key-1"; + private static final HttpTransportFactory TRANSPORT = NetHttpTransport::new; + + @TempDir Path tempDir; + private Path keyFile; + + /** Generates a 2048-bit RSA key pair and writes the private key, base64 encoded between BEGIN/END PRIVATE KEY markers, to attribution-key.pem in the temp directory. */ @BeforeEach + void writeSigningKey() throws Exception { + KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA"); + generator.initialize(2048); + KeyPair keyPair = generator.generateKeyPair(); + String pem = + "-----BEGIN PRIVATE KEY-----\n" + + Base64.getMimeEncoder(64, "\n".getBytes(StandardCharsets.UTF_8)) + .encodeToString(keyPair.getPrivate().getEncoded()) + + "\n-----END PRIVATE KEY-----\n"; + keyFile = tempDir.resolve("attribution-key.pem"); + Files.writeString(keyFile, pem); + } + + /** Creates an exchanger over the shared ISSUER, AUDIENCE, keyFile and TRANSPORT with the given key id. */ private
GcpFederatedCredentialsExchanger exchanger(String keyId) { + return new GcpFederatedCredentialsExchanger(ISSUER, AUDIENCE, keyFile, keyId, TRANSPORT); + } + + /** The minted JWT carries the configured issuer, subject, audience, realm claim and key id, is RS256, has a non-blank id, and expires after it was issued. */ @Test + void attributionJwtCarriesExpectedClaimsAndKid() throws IOException { + DecodedJWT jwt = + JWT.decode(exchanger(KEY_ID).mintAttributionJwt("tenant1/etl_writer", "tenant1")); + + assertThat(jwt.getIssuer()).isEqualTo(ISSUER); + assertThat(jwt.getSubject()).isEqualTo("tenant1/etl_writer"); + assertThat(jwt.getAudience()).containsExactly(AUDIENCE); + assertThat(jwt.getClaim("realm").asString()).isEqualTo("tenant1"); + assertThat(jwt.getAlgorithm()).isEqualTo("RS256"); + assertThat(jwt.getKeyId()).isEqualTo(KEY_ID); + assertThat(jwt.getId()).isNotBlank(); + assertThat(jwt.getExpiresAt()).isAfter(jwt.getIssuedAt()); + } + + /** With an empty key id the decoded JWT reports no key id. */ @Test + void emptyKeyIdOmitsKidHeader() throws IOException { + DecodedJWT jwt = JWT.decode(exchanger("").mintAttributionJwt("tenant1/p", "tenant1")); + assertThat(jwt.getKeyId()).isNull(); + } + + /** federatedCredentials yields IdentityPoolCredentials carrying AUDIENCE and SUBJECT_TOKEN_TYPE. */ @Test + void federatedCredentialsConfiguredForStsExchange() { + GoogleCredentials credentials = exchanger(KEY_ID).federatedCredentials("tenant1/p", "tenant1"); + assertThat(credentials).isInstanceOf(IdentityPoolCredentials.class); + IdentityPoolCredentials idp = (IdentityPoolCredentials) credentials; + assertThat(idp.getAudience()).isEqualTo(AUDIENCE); + assertThat(idp.getSubjectTokenType()) + .isEqualTo(GcpFederatedCredentialsExchanger.SUBJECT_TOKEN_TYPE); + } + + /** Minting with a key path that does not exist throws IOException. */ @Test + void missingKeyFileFails() { + GcpFederatedCredentialsExchanger exchanger = + new GcpFederatedCredentialsExchanger( + ISSUER, AUDIENCE, tempDir.resolve("nope.pem"), KEY_ID, TRANSPORT); + assertThatThrownBy(() -> exchanger.mintAttributionJwt("tenant1/p", "tenant1")) + .isInstanceOf(IOException.class); + } + + /** Minting from a second instance still succeeds after the key file has been deleted. */ @Test + void signingKeyIsCachedAcrossInstances() throws IOException { + // First use parses and caches the key for this (unique) path.
+ exchanger(KEY_ID).mintAttributionJwt("tenant1/p", "tenant1"); + // After the file is gone, a second instance on the same path still mints from the JVM cache, + // proving the parse is amortized rather than re-read per vend. + Files.delete(keyFile); + assertThatNoException() + .isThrownBy(() -> exchanger(KEY_ID).mintAttributionJwt("tenant1/p", "tenant1")); + } + + /** readPkcs8PrivateKey returns a non-null value for the generated key file. */ @Test + void readPkcs8PrivateKey() throws IOException { + assertThat(GcpFederatedCredentialsExchanger.readPkcs8PrivateKey(keyFile)).isNotNull(); + } +} diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKeyTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKeyTest.java new file mode 100644 index 00000000000..6ed6b5333f1 --- /dev/null +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKeyTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.polaris.core.storage.gcp; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import java.util.Optional; +import java.util.Set; +import org.apache.polaris.core.config.RealmConfig; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** Tests that the principal name takes part in GcpStorageCredentialCacheKey equality. */ class GcpStorageCredentialCacheKeyTest { + + private static final GcpStorageConfigurationInfo CONFIG = + GcpStorageConfigurationInfo.builder() + .addAllAllowedLocations(java.util.List.of("gs://bucket/path")) + .build(); + private static final GoogleCredentials CREDS = Mockito.mock(GoogleCredentials.class); + private static final HttpTransportFactory TRANSPORT = NetHttpTransport::new; + private static final RealmConfig REALM_CONFIG = Mockito.mock(RealmConfig.class); + + /** Builds a key where only principalName varies; all other arguments are fixed constants or empty. */ private static GcpStorageCredentialCacheKey key(String principalName) { + return GcpStorageCredentialCacheKey.of( + "tenant1", + CONFIG, + Set.of("gs://bucket/path"), + Set.of(), + Set.of(), + Optional.empty(), + principalName, + CREDS, + TRANSPORT, + REALM_CONFIG, + GcpCredentialOps.DEFAULT, + Optional.empty()); + } + + /** Keys for different principals differ; keys for the same principal are equal with equal hash codes. */ @Test + void principalNameIsPartOfCacheIdentity() { + // When attribution is on, the vended token is per-principal: two principals must not collide + // on one cache entry. + assertThat(key("alice")).isNotEqualTo(key("bob")); + assertThat(key("alice")).hasSameHashCodeAs(key("alice")); + assertThat(key("alice")).isEqualTo(key("alice")); + } + + /** Two keys built with an empty principal name are equal. */ @Test + void emptyPrincipalSharesOneEntry() { + // When attribution is off the principal is empty, preserving cross-principal cache reuse.
+ assertThat(key("")).isEqualTo(key("")); + } +} diff --git a/runtime/admin/distribution/LICENSE b/runtime/admin/distribution/LICENSE index 9a0ec2c5a2b..b91c547c857 100644 --- a/runtime/admin/distribution/LICENSE +++ b/runtime/admin/distribution/LICENSE @@ -399,6 +399,36 @@ License: Apache License 2.0 - https://github.com/hyperxpro/Brotli4j/blob/v1.16.0 -------------------------------------------------------------------------------- +This product bundles Auth0 Java JWT. + +* Maven group:artifact IDs: com.auth0:java-jwt + +Project URL: https://github.com/auth0/java-jwt +License: MIT License +| The MIT License (MIT) +| +| Copyright (c) 2015 Auth0, Inc. (http://auth0.com) +| +| Permission is hereby granted, free of charge, to any person obtaining a copy +| of this software and associated documentation files (the "Software"), to deal +| in the Software without restriction, including without limitation the rights +| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +| copies of the Software, and to permit persons to whom the Software is +| furnished to do so, subject to the following conditions: +| +| The above copyright notice and this permission notice shall be included in all +| copies or substantial portions of the Software. +| +| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +| SOFTWARE. + +-------------------------------------------------------------------------------- + This product bundles Azure SDK for Java.
* Maven group:artifact IDs: com.azure:azure-core diff --git a/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md b/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md index 4080a68ee4a..eabc569c5bd 100644 --- a/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md +++ b/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md @@ -335,6 +335,42 @@ The maximum weight for the entity cache. This is a heuristic value without any p --- +##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE"` + +Filesystem path to the PKCS#8 PEM RSA private key used to sign GCS attribution JWTs (RS256). The corresponding public key must be published in the Workload Identity Pool provider's uploaded JWKS. Empty (default) disables principal attribution. + +- **Type:** `String` +- **Default:** `` + +--- + +##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_ID"` + +Key ID (kid) written into the header of GCS attribution JWTs so the Workload Identity Pool provider can select the right public key from its JWKS during key rotation (when the JWKS holds both the old and new keys). Must match the kid of the JWKS entry for the configured signing key. Empty omits the header (only safe with a single-key JWKS). + +- **Type:** `String` +- **Default:** `` + +--- + +##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER"` + +Issuer (iss claim) of catalog-minted GCS attribution JWTs; must match the issuer configured on the Workload Identity Pool OIDC provider. The provider verifies signatures against its uploaded JWKS, so no public discovery endpoint is required. Empty (default) disables principal attribution. + +- **Type:** `String` +- **Default:** `` + +--- + +##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE"` + +Full resource name of the Workload Identity Pool provider used for GCS principal attribution, e.g. 
//iam.googleapis.com/projects/PROJECT_NUMBER/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID. Used as both the attribution JWT 'aud' claim and the STS token-exchange audience. Empty (default) disables principal attribution. + +- **Type:** `String` +- **Default:** `` + +--- + ##### `polaris.features."ICEBERG_COMMIT_MAX_RETRIES"` The max number of times to try committing to an Iceberg table