From 1a4430f908e790215ef8b83990f3b76e1d374249 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 13 May 2026 20:28:26 -0700 Subject: [PATCH] Extract IdempotencyRecord data-model change and JDBC schema v5 --- .../relational/jdbc/DatabaseType.java | 6 +- .../RelationalJdbcIdempotencyStore.java | 4 +- .../jdbc/models/ModelIdempotencyRecord.java | 38 +-- .../main/resources/cockroachdb/schema-v5.sql | 318 ++++++++++++++++++ .../src/main/resources/h2/schema-v5.sql | 315 +++++++++++++++++ .../src/main/resources/postgres/schema-v5.sql | 317 +++++++++++++++++ ...thJdbcBasePersistenceImplV5SchemaTest.java | 29 ++ .../jdbc/JdbcBootstrapUtilsTest.java | 10 +- ...ationalJdbcIdempotencyStorePostgresIT.java | 24 +- .../core/entity/IdempotencyRecord.java | 106 +----- .../core/persistence/IdempotencyStore.java | 2 - 11 files changed, 1010 insertions(+), 159 deletions(-) create mode 100644 persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v5.sql create mode 100644 persistence/relational-jdbc/src/main/resources/h2/schema-v5.sql create mode 100644 persistence/relational-jdbc/src/main/resources/postgres/schema-v5.sql create mode 100644 persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplV5SchemaTest.java diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java index cc104e1f39f..aa8aab5b12b 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatabaseType.java @@ -53,9 +53,9 @@ public String getDisplayName() { */ public int getLatestSchemaVersion() { return switch (this) { - case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4 - case COCKROACHDB -> 
4; // CockroachDB schema version kept in sync with PostgreSQL - case H2 -> 4; // H2 uses same schemas as PostgreSQL + case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5 + case COCKROACHDB -> 5; // CockroachDB schema version kept in sync with PostgreSQL + case H2 -> 5; // H2 uses same schemas as PostgreSQL }; } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java index 6b59413b352..0e13e2690ff 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStore.java @@ -62,10 +62,10 @@ public ReserveResult reserve( insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey); insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType); insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId); + insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, ""); insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null); insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null); insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null); - insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null); insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null); insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now)); insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now)); @@ -197,14 +197,12 @@ public boolean finalizeRecord( Integer httpStatus, String errorSubtype, String responseSummary, - String responseHeaders, Instant finalizedAt) { // Use ordered/set maps so we can include nullable values (Map.of disallows nulls). 
Map setClause = new LinkedHashMap<>(); setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus); setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype); setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary); - setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders); setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt)); setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt)); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java index 74473cc246c..5bafbf95cf4 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelIdempotencyRecord.java @@ -8,8 +8,8 @@ * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations @@ -42,35 +42,25 @@ public interface ModelIdempotencyRecord extends Converter { String TABLE_NAME = "idempotency_records"; - // Logical tenant / realm identifier. String REALM_ID = "realm_id"; - // Client-provided idempotency key. String IDEMPOTENCY_KEY = "idempotency_key"; - // Logical operation type (e.g. commit-table). String OPERATION_TYPE = "operation_type"; - // Normalized identifier of the affected resource. 
String RESOURCE_ID = "resource_id"; - // Final HTTP status code once the operation is completed (null while in-progress). + // Hash of caller principal identity bound to the reservation. + // Compared on replay to prevent cross-principal cache hits. + String PRINCIPAL_HASH = "principal_hash"; + String HTTP_STATUS = "http_status"; - // Optional error subtype for failures. String ERROR_SUBTYPE = "error_subtype"; - // Short serialized representation of the response body. + String RESPONSE_SUMMARY = "response_summary"; - // Serialized representation of response headers. - String RESPONSE_HEADERS = "response_headers"; - // Timestamp when the operation was finalized (null while in-progress). String FINALIZED_AT = "finalized_at"; - // Timestamp when the record was created. String CREATED_AT = "created_at"; - // Timestamp when the record was last updated. String UPDATED_AT = "updated_at"; - // Timestamp for the last heartbeat update (null if no heartbeat recorded). String HEARTBEAT_AT = "heartbeat_at"; - // Identifier of the executor that owns the in-progress record (null if not owned). String EXECUTOR_ID = "executor_id"; - // Expiration timestamp after which the record can be considered stale/purgeable. 
String EXPIRES_AT = "expires_at"; List ALL_COLUMNS = @@ -78,10 +68,10 @@ public interface ModelIdempotencyRecord extends Converter { IDEMPOTENCY_KEY, OPERATION_TYPE, RESOURCE_ID, + PRINCIPAL_HASH, HTTP_STATUS, ERROR_SUBTYPE, RESPONSE_SUMMARY, - RESPONSE_HEADERS, FINALIZED_AT, CREATED_AT, UPDATED_AT, @@ -97,6 +87,8 @@ public interface ModelIdempotencyRecord extends Converter { String getResourceId(); + String getPrincipalHash(); + @Nullable Integer getHttpStatus(); @@ -106,9 +98,6 @@ public interface ModelIdempotencyRecord extends Converter { @Nullable String getResponseSummary(); - @Nullable - String getResponseHeaders(); - @Nullable Instant getFinalizedAt(); @@ -131,7 +120,6 @@ default IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException { /** Convert the current ResultSet row into an {@link IdempotencyRecord}. */ static IdempotencyRecord fromRow(ResultSet rs) throws SQLException { - // Requires realm_id to be projected in the ResultSet. return fromRow(rs.getString(REALM_ID), rs); } @@ -143,11 +131,11 @@ static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLExcepti String idempotencyKey = rs.getString(IDEMPOTENCY_KEY); String operationType = rs.getString(OPERATION_TYPE); String resourceId = rs.getString(RESOURCE_ID); + String principalHash = rs.getString(PRINCIPAL_HASH); Integer httpStatus = (Integer) rs.getObject(HTTP_STATUS); String errorSubtype = rs.getString(ERROR_SUBTYPE); String responseSummary = rs.getString(RESPONSE_SUMMARY); - String responseHeaders = rs.getString(RESPONSE_HEADERS); Instant createdAt = rs.getTimestamp(CREATED_AT).toInstant(); Instant updatedAt = rs.getTimestamp(UPDATED_AT).toInstant(); @@ -166,10 +154,10 @@ static IdempotencyRecord fromRow(String realmId, ResultSet rs) throws SQLExcepti idempotencyKey, operationType, resourceId, + principalHash, httpStatus, errorSubtype, responseSummary, - responseHeaders, createdAt, updatedAt, finalizedAt, @@ -184,10 +172,10 @@ default Map toMap(DatabaseType databaseType) 
{ map.put(IDEMPOTENCY_KEY, getIdempotencyKey()); map.put(OPERATION_TYPE, getOperationType()); map.put(RESOURCE_ID, getResourceId()); + map.put(PRINCIPAL_HASH, getPrincipalHash()); map.put(HTTP_STATUS, getHttpStatus()); map.put(ERROR_SUBTYPE, getErrorSubtype()); map.put(RESPONSE_SUMMARY, getResponseSummary()); - map.put(RESPONSE_HEADERS, getResponseHeaders()); map.put(FINALIZED_AT, getFinalizedAt() == null ? null : Timestamp.from(getFinalizedAt())); map.put(CREATED_AT, Timestamp.from(getCreatedAt())); map.put(UPDATED_AT, Timestamp.from(getUpdatedAt())); diff --git a/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v5.sql b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v5.sql new file mode 100644 index 00000000000..7eb6b13f784 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/cockroachdb/schema-v5.sql @@ -0,0 +1,318 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file-- +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- CockroachDB schema v5 (matching PostgreSQL schema v5) +-- Schema version is kept in sync with PostgreSQL to ensure correct column selection in ModelEntity. 
+-- Changes from v4: +-- * idempotency_records: add `principal_hash` (NOT NULL) so handler-level idempotency can +-- bind reservations to the calling principal and reject cross-principal cache hits. +-- * idempotency_records: drop the unused `response_headers` column. The handler-level design +-- rebuilds responses from authoritative state on replay rather than serving stored bodies, +-- so no headers need to be replayed. +-- +-- Migration notes: +-- * The idempotency feature was disabled by default in 1.4.0, so the table is expected to be +-- empty and the in-place ADD/DROP is safe. The ALTER statements below are written with +-- IF [NOT] EXISTS so they are idempotent on both fresh installs (where the CREATE TABLE +-- for idempotency_records already produces the new shape) and existing 1.4.0 installs. + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INT4 NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 5) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT4 NOT NULL, + type_code INT4 NOT NULL, + sub_type_code INT4 NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties JSONB not null default '{}'::JSONB, + internal_properties JSONB not null default '{}'::JSONB, + grant_records_version INT4 NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +-- TODO: create indexes based on all query
pattern. +CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations + ON entities USING btree (realm_id, parent_id, location_without_scheme) + WHERE location_without_scheme IS NOT NULL; + +COMMENT ON TABLE entities IS 'all the entities'; + +COMMENT ON COLUMN entities.realm_id IS 'realm_id used for multi-tenancy'; +COMMENT ON COLUMN entities.catalog_id IS 'catalog id'; +COMMENT ON COLUMN entities.id IS 'entity id'; +COMMENT ON COLUMN entities.parent_id IS 'entity id of parent'; +COMMENT ON COLUMN entities.name IS 'entity name'; +COMMENT ON COLUMN entities.entity_version IS 'version of the entity'; +COMMENT ON COLUMN entities.type_code IS 'type code'; +COMMENT ON COLUMN entities.sub_type_code IS 'sub type of entity'; +COMMENT ON COLUMN entities.create_timestamp IS 'creation time of entity'; +COMMENT ON COLUMN entities.drop_timestamp IS 'time of drop of entity'; +COMMENT ON COLUMN entities.purge_timestamp IS 'time to start purging entity'; +COMMENT ON COLUMN entities.last_update_timestamp IS 'last time the entity is touched'; +COMMENT ON COLUMN entities.properties IS 'entities properties json'; +COMMENT ON COLUMN entities.internal_properties IS 'entities internal properties json'; +COMMENT ON COLUMN entities.grant_records_version IS 'the version of grant records change on the entity'; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INT4, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; + +COMMENT ON COLUMN grant_records.securable_catalog_id IS 'catalog id of the securable'; +COMMENT ON COLUMN grant_records.securable_id IS 'entity id of the securable'; +COMMENT ON COLUMN 
grant_records.grantee_catalog_id IS 'catalog id of the grantee'; +COMMENT ON COLUMN grant_records.grantee_id IS 'id of the grantee'; +COMMENT ON COLUMN grant_records.privilege_code IS 'privilege code'; + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INT4 NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id) + principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay to prevent 
cross-principal cache hits + + -- Finalization/replay + http_status INT4, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); + +-- v4 -> v5 migration for idempotency_records: idempotent ALTER TABLE statements that bring an +-- existing v4 table up to the v5 shape and are no-ops on a fresh v5 install. +ALTER TABLE idempotency_records ADD COLUMN IF NOT EXISTS principal_hash TEXT; +UPDATE idempotency_records SET principal_hash = '' WHERE principal_hash IS NULL; +ALTER TABLE idempotency_records ALTER COLUMN principal_hash SET NOT NULL; +ALTER TABLE idempotency_records DROP COLUMN IF EXISTS response_headers; + +-- ============================================================================ +-- SCAN METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Scan context + snapshot_id BIGINT, + schema_id INT4, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + + -- 
Scan metrics + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + + -- Equality/positional delete metrics + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + + -- Additional metadata (for extensibility) + metadata JSONB DEFAULT '{}'::JSONB, + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE scan_metrics_report IS 'Scan metrics reports as first-class entities'; + +-- Index for retention cleanup by timestamp +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +-- ============================================================================ +-- COMMIT METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Commit context + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + + -- File metrics + added_data_files BIGINT DEFAULT 0, + 
removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + + -- Equality delete files + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + + -- Positional delete files + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + + -- Record metrics + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + + -- Size metrics + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + + -- Duration and attempts + total_duration_ms BIGINT DEFAULT 0, + attempts INT4 DEFAULT 1, + + -- Additional metadata (for extensibility) + metadata JSONB DEFAULT '{}'::JSONB, + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE commit_metrics_report IS 'Commit metrics reports as first-class entities'; + +-- Index for retention cleanup by timestamp +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +-- INT4 type used directly in table definitions for CockroachDB JDBC compatibility +-- CockroachDB requires explicit INT4 type declarations to correctly map columns to Java's Integer type. +-- Using generic INTEGER or INT types causes type mapping failures in CockroachDB's JDBC driver. +-- INT4 is equivalent to INTEGER in PostgreSQL, ensuring compatibility with both databases. 
diff --git a/persistence/relational-jdbc/src/main/resources/h2/schema-v5.sql b/persistence/relational-jdbc/src/main/resources/h2/schema-v5.sql new file mode 100644 index 00000000000..0b47ff72f76 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/h2/schema-v5.sql @@ -0,0 +1,315 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +-- Changes from v4: +-- * idempotency_records: add `principal_hash` (NOT NULL) so handler-level idempotency can +-- bind reservations to the calling principal and reject cross-principal cache hits. +-- * idempotency_records: drop the unused `response_headers` column. The handler-level design +-- rebuilds responses from authoritative state on replay rather than serving stored bodies, +-- so no headers need to be replayed. +-- +-- Migration notes: +-- * The idempotency feature was disabled by default in 1.4.0, so the table is expected to be +-- empty and the in-place ADD/DROP is safe. The ALTER statements below are written with +-- IF [NOT] EXISTS so they are idempotent on both fresh installs (where the CREATE TABLE +-- for idempotency_records already produces the new shape) and existing 1.4.0 installs.
+ +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET SCHEMA POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key VARCHAR PRIMARY KEY, + version_value INTEGER NOT NULL +); + +MERGE INTO version (version_key, version_value) + KEY (version_key) + VALUES ('version', 5); + +-- H2 supports COMMENT, but some modes may ignore it +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties TEXT NOT NULL DEFAULT '{}', + internal_properties TEXT NOT NULL DEFAULT '{}', + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +CREATE INDEX IF NOT EXISTS idx_locations ON entities(realm_id, catalog_id, location_without_scheme); + +-- TODO: create indexes based on all query pattern. 
+CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_entities_catalog_id_id ON entities (catalog_id, id); + +COMMENT ON TABLE entities IS 'all the entities'; + +COMMENT ON COLUMN entities.catalog_id IS 'catalog id'; +COMMENT ON COLUMN entities.id IS 'entity id'; +COMMENT ON COLUMN entities.parent_id IS 'entity id of parent'; +COMMENT ON COLUMN entities.name IS 'entity name'; +COMMENT ON COLUMN entities.entity_version IS 'version of the entity'; +COMMENT ON COLUMN entities.type_code IS 'type code'; +COMMENT ON COLUMN entities.sub_type_code IS 'sub type of entity'; +COMMENT ON COLUMN entities.create_timestamp IS 'creation time of entity'; +COMMENT ON COLUMN entities.drop_timestamp IS 'time of drop of entity'; +COMMENT ON COLUMN entities.purge_timestamp IS 'time to start purging entity'; +COMMENT ON COLUMN entities.last_update_timestamp IS 'last time the entity is touched'; +COMMENT ON COLUMN entities.properties IS 'entities properties json'; +COMMENT ON COLUMN entities.internal_properties IS 'entities internal properties json'; +COMMENT ON COLUMN entities.grant_records_version IS 'the version of grant records change on the entity'; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; +COMMENT ON COLUMN grant_records.securable_catalog_id IS 'catalog id of the securable'; +COMMENT ON COLUMN grant_records.securable_id IS 'entity id of the securable'; +COMMENT ON COLUMN grant_records.grantee_catalog_id IS 'catalog id of the grantee'; +COMMENT ON COLUMN grant_records.grantee_id IS 'id of the grantee'; +COMMENT ON COLUMN grant_records.privilege_code IS 
'privilege code'; + +CREATE INDEX IF NOT EXISTS idx_grants_realm_grantee + ON grant_records (realm_id, grantee_id); +CREATE INDEX IF NOT EXISTS idx_grants_realm_securable + ON grant_records (realm_id, securable_id); + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties TEXT NOT NULL, + PRIMARY KEY (event_id) +); + +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id) + principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay to prevent cross-principal cache hits + + -- Finalization/replay + http_status INTEGER, -- 
NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); + +-- v4 -> v5 migration for idempotency_records: idempotent ALTER TABLE statements that bring an +-- existing v4 table up to the v5 shape and are no-ops on a fresh v5 install. +ALTER TABLE idempotency_records ADD COLUMN IF NOT EXISTS principal_hash TEXT; +UPDATE idempotency_records SET principal_hash = '' WHERE principal_hash IS NULL; +ALTER TABLE idempotency_records ALTER COLUMN principal_hash SET NOT NULL; +ALTER TABLE idempotency_records DROP COLUMN IF EXISTS response_headers; + +-- ============================================================================ +-- SCAN METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Scan context + snapshot_id BIGINT, + schema_id INTEGER, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + + -- Scan metrics + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + 
total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + + -- Equality/positional delete metrics + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + + -- Additional metadata (for extensibility) + metadata TEXT DEFAULT '{}', + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE scan_metrics_report IS 'Scan metrics reports as first-class entities'; + +-- Index for retention cleanup by timestamp +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +-- ============================================================================ +-- COMMIT METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Commit context + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + + -- File metrics + added_data_files BIGINT DEFAULT 0, + removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT 
DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + + -- Equality delete files + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + + -- Positional delete files + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + + -- Record metrics + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + + -- Size metrics + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + + -- Duration and attempts + total_duration_ms BIGINT DEFAULT 0, + attempts INTEGER DEFAULT 1, + + -- Additional metadata (for extensibility) + metadata TEXT DEFAULT '{}', + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE commit_metrics_report IS 'Commit metrics reports as first-class entities'; + +-- Index for retention cleanup by timestamp +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); diff --git a/persistence/relational-jdbc/src/main/resources/postgres/schema-v5.sql b/persistence/relational-jdbc/src/main/resources/postgres/schema-v5.sql new file mode 100644 index 00000000000..c2f1808a2b1 --- /dev/null +++ b/persistence/relational-jdbc/src/main/resources/postgres/schema-v5.sql @@ -0,0 +1,317 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License.
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Changes from v4: +-- * idempotency_records: add `principal_hash` (NOT NULL) so handler-level idempotency can +-- bind reservations to the calling principal and reject cross-principal cache hits. +-- * idempotency_records: drop the unused `response_headers` column. The handler-level design +-- rebuilds responses from authoritative state on replay rather than serving stored bodies, +-- so no headers need to be replayed. +-- +-- Migration notes: +-- * The idempotency feature was disabled by default in 1.4.0, so the table is expected to be +-- empty and the in-place ADD/DROP is safe. The ALTER statements below are written with +-- IF [NOT] EXISTS so they are idempotent on both fresh installs (where the CREATE TABLE +-- preceding them already produced the new shape) and existing 1.4.0 installs.
+ +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 5) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +CREATE TABLE IF NOT EXISTS entities ( + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + id BIGINT NOT NULL, + parent_id BIGINT NOT NULL, + name TEXT NOT NULL, + entity_version INT NOT NULL, + type_code INT NOT NULL, + sub_type_code INT NOT NULL, + create_timestamp BIGINT NOT NULL, + drop_timestamp BIGINT NOT NULL, + purge_timestamp BIGINT NOT NULL, + to_purge_timestamp BIGINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + properties JSONB not null default '{}'::JSONB, + internal_properties JSONB not null default '{}'::JSONB, + grant_records_version INT NOT NULL, + location_without_scheme TEXT, + PRIMARY KEY (realm_id, id), + CONSTRAINT constraint_name UNIQUE (realm_id, catalog_id, parent_id, type_code, name) +); + +-- TODO: create indexes based on all query pattern. 
+CREATE INDEX IF NOT EXISTS idx_entities ON entities (realm_id, catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_entities_catalog_id_id ON entities (catalog_id, id); +CREATE INDEX IF NOT EXISTS idx_locations + ON entities USING btree (realm_id, parent_id, location_without_scheme) + WHERE location_without_scheme IS NOT NULL; + +COMMENT ON TABLE entities IS 'all the entities'; + +COMMENT ON COLUMN entities.realm_id IS 'realm_id used for multi-tenancy'; +COMMENT ON COLUMN entities.catalog_id IS 'catalog id'; +COMMENT ON COLUMN entities.id IS 'entity id'; +COMMENT ON COLUMN entities.parent_id IS 'entity id of parent'; +COMMENT ON COLUMN entities.name IS 'entity name'; +COMMENT ON COLUMN entities.entity_version IS 'version of the entity'; +COMMENT ON COLUMN entities.type_code IS 'type code'; +COMMENT ON COLUMN entities.sub_type_code IS 'sub type of entity'; +COMMENT ON COLUMN entities.create_timestamp IS 'creation time of entity'; +COMMENT ON COLUMN entities.drop_timestamp IS 'time of drop of entity'; +COMMENT ON COLUMN entities.purge_timestamp IS 'time to start purging entity'; +COMMENT ON COLUMN entities.last_update_timestamp IS 'last time the entity is touched'; +COMMENT ON COLUMN entities.properties IS 'entities properties json'; +COMMENT ON COLUMN entities.internal_properties IS 'entities internal properties json'; +COMMENT ON COLUMN entities.grant_records_version IS 'the version of grant records change on the entity'; + +CREATE TABLE IF NOT EXISTS grant_records ( + realm_id TEXT NOT NULL, + securable_catalog_id BIGINT NOT NULL, + securable_id BIGINT NOT NULL, + grantee_catalog_id BIGINT NOT NULL, + grantee_id BIGINT NOT NULL, + privilege_code INTEGER, + PRIMARY KEY (realm_id, securable_catalog_id, securable_id, grantee_catalog_id, grantee_id, privilege_code) +); + +COMMENT ON TABLE grant_records IS 'grant records for entities'; + +COMMENT ON COLUMN grant_records.securable_catalog_id IS 'catalog id of the securable'; +COMMENT ON COLUMN grant_records.securable_id IS 
'entity id of the securable'; +COMMENT ON COLUMN grant_records.grantee_catalog_id IS 'catalog id of the grantee'; +COMMENT ON COLUMN grant_records.grantee_id IS 'id of the grantee'; +COMMENT ON COLUMN grant_records.privilege_code IS 'privilege code'; + +CREATE INDEX IF NOT EXISTS idx_grants_realm_grantee + ON grant_records (realm_id, grantee_id); +CREATE INDEX IF NOT EXISTS idx_grants_realm_securable + ON grant_records (realm_id, securable_id); + +CREATE TABLE IF NOT EXISTS principal_authentication_data ( + realm_id TEXT NOT NULL, + principal_id BIGINT NOT NULL, + principal_client_id VARCHAR(255) NOT NULL, + main_secret_hash VARCHAR(255) NOT NULL, + secondary_secret_hash VARCHAR(255) NOT NULL, + secret_salt VARCHAR(255) NOT NULL, + PRIMARY KEY (realm_id, principal_client_id) +); + +COMMENT ON TABLE principal_authentication_data IS 'authentication data for client'; + +CREATE TABLE IF NOT EXISTS policy_mapping_record ( + realm_id TEXT NOT NULL, + target_catalog_id BIGINT NOT NULL, + target_id BIGINT NOT NULL, + policy_type_code INTEGER NOT NULL, + policy_catalog_id BIGINT NOT NULL, + policy_id BIGINT NOT NULL, + parameters JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (realm_id, target_catalog_id, target_id, policy_type_code, policy_catalog_id, policy_id) +); + +CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id); + +CREATE TABLE IF NOT EXISTS events ( + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + event_id TEXT NOT NULL, + request_id TEXT, + event_type TEXT NOT NULL, + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + resource_type TEXT NOT NULL, + resource_identifier TEXT NOT NULL, + additional_properties JSONB NOT NULL DEFAULT '{}'::JSONB, + PRIMARY KEY (event_id) +); + +-- Idempotency records (key-only idempotency; durable replay) +CREATE TABLE IF NOT EXISTS idempotency_records ( + realm_id TEXT NOT NULL, + idempotency_key TEXT NOT 
NULL, + operation_type TEXT NOT NULL, + resource_id TEXT NOT NULL, -- normalized request-derived resource identifier (not a generated entity id) + principal_hash TEXT NOT NULL, -- hash of caller principal + realm; checked on replay to prevent cross-principal cache hits + + -- Finalization/replay + http_status INTEGER, -- NULL while IN_PROGRESS; set only on finalized 2xx/terminal 4xx + error_subtype TEXT, -- optional: e.g., already_exists, namespace_not_empty, idempotency_replay_failed + response_summary TEXT, -- minimal body to reproduce equivalent response (JSON string); null for credential-bearing mutations + finalized_at TIMESTAMP, -- when http_status was written + + -- Liveness/ops + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + heartbeat_at TIMESTAMP, -- updated by owner while IN_PROGRESS + executor_id TEXT, -- owner pod/worker id + expires_at TIMESTAMP, + + PRIMARY KEY (realm_id, idempotency_key) +); + +-- Helpful indexes +CREATE INDEX IF NOT EXISTS idx_idemp_realm_expires + ON idempotency_records (realm_id, expires_at); + +-- v4 -> v5 migration for idempotency_records: idempotent ALTER TABLE statements that bring an +-- existing v4 table up to the v5 shape and are no-ops on a fresh v5 install. 
+ALTER TABLE idempotency_records ADD COLUMN IF NOT EXISTS principal_hash TEXT; +UPDATE idempotency_records SET principal_hash = '' WHERE principal_hash IS NULL; +ALTER TABLE idempotency_records ALTER COLUMN principal_hash SET NOT NULL; +ALTER TABLE idempotency_records DROP COLUMN IF EXISTS response_headers; + +-- ============================================================================ +-- SCAN METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Scan context + snapshot_id BIGINT, + schema_id INTEGER, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + + -- Scan metrics + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + + -- Equality/positional delete metrics + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + + -- Additional metadata (for extensibility) + metadata JSONB DEFAULT '{}'::JSONB, + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE scan_metrics_report IS 'Scan metrics reports as first-class entities'; + +-- Index for retention cleanup 
by timestamp +CREATE INDEX IF NOT EXISTS idx_scan_report_timestamp ON scan_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_scan_report_lookup ON scan_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); + +-- ============================================================================ +-- COMMIT METRICS REPORT TABLE +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS commit_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Commit context + snapshot_id BIGINT NOT NULL, + sequence_number BIGINT, + operation TEXT NOT NULL, + + -- File metrics + added_data_files BIGINT DEFAULT 0, + removed_data_files BIGINT DEFAULT 0, + total_data_files BIGINT DEFAULT 0, + added_delete_files BIGINT DEFAULT 0, + removed_delete_files BIGINT DEFAULT 0, + total_delete_files BIGINT DEFAULT 0, + + -- Equality delete files + added_equality_delete_files BIGINT DEFAULT 0, + removed_equality_delete_files BIGINT DEFAULT 0, + + -- Positional delete files + added_positional_delete_files BIGINT DEFAULT 0, + removed_positional_delete_files BIGINT DEFAULT 0, + + -- Record metrics + added_records BIGINT DEFAULT 0, + removed_records BIGINT DEFAULT 0, + total_records BIGINT DEFAULT 0, + + -- Size metrics + added_file_size_bytes BIGINT DEFAULT 0, + removed_file_size_bytes BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + + -- Duration and attempts + total_duration_ms BIGINT DEFAULT 0, + attempts INTEGER DEFAULT 1, + + -- Additional metadata (for extensibility) + metadata JSONB DEFAULT '{}'::JSONB, + + PRIMARY KEY (realm_id, report_id) +); + +COMMENT ON TABLE 
commit_metrics_report IS 'Commit metrics reports as first-class entities'; + +-- Index for retention cleanup by timestamp +CREATE INDEX IF NOT EXISTS idx_commit_report_timestamp ON commit_metrics_report(realm_id, timestamp_ms); + +-- Index for query lookups by catalog_id and table_id +CREATE INDEX IF NOT EXISTS idx_commit_report_lookup ON commit_metrics_report(realm_id, catalog_id, table_id, timestamp_ms); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplV5SchemaTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplV5SchemaTest.java new file mode 100644 index 00000000000..f59e2604dd1 --- /dev/null +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplV5SchemaTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.persistence.relational.jdbc; + +public class AtomicMetastoreManagerWithJdbcBasePersistenceImplV5SchemaTest + extends AtomicMetastoreManagerWithJdbcBasePersistenceImplTest { + + @Override + public int schemaVersion() { + return 5; + } +} diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtilsTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtilsTest.java index 671baf8ac24..b4f7a2fe650 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtilsTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/JdbcBootstrapUtilsTest.java @@ -61,7 +61,7 @@ void getVersion_whenFreshDbAndNoRealms() { // Act & Assert assertEquals( - 4, + 5, JdbcBootstrapUtils.getRealmBootstrapSchemaVersion( DatabaseType.POSTGRES, currentVersion, -1, hasRealms)); assertEquals( @@ -96,9 +96,11 @@ void getVersion_whenFreshDbAndRealmsExist() { "2, true, 2", "3, true, 3", "4, true, 4", - "2, false, 4", - "3, false, 4", - "4, false, 4" + "5, true, 5", + "2, false, 5", + "3, false, 5", + "4, false, 5", + "5, false, 5" }) void getVersion_whenExistingDbAndAutoDetect( int currentVersion, boolean hasRealms, int expectedVersion) { diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java index 8c6f70b26ae..46e415ce9aa 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java +++ 
b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/idempotency/RelationalJdbcIdempotencyStorePostgresIT.java @@ -87,9 +87,9 @@ public Optional databaseType() { try (InputStream is = Thread.currentThread() .getContextClassLoader() - .getResourceAsStream("postgres/schema-v4.sql")) { + .getResourceAsStream("postgres/schema-v5.sql")) { if (is == null) { - throw new IllegalStateException("schema-v4.sql not found on classpath"); + throw new IllegalStateException("schema-v5.sql not found on classpath"); } ops.executeScript(is); } @@ -138,27 +138,11 @@ void heartbeatAndFinalize() { HeartbeatResult hb = store.updateHeartbeat(realm, key, "A", now.plusSeconds(1)); assertThat(hb).isEqualTo(HeartbeatResult.UPDATED); - boolean fin = - store.finalizeRecord( - realm, - key, - 201, - null, - "{\"ok\":true}", - "{\"Content-Type\":\"application/json\"}", - now.plusSeconds(2)); + boolean fin = store.finalizeRecord(realm, key, 201, null, "{\"ok\":true}", now.plusSeconds(2)); assertThat(fin).isTrue(); // finalize again should be a no-op - boolean fin2 = - store.finalizeRecord( - realm, - key, - 201, - null, - "{\"ok\":true}", - "{\"Content-Type\":\"application/json\"}", - now.plusSeconds(3)); + boolean fin2 = store.finalizeRecord(realm, key, 201, null, "{\"ok\":true}", now.plusSeconds(3)); assertThat(fin2).isFalse(); Optional rec = store.load(realm, key); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java index b7a05b9cee0..25509e8d824 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/IdempotencyRecord.java @@ -26,15 +26,15 @@ * * @param realmId Logical tenant / realm identifier. * @param idempotencyKey Client-provided idempotency key. - * @param operationType Logical operation type (e.g. {@code "commit-table"}). 
+ * @param operationType Logical operation type (e.g. {@code "create-table"}). * @param normalizedResourceId Request-derived, fully-qualified identifier of the affected resource * (see {@link #normalizedResourceId ()}). + * @param principalHash Hash of the caller principal identity bound to this reservation. Compared on + * replay to prevent cross-principal cache hits. * @param httpStatus HTTP status code returned to the client once finalized; {@code null} while * in-progress. * @param errorSubtype Optional error subtype/code when the operation failed. * @param responseSummary Minimal serialized representation of the response body for replay. - * @param responseHeaders Serialized representation of a small, whitelisted set of response headers - * for replay. * @param finalizedAt Timestamp when the operation was finalized; {@code null} while in-progress. * @param createdAt Timestamp when the record was created. * @param updatedAt Timestamp when the record was last updated. @@ -50,10 +50,10 @@ public record IdempotencyRecord( String idempotencyKey, String operationType, String normalizedResourceId, + String principalHash, Integer httpStatus, String errorSubtype, String responseSummary, - String responseHeaders, Instant createdAt, Instant updatedAt, Instant finalizedAt, @@ -61,104 +61,6 @@ public record IdempotencyRecord( String executorId, Instant expiresAt) { - /** - * Normalized identifier of the resource affected by the operation. - * - *

This should be derived from the request (for example, a canonicalized and fully-qualified - * identifier like {@code "catalogs//tables/ns.tbl"}), not from a generated internal - * entity id. - * - *

The identifier must be stable even on failure (before any entities are created) and must be - * scoped to avoid false conflicts (for example, include the catalog/warehouse identifier when - * applicable). - */ - @Override - public String normalizedResourceId() { - return normalizedResourceId; - } - - /** - * HTTP status code returned to the client for this idempotent operation. - * - *

Remains {@code null} while the record is {@code IN_PROGRESS} and is set only when the - * operation reaches a terminal 2xx or 4xx state. - */ - @Override - public Integer httpStatus() { - return httpStatus; - } - - /** - * Optional error subtype or code that provides additional detail when the operation failed. - * - *

Examples include {@code already_exists}, {@code namespace_not_empty}, or {@code - * idempotency_replay_failed}. - */ - @Override - public String errorSubtype() { - return errorSubtype; - } - - /** - * Minimal serialized representation of the response body used to reproduce an equivalent - * response. - * - *

This is typically a compact JSON string that contains just enough information for the HTTP - * layer to reconstruct the response for duplicate idempotent requests. - */ - @Override - public String responseSummary() { - return responseSummary; - } - - /** - * Serialized representation of a small, whitelisted set of HTTP response headers. - * - *

Stored as a JSON string so that the HTTP layer can replay key headers (such as {@code - * Content-Type}) when serving a duplicate idempotent request. - */ - @Override - public String responseHeaders() { - return responseHeaders; - } - - /** - * Timestamp indicating when the record was finalized. - * - *

Set at the same time as {@link #httpStatus ()} when the operation completes; {@code null} - * while the record is still {@code IN_PROGRESS}. - */ - @Override - public Instant finalizedAt() { - return finalizedAt; - } - - /** - * Timestamp of the most recent successful heartbeat while the operation is {@code IN_PROGRESS}. - * - *

This is updated by the owning executor to signal liveness and is used by reconciliation - * logic to detect stuck or abandoned in-progress records. - */ - @Override - public Instant heartbeatAt() { - return heartbeatAt; - } - - /** - * Identifier of the executor (for example pod or worker id) that currently owns the in-progress - * reservation. - */ - @Override - public String executorId() { - return executorId; - } - - /** Timestamp after which the reservation is considered expired and eligible for purging. */ - @Override - public Instant expiresAt() { - return expiresAt; - } - public boolean isFinalized() { return httpStatus != null; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java index 125ea081714..851aaf0cfc9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IdempotencyStore.java @@ -142,7 +142,6 @@ HeartbeatResult updateHeartbeat( * @param httpStatus HTTP status code returned to the client, or {@code null} if not applicable * @param errorSubtype optional error subtype or code, if the operation failed * @param responseSummary short, serialized representation of the response body - * @param responseHeaders serialized representation of response headers * @param finalizedAt timestamp when the operation completed * @return {@code true} if the record was transitioned to a finalized state, {@code false} * otherwise @@ -153,7 +152,6 @@ boolean finalizeRecord( Integer httpStatus, String errorSubtype, String responseSummary, - String responseHeaders, Instant finalizedAt); /**