[#11753] feat(catalog-iceberg): support primary key via identifier fields #11754
Open
lzshlzsh wants to merge 1 commit into
…ier fields

Map a Gravitino PRIMARY_KEY index to Iceberg identifier-field-ids on create, and reconstruct the PRIMARY_KEY index from identifier-field-ids on load, forming a create/load round-trip closure.

- ConvertUtil: applyIdentifierFields + primaryKeyColumnNames (forward), constructIndexesFromIdentifierFields (reverse, ordered by schema column position for determinism)
- IcebergTable: ICEBERG_PRIMARY_KEY_INDEX_NAME constant, store indexes in internalBuild, back-fill indexes in fromIcebergTable
- IcebergCatalogOperations: replace the hard "does not support indexes" rejection with validateIcebergIndexes (single non-nested PRIMARY_KEY), pass indexes through to the table builder

Tests: forward/reverse/round-trip and validation rejection cases.
Pull request overview
This PR wires Gravitino PRIMARY_KEY indexes to Iceberg V2 schema-level identifier-field-ids (and back) in the catalog-lakehouse-iceberg module, enabling create/load round-trips of primary keys for Iceberg tables.
Changes:
- Map a single Gravitino PRIMARY_KEY index to Iceberg Schema identifier field IDs during schema conversion, and reconstruct a synthetic PRIMARY_KEY index when loading an Iceberg table.
- Relax Iceberg create-time index rejection by validating/allowing a single top-level PRIMARY_KEY index and passing indexes through to the Iceberg table builder.
- Add unit tests for forward/reverse mapping and basic index validation rejections.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java | Applies identifier-field-ids from PK indexes and reconstructs PK indexes from Iceberg schema identifier fields. |
| catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java | Replaces blanket “indexes not supported” check with targeted PK-only validation and passes indexes into table builder. |
| catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java | Persists indexes on built tables and reconstructs PK indexes when loading from Iceberg metadata. |
| catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java | Adds unit tests for PK ↔ identifier-field-ids mapping and NOT NULL constraint behavior. |
| catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalogOperations.java | Adds unit tests for create-time index validation failures. |
Comment on lines +620 to +624

    Preconditions.checkArgument(
        indexes.length == 1, "Iceberg only supports no more than one PRIMARY_KEY Index.");
    Index index = indexes[0];
    Preconditions.checkArgument(
        index.type() == Index.IndexType.PRIMARY_KEY, "Iceberg only supports primary key Index.");

Comment on lines +629 to +634

    Arrays.stream(fieldNames)
        .forEach(
            fieldName ->
                Preconditions.checkArgument(
                    fieldName != null && fieldName.length == 1,
                    "The primary key columns should not be nested."));

Comment on lines +110 to +115

    String[][] fieldNames =
        schema.columns().stream()
            .filter(field -> identifierFieldIds.contains(field.fieldId()))
            .map(field -> new String[] {field.name()})
            .toArray(String[][]::new);
    return new Index[] {Indexes.primary(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, fieldNames)};

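The ordering behavior in this snippet can be illustrated without Iceberg on the classpath. The following is a minimal sketch, not the PR's code: `ColumnSketch` and `ReversePrimaryKeySketch` are hypothetical stand-ins for Iceberg's top-level fields and for `ConvertUtil`. It only shows that walking the schema's column list, rather than the unordered ID set, produces a deterministic column order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an Iceberg top-level column: a field ID plus a name. */
final class ColumnSketch {
  final int fieldId;
  final String name;

  ColumnSketch(int fieldId, String name) {
    this.fieldId = fieldId;
    this.name = name;
  }
}

/** Hypothetical helper; the PR's real logic lives in ConvertUtil. */
final class ReversePrimaryKeySketch {
  /**
   * Returns the primary key columns as single-element name paths, in schema order.
   * The identifier IDs are an unordered set, so iteration follows the column list instead.
   */
  static String[][] primaryKeyFieldNames(
      List<ColumnSketch> columns, Set<Integer> identifierFieldIds) {
    List<String[]> fieldNames = new ArrayList<>();
    for (ColumnSketch column : columns) {
      if (identifierFieldIds.contains(column.fieldId)) {
        fieldNames.add(new String[] {column.name});
      }
    }
    return fieldNames.toArray(new String[0][]);
  }

  public static void main(String[] args) {
    List<ColumnSketch> columns =
        Arrays.asList(
            new ColumnSketch(1, "id"), new ColumnSketch(2, "name"), new ColumnSketch(3, "region"));
    // IDs are supplied out of schema order; a HashSet gives no ordering guarantee anyway.
    Set<Integer> identifierFieldIds = new HashSet<>(Arrays.asList(3, 1));
    System.out.println(Arrays.deepToString(primaryKeyFieldNames(columns, identifierFieldIds)));
  }
}
```

An empty identifier set yields an empty array here; how the real method maps that case to an index array is not shown in the snippet above.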
Comment on lines +135 to +138

    Assertions.assertEquals(
        com.google.common.collect.ImmutableSet.of(
            schema.findField("id").fieldId(), schema.findField("region").fieldId()),
        schema.identifierFieldIds());

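The assertion above expects the schema's identifier field IDs to equal the IDs of the id and region columns. As a rough illustration of that forward direction, the sketch below resolves primary key column names to field IDs using plain Java collections. `FieldSketch` and `ForwardPrimaryKeySketch` are hypothetical names, and the NOT NULL check stands in for validation that the PR description says is left to Iceberg's own Schema constructor; the error messages are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for an Iceberg top-level field. */
final class FieldSketch {
  final int fieldId;
  final boolean required;

  FieldSketch(int fieldId, boolean required) {
    this.fieldId = fieldId;
    this.required = required;
  }
}

/** Hypothetical helper; the PR's real logic lives in ConvertUtil. */
final class ForwardPrimaryKeySketch {
  /** Maps single-element primary key name paths to the field IDs of the named columns. */
  static Set<Integer> identifierFieldIds(
      Map<String, FieldSketch> fieldsByName, String[][] primaryKeyFieldNames) {
    Set<Integer> ids = new LinkedHashSet<>();
    for (String[] path : primaryKeyFieldNames) {
      String column = path[0]; // nested paths are assumed to have been rejected earlier
      FieldSketch field = fieldsByName.get(column);
      if (field == null) {
        throw new IllegalArgumentException("Unknown primary key column: " + column);
      }
      // Stand-in for Iceberg's rule that identifier fields must be required (NOT NULL).
      // Nullability is checked, never silently promoted.
      if (!field.required) {
        throw new IllegalArgumentException("Primary key column must be NOT NULL: " + column);
      }
      ids.add(field.fieldId);
    }
    return ids;
  }

  public static void main(String[] args) {
    Map<String, FieldSketch> fields = new LinkedHashMap<>();
    fields.put("id", new FieldSketch(1, true));
    fields.put("name", new FieldSketch(2, false));
    fields.put("region", new FieldSketch(3, true));
    System.out.println(identifierFieldIds(fields, new String[][] {{"id"}, {"region"}}));
  }
}
```

Rejecting an optional column instead of promoting it keeps the caller's declared nullability intact, which matches the intent stated in the PR description.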
Comment on lines +75 to +77

    /** The name of the synthetic primary key index reconstructed from Iceberg identifier fields. */
    @VisibleForTesting
    public static final String ICEBERG_PRIMARY_KEY_INDEX_NAME = "ICEBERG_PRIMARY_KEY_INDEX";

Comment on lines +127 to +135

    @Test
    public void testCreateTableRejectsNestedPrimaryKeyColumn() {
      Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"struct", "field"}})};
      IllegalArgumentException exception =
          Assertions.assertThrows(
              IllegalArgumentException.class, () -> createTableWithIndexes(indexes));
      Assertions.assertTrue(exception.getMessage().contains("should not be nested"));
    }

What changes were proposed in this pull request?
Wire the Gravitino PRIMARY_KEY index through to / from Iceberg's schema-level identifier-field-ids, forming a create/load round-trip closure for primary keys on Iceberg V2 tables.

ConvertUtil
- applyIdentifierFields + primaryKeyColumnNames (forward): after building the Iceberg Schema, set identifier-field-ids from the field ids of the columns referenced by the single PRIMARY_KEY index. This mirrors FlinkSchemaUtil.freshIdentifierFieldIds in apache/iceberg. Iceberg requires identifier fields to be required (NOT NULL); we rely on Iceberg's own Schema constructor to enforce that constraint and intentionally do not silently promote nullability.
- constructIndexesFromIdentifierFields (reverse): reconstruct a PRIMARY_KEY index from Schema#identifierFieldIds(), ordered by the column position in the schema for deterministic results (Iceberg identifier fields are an unordered set).

IcebergTable
- ICEBERG_PRIMARY_KEY_INDEX_NAME = "ICEBERG_PRIMARY_KEY_INDEX" (analogous to Paimon's PAIMON_PRIMARY_KEY_INDEX).
- internalBuild now stores indexes on the built table; fromIcebergTable back-fills the PRIMARY_KEY index from the loaded Iceberg schema.

IcebergCatalogOperations
- validateIcebergIndexes: allow at most one PRIMARY_KEY index over one or more non-nested columns; reject multiple indexes, non-PRIMARY_KEY types, empty column lists, and nested column references with clear messages.
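
The validation rules listed for validateIcebergIndexes can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the PR's implementation: `IndexSketch` and `IndexValidationSketch` are hypothetical stand-ins for Gravitino's Index type and for IcebergCatalogOperations, three of the messages are copied from the review snippets quoted earlier, the empty-column message is invented, and treating a null or empty index array as "no primary key" is assumed.

```java
/** Hypothetical stand-in for a Gravitino index: a type name and column name paths. */
final class IndexSketch {
  final String type; // for example "PRIMARY_KEY" or "UNIQUE_KEY"
  final String[][] fieldNames;

  IndexSketch(String type, String[][] fieldNames) {
    this.type = type;
    this.fieldNames = fieldNames;
  }
}

/** Hypothetical helper mirroring the rules described for validateIcebergIndexes. */
final class IndexValidationSketch {
  static void validate(IndexSketch[] indexes) {
    // Assumption: no indexes simply means the table has no primary key.
    if (indexes == null || indexes.length == 0) {
      return;
    }
    require(indexes.length == 1, "Iceberg only supports no more than one PRIMARY_KEY Index.");
    IndexSketch index = indexes[0];
    require("PRIMARY_KEY".equals(index.type), "Iceberg only supports primary key Index.");
    // Hypothetical message; the real wording for this case is not shown in the PR excerpts.
    require(
        index.fieldNames != null && index.fieldNames.length > 0,
        "The primary key must reference at least one column.");
    for (String[] fieldName : index.fieldNames) {
      // A path longer than one element refers to a field nested inside a struct.
      require(
          fieldName != null && fieldName.length == 1,
          "The primary key columns should not be nested.");
    }
  }

  private static void require(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  public static void main(String[] args) {
    validate(new IndexSketch[] {new IndexSketch("PRIMARY_KEY", new String[][] {{"id"}})});
    try {
      validate(
          new IndexSketch[] {new IndexSketch("PRIMARY_KEY", new String[][] {{"struct", "field"}})});
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking the index count before the type keeps each failure tied to a single rule, so a caller sees one focused message per mistake.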

Out of scope: primary-key evolution via TableChange (Iceberg's UpdateSchema#setIdentifierFields exists, but PK evolution interacts with NOT NULL promotion and equality deletes and warrants a separate discussion); no changes to the iceberg-rest-server module.

Why are the changes needed?
Iceberg V2 natively supports a primary key concept via identifier-field-ids, which Flink CDC and the Iceberg Flink connector use for upsert / equality-delete writes. Gravitino's Iceberg catalog currently rejects every Index at create time, so users cannot:
- … engine routed via Gravitino.
- … underlying Iceberg table already carries identifier-field-ids, IcebergTable#fromIcebergTable previously dropped them and Table#index() always returned empty.

The Paimon catalog already handles the analogous mapping in both directions (PaimonTable#constructIndexesFromPrimaryKeys, GravitinoToPaimonTableConverter); this PR aligns the Iceberg catalog with that behavior.

Fix: #11753
Does this PR introduce any user-facing change?
Yes, in the catalog-lakehouse-iceberg module only:
- … PRIMARY_KEY index is now accepted and produces an Iceberg V2 table with identifier-field-ids set on its schema (previously rejected with "Iceberg does not support indexes").
- … identifier-field-ids now surfaces a PRIMARY_KEY index named ICEBERG_PRIMARY_KEY_INDEX via Table#index() (previously always empty for Iceberg).
- … PRIMARY_KEY type, empty column list, nested column reference) are still rejected, now with focused error messages.

No new property keys, no public API signature changes.
How was this patch tested?
New unit tests cover both directions of the mapping and the validation rules (run via ./gradlew :catalogs:catalog-lakehouse-iceberg:test):

TestConvertUtil
- … PRIMARY_KEY index → Iceberg identifier-field-ids (single & composite columns).
- … identifier-field-ids → PRIMARY_KEY index, asserting the reconstructed columns are ordered by schema position.
- … Indexes.EMPTY_INDEXES.
- … relying on the Schema constructor's own validation.

TestIcebergCatalogOperations (validation):
- Multiple indexes ⇒ IllegalArgumentException.
- Non-PRIMARY_KEY index type ⇒ IllegalArgumentException.
- Empty column list ⇒ IllegalArgumentException.
- Nested column reference (a.b) ⇒ IllegalArgumentException.

No existing tests required changes; the previous "indexes not supported" behavior had no positive coverage to remove.