Changes from 5 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
### Highlights

### Upgrade notes
- The relational-JDBC metastore schema is bumped to v5, which reshapes the (previously unused) `idempotency_records` table for the optimistic-commit idempotency model. New installations bootstrap at v5 automatically. Existing v4 installations are not migrated in place; enabling idempotency requires bootstrapping at v5 or applying the equivalent DDL from `schema-v5.sql`. The v4 `idempotency_records` table was never wired to request handling, so no data migration is needed.
- Event listeners are now executed on a dedicated executor. **This executor does not propagate the original request's CDI context**; listeners that were improperly relying on that should instead manage their own CDI request scope from now on. Furthermore, two new configuration options were introduced to configure the executor:
- `polaris.event-listener.executor.pool-size` configures the thread pool size.
- `polaris.event-listener.executor.queue-size` configures the queue size for pending events when all threads are busy.
@@ -52,6 +53,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti
- Added support for `register table` overwrite semantics in the Iceberg REST catalog flow (`overwrite=true`) for internal Polaris catalogs. With overwrite enabled, existing table pointers can be updated to a new metadata location while preserving default behavior for `overwrite=false`.
- Added `REGISTER_TABLE_OVERWRITE` authorization operation mapped to `TABLE_FULL_METADATA` for deterministic overwrite authorization.
- Added Polaris Spark 4.0 client.
- Added handler-level support for the Iceberg REST `Idempotency-Key` header on `createTable`, using an optimistic-commit model: the terminal outcome is recorded only after a successful (2xx) response and retries replay an equivalent response rebuilt from current catalog state (no response body is stored). The key is bound to the request-derived resource (operation, namespace, name and access-delegation modes) and the caller principal, so reusing a key for a different resource or by a different caller is rejected with HTTP 422. A retry that loses a concurrent create race replays the winning request instead of returning 409, and a replay returns 422 if the table has advanced beyond the originally-created state. Only successful outcomes are recorded — a retry after a failure simply re-runs the operation. Idempotency is disabled by default and configured under `polaris.idempotency`; records are kept in a standalone idempotency store decoupled from the metastore persistence.

### Changes
- Added REPL support to Polaris CLI.
@@ -53,9 +53,9 @@ public String getDisplayName() {
*/
public int getLatestSchemaVersion() {
return switch (this) {
case POSTGRES -> 4; // PostgreSQL has schemas v1, v2, v3, v4
case COCKROACHDB -> 4; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 4; // H2 uses same schemas as PostgreSQL
case POSTGRES -> 5; // PostgreSQL has schemas v1, v2, v3, v4, v5
case COCKROACHDB -> 5; // CockroachDB schema version kept in sync with PostgreSQL
case H2 -> 5; // H2 uses same schemas as PostgreSQL
};
}

@@ -20,88 +20,54 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.polaris.core.entity.IdempotencyRecord;
import org.apache.polaris.core.persistence.IdempotencyPersistenceException;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;
import org.apache.polaris.persistence.relational.jdbc.QueryGenerator;
import org.apache.polaris.persistence.relational.jdbc.RelationalJdbcConfiguration;
import org.apache.polaris.persistence.relational.jdbc.models.Converter;
import org.apache.polaris.persistence.relational.jdbc.models.ModelIdempotencyRecord;
import org.jspecify.annotations.NonNull;

/**
* JDBC-backed {@link IdempotencyStore}.
*
* <p>Implements the "optimistic commit" model: a row is inserted only after the originating
* operation has finalized. Race conditions between concurrent retries are detected via the table's
* {@code (realm_id, idempotency_key)} primary key — a duplicate INSERT surfaces as a constraint
* violation, which we translate into a {@link RecordResultType#DUPLICATE} along with the existing
* row.
*
* <p>Following the {@code JdbcBasePersistenceImpl} pattern, an instance is bound to a single realm
* at construction; realm scoping is then applied to every query via the {@code realm_id} column.
*/
public class RelationalJdbcIdempotencyStore implements IdempotencyStore {

private final DatasourceOperations datasourceOperations;
private final String realmId;

public RelationalJdbcIdempotencyStore(
@NonNull DataSource dataSource, @NonNull RelationalJdbcConfiguration cfg)
throws SQLException {
this.datasourceOperations = new DatasourceOperations(dataSource, cfg);
@NonNull DatasourceOperations datasourceOperations, @NonNull String realmId) {
this.datasourceOperations = datasourceOperations;
this.realmId = realmId;
}

@Override
public ReserveResult reserve(
String realmId,
String idempotencyKey,
String operationType,
String normalizedResourceId,
Instant expiresAt,
String executorId,
Instant now) {
try {
// Build insert values directly to avoid requiring an Immutables-generated model type.
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.RESOURCE_ID, normalizedResourceId);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, null);
insertMap.put(ModelIdempotencyRecord.ERROR_SUBTYPE, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, null);
insertMap.put(ModelIdempotencyRecord.RESPONSE_HEADERS, null);
insertMap.put(ModelIdempotencyRecord.FINALIZED_AT, null);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.HEARTBEAT_AT, Timestamp.from(now));
insertMap.put(ModelIdempotencyRecord.EXECUTOR_ID, executorId);
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));

List<Object> values = insertMap.values().stream().toList();
QueryGenerator.PreparedQuery insert =
QueryGenerator.generateInsertQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
values,
realmId);
datasourceOperations.executeUpdate(insert);
return new ReserveResult(ReserveResultType.OWNED, Optional.empty());
} catch (SQLException e) {
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
return new ReserveResult(ReserveResultType.DUPLICATE, load(realmId, idempotencyKey));
}
throw new IdempotencyPersistenceException("Failed to reserve idempotency key", e);
}
}

@Override
public Optional<IdempotencyRecord> load(String realmId, String idempotencyKey) {
public Optional<IdempotencyRecord> load(String idempotencyKey) {
try {
QueryGenerator.PreparedQuery query =
QueryGenerator.generateSelectQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey));
ModelIdempotencyRecord.REALM_ID, realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey));
List<IdempotencyRecord> results =
datasourceOperations.executeSelect(
query,
@@ -112,8 +78,7 @@ public IdempotencyRecord fromResultSet(ResultSet rs) throws SQLException {
}

@Override
public Map<String, Object> toMap(
org.apache.polaris.persistence.relational.jdbc.DatabaseType databaseType) {
public Map<String, Object> toMap(DatabaseType databaseType) {
throw new UnsupportedOperationException("Not used for SELECT conversion");
}
});
@@ -134,104 +99,60 @@ public Map<String, Object> toMap(
}

@Override
public HeartbeatResult updateHeartbeat(
String realmId, String idempotencyKey, String executorId, Instant now) {
Optional<IdempotencyRecord> existing = load(realmId, idempotencyKey);
if (existing.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}

IdempotencyRecord record = existing.get();
if (record.httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
if (record.executorId() == null || !record.executorId().equals(executorId)) {
return HeartbeatResult.LOST_OWNERSHIP;
}

QueryGenerator.PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
Map.of(
ModelIdempotencyRecord.HEARTBEAT_AT,
Timestamp.from(now),
ModelIdempotencyRecord.UPDATED_AT,
Timestamp.from(now)),
Map.of(
ModelIdempotencyRecord.REALM_ID,
realmId,
ModelIdempotencyRecord.IDEMPOTENCY_KEY,
idempotencyKey,
ModelIdempotencyRecord.EXECUTOR_ID,
executorId),
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());

try {
int updated = datasourceOperations.executeUpdate(update);
if (updated > 0) {
return HeartbeatResult.UPDATED;
}
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to update idempotency heartbeat", e);
}

// Raced with finalize/ownership loss; re-check to return a meaningful result.
Optional<IdempotencyRecord> after = load(realmId, idempotencyKey);
if (after.isEmpty()) {
return HeartbeatResult.NOT_FOUND;
}
if (after.get().httpStatus() != null) {
return HeartbeatResult.FINALIZED;
}
return HeartbeatResult.LOST_OWNERSHIP;
}

@Override
public boolean finalizeRecord(
String realmId,
public RecordResult recordIfAbsent(
String idempotencyKey,
Integer httpStatus,
String errorSubtype,
String responseSummary,
String responseHeaders,
Instant finalizedAt) {
// Use ordered/set maps so we can include nullable values (Map.of disallows nulls).
Map<String, Object> setClause = new LinkedHashMap<>();
setClause.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
setClause.put(ModelIdempotencyRecord.ERROR_SUBTYPE, errorSubtype);
setClause.put(ModelIdempotencyRecord.RESPONSE_SUMMARY, responseSummary);
setClause.put(ModelIdempotencyRecord.RESPONSE_HEADERS, responseHeaders);
setClause.put(ModelIdempotencyRecord.FINALIZED_AT, Timestamp.from(finalizedAt));
setClause.put(ModelIdempotencyRecord.UPDATED_AT, Timestamp.from(finalizedAt));

Map<String, Object> whereEquals = new HashMap<>();
whereEquals.put(ModelIdempotencyRecord.REALM_ID, realmId);
whereEquals.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);

QueryGenerator.PreparedQuery update =
QueryGenerator.generateUpdateQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
setClause,
whereEquals,
Map.of(),
Map.of(),
Set.of(ModelIdempotencyRecord.HTTP_STATUS),
Set.of());

String operationType,
String resourceHash,
String principalHash,
int httpStatus,
String metadataLocation,
Instant createdAt,
Instant expiresAt) {
try {
return datasourceOperations.executeUpdate(update) > 0;
Map<String, Object> insertMap = new LinkedHashMap<>();
insertMap.put(ModelIdempotencyRecord.IDEMPOTENCY_KEY, idempotencyKey);
insertMap.put(ModelIdempotencyRecord.OPERATION_TYPE, operationType);
insertMap.put(ModelIdempotencyRecord.RESOURCE_HASH, resourceHash);
insertMap.put(ModelIdempotencyRecord.PRINCIPAL_HASH, principalHash);
insertMap.put(ModelIdempotencyRecord.HTTP_STATUS, httpStatus);
insertMap.put(ModelIdempotencyRecord.METADATA_LOCATION, metadataLocation);
insertMap.put(ModelIdempotencyRecord.CREATED_AT, Timestamp.from(createdAt));
insertMap.put(ModelIdempotencyRecord.EXPIRES_AT, Timestamp.from(expiresAt));

List<Object> values = insertMap.values().stream().toList();
QueryGenerator.PreparedQuery insert =
QueryGenerator.generateInsertQuery(
ModelIdempotencyRecord.ALL_COLUMNS,
ModelIdempotencyRecord.TABLE_NAME,
values,
realmId);
datasourceOperations.executeUpdate(insert);
return new RecordResult(RecordResultType.OWNED, Optional.empty());
} catch (SQLException e) {
throw new IdempotencyPersistenceException("Failed to finalize idempotency record", e);
if (datasourceOperations.isUniquenessConstraintViolation(e)) {
Optional<IdempotencyRecord> existing = load(idempotencyKey);
if (existing.isEmpty()) {
// The insert lost the race on the (realm_id, idempotency_key) constraint, yet the winning
// row is no longer visible (e.g. purged or rolled back between the conflict and this
// reload). Surface a persistence error rather than a DUPLICATE without a record, which
// the handler layer treats as an invariant violation.
throw new IdempotencyPersistenceException(
"Insert for realm/key "
+ realmId
+ "/"
+ idempotencyKey
+ " conflicted on the unique constraint but the existing record could not be"
+ " reloaded");
}
return new RecordResult(RecordResultType.DUPLICATE, existing);
}
throw new IdempotencyPersistenceException("Failed to record idempotency entry", e);
}
}

@Override
public int purgeExpired(String realmId, Instant before) {
public int purgeExpired(Instant before) {
try {
QueryGenerator.PreparedQuery delete =
QueryGenerator.generateDeleteQuery(
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polaris.persistence.relational.jdbc.idempotency;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.IdempotencyStore;
import org.apache.polaris.core.persistence.IdempotencyStoreFactory;
import org.apache.polaris.persistence.relational.jdbc.DatasourceOperations;

/**
* {@link IdempotencyStoreFactory} backed by the same JDBC {@link DatasourceOperations} used by the
* primary metastore.
*
* <p>Each call vends a lightweight {@link RelationalJdbcIdempotencyStore} bound to the requested
* realm (mirroring {@code JdbcBasePersistenceImpl}); realm scoping is enforced inside SQL via the
* {@code realm_id} column.
*/
@ApplicationScoped
@Identifier("relational-jdbc")
public class RelationalJdbcIdempotencyStoreFactory implements IdempotencyStoreFactory {

private final DatasourceOperations datasourceOperations;

@Inject
public RelationalJdbcIdempotencyStoreFactory(DatasourceOperations datasourceOperations) {
this.datasourceOperations = datasourceOperations;
}

@Override
public IdempotencyStore getOrCreateIdempotencyStore(RealmContext realmContext) {
return new RelationalJdbcIdempotencyStore(
datasourceOperations, realmContext.getRealmIdentifier());
}
}
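
The optimistic-commit behaviour described in the `RelationalJdbcIdempotencyStore` Javadoc (insert only after the operation succeeds, and treat a primary-key conflict as a duplicate that returns the existing row) can be illustrated without a database. The following is a minimal sketch, not the Polaris implementation: `OptimisticCommitSketch`, `Entry`, `Result` and `ResultType` are hypothetical names, and a `ConcurrentHashMap` stands in for the `(realm_id, idempotency_key)` primary key, with `putIfAbsent` playing the role of the INSERT.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the JDBC-backed store; none of these
// types are the Polaris API.
final class OptimisticCommitSketch {

  enum ResultType { OWNED, DUPLICATE }

  record Entry(String idempotencyKey, String resourceHash, int httpStatus, Instant expiresAt) {}

  record Result(ResultType type, Optional<Entry> existing) {}

  // Keyed by "realmId/idempotencyKey", standing in for the
  // (realm_id, idempotency_key) primary key.
  private final Map<String, Entry> rows = new ConcurrentHashMap<>();
  private final String realmId;

  OptimisticCommitSketch(String realmId) {
    this.realmId = realmId;
  }

  // Intended to be called only after the operation has succeeded. A non-null
  // return from putIfAbsent is the analogue of a uniqueness violation.
  Result recordIfAbsent(
      String idempotencyKey, String resourceHash, int httpStatus, Instant expiresAt) {
    Entry candidate = new Entry(idempotencyKey, resourceHash, httpStatus, expiresAt);
    Entry winner = rows.putIfAbsent(realmId + "/" + idempotencyKey, candidate);
    return winner == null
        ? new Result(ResultType.OWNED, Optional.empty())
        : new Result(ResultType.DUPLICATE, Optional.of(winner));
  }

  public static void main(String[] args) {
    OptimisticCommitSketch store = new OptimisticCommitSketch("realm-a");
    Instant expiresAt = Instant.parse("2030-01-01T00:00:00Z");
    Result first = store.recordIfAbsent("key-1", "hash-1", 200, expiresAt);
    Result retry = store.recordIfAbsent("key-1", "hash-1", 200, expiresAt);
    System.out.println(first.type() + " then " + retry.type());
  }
}
```

In this sketch the duplicate path always has the winning entry in hand, because `putIfAbsent` returns it atomically. The JDBC store has to reload the row in a separate query after the conflict, which is why the real code also handles the case where that reload finds nothing.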