diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 0e7f8d48bd4..3bf55caaa1a 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -548,7 +548,7 @@ public Table createTable( SortOrder[] sortOrders, Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { - Preconditions.checkArgument(indexes.length == 0, "Iceberg-catalog does not support indexes"); + validateIcebergIndexes(indexes); try { NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); if (!schemaExists(schemaIdent)) { @@ -583,6 +583,7 @@ public Table createTable( .withSortOrders(sortOrders) .withProperties(properties) .withDistribution(distribution) + .withIndexes(indexes) .withAuditInfo( AuditInfo.builder() .withCreator(currentUser()) @@ -603,6 +604,36 @@ public Table createTable( } } + /** + * Validates the indexes supplied when creating an Iceberg table. Iceberg only supports expressing + * a primary key, which Gravitino models as a single {@code PRIMARY_KEY} index over one or more + * non-nested columns. The primary key columns are mapped to Iceberg {@code identifier-field-ids}; + * Iceberg requires identifier fields to be required (NOT NULL), which is enforced by the Iceberg + * schema constructor. + * + * @param indexes The indexes to validate. An empty array is allowed (no primary key). + */ + private static void validateIcebergIndexes(Index[] indexes) { + if (indexes == null || indexes.length == 0) { + return; + } + Preconditions.checkArgument( + indexes.length == 1, "Iceberg only supports no more than one PRIMARY_KEY Index."); + Index index = indexes[0]; + Preconditions.checkArgument( + index.type() == Index.IndexType.PRIMARY_KEY, "Iceberg only supports primary key Index."); + String[][] fieldNames = index.fieldNames(); + Preconditions.checkArgument( + fieldNames != null && fieldNames.length > 0, + "The primary key Index must contain at least one column."); + Arrays.stream(fieldNames) + .forEach( + fieldName -> + Preconditions.checkArgument( + fieldName != null && fieldName.length == 1, + "The primary key columns should not be nested.")); + } + /** * Purges a table from the Iceberg. * diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 3f2f54c1b3e..6f86c2b21d8 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -72,6 +72,10 @@ public class IcebergTable extends BaseTable { public static final String ICEBERG_COMMENT_FIELD_NAME = "comment"; + /** The name of the synthetic primary key index reconstructed from Iceberg identifier fields. */ + @VisibleForTesting + public static final String ICEBERG_PRIMARY_KEY_INDEX_NAME = "ICEBERG_PRIMARY_KEY_INDEX"; + private String location; private IcebergTable() {} @@ -164,6 +168,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam .withPartitioning(partitionSpec) .withSortOrders(sortOrder) .withDistribution(getDistribution(properties)) + .withIndexes(ConvertUtil.constructIndexesFromIdentifierFields(schema)) .build(); } @@ -207,6 +212,7 @@ protected IcebergTable internalBuild() { icebergTable.partitioning = partitioning; icebergTable.distribution = distribution; icebergTable.sortOrders = sortOrders; + icebergTable.indexes = indexes; if (null != comment) { icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java index 3443c5782eb..da438178f4d 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/ConvertUtil.java @@ -20,9 +20,16 @@ import com.google.common.base.Preconditions; import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergColumn; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTable; import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -40,7 +47,72 @@ public static Schema toIcebergSchema(IcebergTable gravitinoTable) { toGravitinoStructType(gravitinoTable); Type converted = ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType)); - return new Schema(converted.asNestedType().asStructType().fields()); + Schema schema = new Schema(converted.asNestedType().asStructType().fields()); + return applyIdentifierFields(schema, primaryKeyColumnNames(gravitinoTable)); + } + + /** + * Sets the Iceberg {@code identifier-field-ids} for the primary key columns, mirroring the native + * Flink Iceberg connector's {@code FlinkSchemaUtil.freshIdentifierFieldIds}. Iceberg requires + * identifier fields to be required (NOT NULL) columns; this constraint is enforced by the Iceberg + * {@link Schema} constructor itself, so no nullability promotion is performed here. + * + * @param schema The Iceberg schema before identifier fields are applied. + * @param primaryKeyColumns The primary key column names (empty when there is no primary key). + * @return The Iceberg schema carrying the identifier field ids. + */ + private static Schema applyIdentifierFields(Schema schema, List primaryKeyColumns) { + if (primaryKeyColumns.isEmpty()) { + return schema; + } + Set identifierFieldIds = new LinkedHashSet<>(); + for (String columnName : primaryKeyColumns) { + Types.NestedField field = schema.findField(columnName); + Preconditions.checkArgument( + field != null, "Cannot find primary key column in table schema: %s", columnName); + identifierFieldIds.add(field.fieldId()); + } + return new Schema(schema.schemaId(), schema.asStruct().fields(), identifierFieldIds); + } + + /** + * Extracts the primary key column names from the table's {@code PRIMARY_KEY} index. Returns an + * empty list when no index is present. + * + * @param gravitinoTable Gravitino table of Iceberg. + * @return The ordered primary key column names. + */ + private static List primaryKeyColumnNames(IcebergTable gravitinoTable) { + Index[] indexes = gravitinoTable.index(); + if (indexes == null || indexes.length == 0) { + return Collections.emptyList(); + } + return Arrays.stream(indexes[0].fieldNames()) + .map(fieldName -> fieldName[0]) + .collect(Collectors.toList()); + } + + /** + * Reconstructs the Gravitino {@code PRIMARY_KEY} index from the Iceberg {@code + * identifier-field-ids}, mirroring Paimon's {@code constructIndexesFromPrimaryKeys}. The primary + * key columns are ordered by their position in the schema for deterministic results, since + * Iceberg identifier fields are an unordered set. + * + * @param schema The Iceberg schema loaded from the catalog. + * @return A single-element {@code PRIMARY_KEY} index array, or an empty array when the schema has + * no identifier fields. + */ + public static Index[] constructIndexesFromIdentifierFields(Schema schema) { + Set identifierFieldIds = schema.identifierFieldIds(); + if (identifierFieldIds == null || identifierFieldIds.isEmpty()) { + return Indexes.EMPTY_INDEXES; + } + String[][] fieldNames = + schema.columns().stream() + .filter(field -> identifierFieldIds.contains(field.fieldId())) + .map(field -> new String[] {field.name()}) + .toArray(String[][]::new); + return new Index[] {Indexes.primary(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, fieldNames)}; } /** diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalogOperations.java index dfbd129a235..9e34078d63b 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergCatalogOperations.java @@ -29,6 +29,12 @@ import org.apache.gravitino.Namespace; import org.apache.gravitino.exceptions.GravitinoRuntimeException; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; import org.apache.iceberg.rest.responses.ListNamespacesResponse; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -95,4 +101,49 @@ public void testListSchemasFlatOnlyReturnsUnchangedNames() { Assertions.assertTrue(Arrays.stream(result).anyMatch(id -> "db1".equals(id.name()))); Assertions.assertTrue(Arrays.stream(result).anyMatch(id -> "db2".equals(id.name()))); } + + @Test + public void testCreateTableRejectsMultiplePrimaryKeyIndexes() { + Index[] indexes = + new Index[] { + Indexes.primary("pk1", new String[][] {{"id"}}), + Indexes.primary("pk2", new String[][] {{"name"}}) + }; + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> createTableWithIndexes(indexes)); + Assertions.assertTrue(exception.getMessage().contains("no more than one PRIMARY_KEY Index")); + } + + @Test + public void testCreateTableRejectsNonPrimaryKeyIndex() { + Index[] indexes = new Index[] {Indexes.unique("uk", new String[][] {{"id"}})}; + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> createTableWithIndexes(indexes)); + Assertions.assertTrue(exception.getMessage().contains("only supports primary key Index")); + } + + @Test + public void testCreateTableRejectsNestedPrimaryKeyColumn() { + Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"struct", "field"}})}; + IllegalArgumentException exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> createTableWithIndexes(indexes)); + Assertions.assertTrue(exception.getMessage().contains("should not be nested")); + } + + private static void createTableWithIndexes(Index[] indexes) { + // validateIcebergIndexes runs before any catalog interaction, so empty schema args suffice. + new IcebergCatalogOperations() + .createTable( + NameIdentifier.of(METALAKE, CATALOG, "db", "table"), + new Column[0], + null, + ImmutableMap.of(), + new Transform[0], + Distributions.NONE, + new SortOrder[0], + indexes); + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java index f9099666d44..17ac1b6a0a5 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/converter/TestConvertUtil.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -33,6 +34,8 @@ import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.indexes.Index; +import org.apache.gravitino.rel.indexes.Indexes; import org.apache.gravitino.rel.types.Types.ByteType; import org.apache.gravitino.rel.types.Types.ShortType; import org.apache.iceberg.Schema; @@ -108,6 +111,108 @@ public void testToIcebergSchemaWithNullColumns() { Assertions.assertEquals("columns must not be null", exception.getMessage()); } + @Test + public void testToIcebergSchemaWithSinglePrimaryKey() { + IcebergTable table = + primaryKeyTable( + new String[][] {{"id"}}, + requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()), + nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get())); + Schema schema = ConvertUtil.toIcebergSchema(table); + Assertions.assertEquals( + Collections.singleton(schema.findField("id").fieldId()), schema.identifierFieldIds()); + } + + @Test + public void testToIcebergSchemaWithCompositePrimaryKey() { + IcebergTable table = + primaryKeyTable( + new String[][] {{"id"}, {"region"}}, + requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()), + requiredColumn("region", org.apache.gravitino.rel.types.Types.StringType.get()), + nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get())); + Schema schema = ConvertUtil.toIcebergSchema(table); + Assertions.assertEquals( + com.google.common.collect.ImmutableSet.of( + schema.findField("id").fieldId(), schema.findField("region").fieldId()), + schema.identifierFieldIds()); + } + + @Test + public void testToIcebergSchemaWithNullablePrimaryKeyThrows() { + IcebergTable table = + primaryKeyTable( + new String[][] {{"id"}}, + nullableColumn("id", org.apache.gravitino.rel.types.Types.LongType.get())); + // Iceberg's Schema constructor requires identifier fields to be NOT NULL. + Assertions.assertThrows( + IllegalArgumentException.class, () -> ConvertUtil.toIcebergSchema(table)); + } + + @Test + public void testConstructIndexesFromIdentifierFields() { + IcebergTable table = + primaryKeyTable( + new String[][] {{"id"}, {"region"}}, + requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()), + requiredColumn("region", org.apache.gravitino.rel.types.Types.StringType.get()), + nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get())); + Schema schema = ConvertUtil.toIcebergSchema(table); + + Index[] indexes = ConvertUtil.constructIndexesFromIdentifierFields(schema); + Assertions.assertEquals(1, indexes.length); + Assertions.assertEquals(Index.IndexType.PRIMARY_KEY, indexes[0].type()); + Assertions.assertEquals(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, indexes[0].name()); + // Reconstructed primary key columns follow the schema column order, not the index input order. + Assertions.assertArrayEquals(new String[][] {{"id"}, {"region"}}, indexes[0].fieldNames()); + } + + @Test + public void testConstructIndexesFromIdentifierFieldsWhenAbsent() { + Schema schema = + ConvertUtil.toIcebergSchema( + new Column[] { + nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get()) + }); + Assertions.assertEquals(0, ConvertUtil.constructIndexesFromIdentifierFields(schema).length); + } + + private static IcebergColumn requiredColumn( + String name, org.apache.gravitino.rel.types.Type type) { + return IcebergColumn.builder() + .withName(name) + .withType(type) + .withNullable(false) + .withComment(TEST_COMMENT) + .build(); + } + + private static IcebergColumn nullableColumn( + String name, org.apache.gravitino.rel.types.Type type) { + return IcebergColumn.builder() + .withName(name) + .withType(type) + .withNullable(true) + .withComment(TEST_COMMENT) + .build(); + } + + private static IcebergTable primaryKeyTable(String[][] primaryKeyFieldNames, Column... columns) { + Index[] indexes = + new Index[] { + Indexes.primary(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, primaryKeyFieldNames) + }; + return IcebergTable.builder() + .withName(TEST_NAME) + .withAuditInfo( + AuditInfo.builder().withCreator(TEST_NAME).withCreateTime(Instant.now()).build()) + .withProperties(Maps.newHashMap()) + .withColumns(columns) + .withIndexes(indexes) + .withComment(TEST_COMMENT) + .build(); + } + @Test public void testToPrimitiveType() { ByteType byteType = ByteType.get();