Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
Preconditions.checkArgument(indexes.length == 0, "Iceberg-catalog does not support indexes");
validateIcebergIndexes(indexes);
try {
NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
if (!schemaExists(schemaIdent)) {
Expand Down Expand Up @@ -583,6 +583,7 @@ public Table createTable(
.withSortOrders(sortOrders)
.withProperties(properties)
.withDistribution(distribution)
.withIndexes(indexes)
.withAuditInfo(
AuditInfo.builder()
.withCreator(currentUser())
Expand All @@ -603,6 +604,36 @@ public Table createTable(
}
}

/**
* Validates the indexes supplied when creating an Iceberg table. Iceberg only supports expressing
* a primary key, which Gravitino models as a single {@code PRIMARY_KEY} index over one or more
* non-nested columns. The primary key columns are mapped to Iceberg {@code identifier-field-ids};
* Iceberg requires identifier fields to be required (NOT NULL), which is enforced by the Iceberg
* schema constructor.
*
* @param indexes The indexes to validate. An empty array is allowed (no primary key).
*/
private static void validateIcebergIndexes(Index[] indexes) {
if (indexes == null || indexes.length == 0) {
return;
}
Preconditions.checkArgument(
indexes.length == 1, "Iceberg only supports no more than one PRIMARY_KEY Index.");
Index index = indexes[0];
Preconditions.checkArgument(
index.type() == Index.IndexType.PRIMARY_KEY, "Iceberg only supports primary key Index.");
Comment on lines +620 to +624
String[][] fieldNames = index.fieldNames();
Preconditions.checkArgument(
fieldNames != null && fieldNames.length > 0,
"The primary key Index must contain at least one column.");
Arrays.stream(fieldNames)
.forEach(
fieldName ->
Preconditions.checkArgument(
fieldName != null && fieldName.length == 1,
"The primary key columns should not be nested."));
Comment on lines +629 to +634
}

/**
* Purges a table from the Iceberg.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public class IcebergTable extends BaseTable {

public static final String ICEBERG_COMMENT_FIELD_NAME = "comment";

/** The name of the synthetic primary key index reconstructed from Iceberg identifier fields. */
@VisibleForTesting
public static final String ICEBERG_PRIMARY_KEY_INDEX_NAME = "ICEBERG_PRIMARY_KEY_INDEX";
Comment on lines +75 to +77

private String location;

private IcebergTable() {}
Expand Down Expand Up @@ -164,6 +168,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
.withDistribution(getDistribution(properties))
.withIndexes(ConvertUtil.constructIndexesFromIdentifierFields(schema))
.build();
}

Expand Down Expand Up @@ -207,6 +212,7 @@ protected IcebergTable internalBuild() {
icebergTable.partitioning = partitioning;
icebergTable.distribution = distribution;
icebergTable.sortOrders = sortOrders;
icebergTable.indexes = indexes;
if (null != comment) {
icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@

import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergColumn;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTable;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand All @@ -40,7 +47,72 @@ public static Schema toIcebergSchema(IcebergTable gravitinoTable) {
toGravitinoStructType(gravitinoTable);
Type converted =
ToIcebergTypeVisitor.visit(gravitinoStructType, new ToIcebergType(gravitinoStructType));
return new Schema(converted.asNestedType().asStructType().fields());
Schema schema = new Schema(converted.asNestedType().asStructType().fields());
return applyIdentifierFields(schema, primaryKeyColumnNames(gravitinoTable));
}

/**
* Sets the Iceberg {@code identifier-field-ids} for the primary key columns, mirroring the native
* Flink Iceberg connector's {@code FlinkSchemaUtil.freshIdentifierFieldIds}. Iceberg requires
* identifier fields to be required (NOT NULL) columns; this constraint is enforced by the Iceberg
* {@link Schema} constructor itself, so no nullability promotion is performed here.
*
* @param schema The Iceberg schema before identifier fields are applied.
* @param primaryKeyColumns The primary key column names (empty when there is no primary key).
* @return The Iceberg schema carrying the identifier field ids.
*/
private static Schema applyIdentifierFields(Schema schema, List<String> primaryKeyColumns) {
if (primaryKeyColumns.isEmpty()) {
return schema;
}
Set<Integer> identifierFieldIds = new LinkedHashSet<>();
for (String columnName : primaryKeyColumns) {
Types.NestedField field = schema.findField(columnName);
Preconditions.checkArgument(
field != null, "Cannot find primary key column in table schema: %s", columnName);
identifierFieldIds.add(field.fieldId());
}
return new Schema(schema.schemaId(), schema.asStruct().fields(), identifierFieldIds);
}

/**
* Extracts the primary key column names from the table's {@code PRIMARY_KEY} index. Returns an
* empty list when no index is present.
*
* @param gravitinoTable Gravitino table of Iceberg.
* @return The ordered primary key column names.
*/
private static List<String> primaryKeyColumnNames(IcebergTable gravitinoTable) {
Index[] indexes = gravitinoTable.index();
if (indexes == null || indexes.length == 0) {
return Collections.emptyList();
}
return Arrays.stream(indexes[0].fieldNames())
.map(fieldName -> fieldName[0])
.collect(Collectors.toList());
}

/**
* Reconstructs the Gravitino {@code PRIMARY_KEY} index from the Iceberg {@code
* identifier-field-ids}, mirroring Paimon's {@code constructIndexesFromPrimaryKeys}. The primary
* key columns are ordered by their position in the schema for deterministic results, since
* Iceberg identifier fields are an unordered set.
*
* @param schema The Iceberg schema loaded from the catalog.
* @return A single-element {@code PRIMARY_KEY} index array, or an empty array when the schema has
* no identifier fields.
*/
public static Index[] constructIndexesFromIdentifierFields(Schema schema) {
Set<Integer> identifierFieldIds = schema.identifierFieldIds();
if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
return Indexes.EMPTY_INDEXES;
}
String[][] fieldNames =
schema.columns().stream()
.filter(field -> identifierFieldIds.contains(field.fieldId()))
.map(field -> new String[] {field.name()})
.toArray(String[][]::new);
return new Index[] {Indexes.primary(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, fieldNames)};
Comment on lines +110 to +115
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -95,4 +101,49 @@ public void testListSchemasFlatOnlyReturnsUnchangedNames() {
Assertions.assertTrue(Arrays.stream(result).anyMatch(id -> "db1".equals(id.name())));
Assertions.assertTrue(Arrays.stream(result).anyMatch(id -> "db2".equals(id.name())));
}

@Test
public void testCreateTableRejectsMultiplePrimaryKeyIndexes() {
Index[] indexes =
new Index[] {
Indexes.primary("pk1", new String[][] {{"id"}}),
Indexes.primary("pk2", new String[][] {{"name"}})
};
IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class, () -> createTableWithIndexes(indexes));
Assertions.assertTrue(exception.getMessage().contains("no more than one PRIMARY_KEY Index"));
}

@Test
public void testCreateTableRejectsNonPrimaryKeyIndex() {
Index[] indexes = new Index[] {Indexes.unique("uk", new String[][] {{"id"}})};
IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class, () -> createTableWithIndexes(indexes));
Assertions.assertTrue(exception.getMessage().contains("only supports primary key Index"));
}

@Test
public void testCreateTableRejectsNestedPrimaryKeyColumn() {
Index[] indexes = new Index[] {Indexes.primary("pk", new String[][] {{"struct", "field"}})};
IllegalArgumentException exception =
Assertions.assertThrows(
IllegalArgumentException.class, () -> createTableWithIndexes(indexes));
Assertions.assertTrue(exception.getMessage().contains("should not be nested"));
}

Comment on lines +127 to +135
private static void createTableWithIndexes(Index[] indexes) {
// validateIcebergIndexes runs before any catalog interaction, so empty schema args suffice.
new IcebergCatalogOperations()
.createTable(
NameIdentifier.of(METALAKE, CATALOG, "db", "table"),
new Column[0],
null,
ImmutableMap.of(),
new Transform[0],
Distributions.NONE,
new SortOrder[0],
indexes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Maps;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -33,6 +34,8 @@
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.gravitino.rel.types.Types.ByteType;
import org.apache.gravitino.rel.types.Types.ShortType;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -108,6 +111,108 @@ public void testToIcebergSchemaWithNullColumns() {
Assertions.assertEquals("columns must not be null", exception.getMessage());
}

@Test
public void testToIcebergSchemaWithSinglePrimaryKey() {
IcebergTable table =
primaryKeyTable(
new String[][] {{"id"}},
requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()),
nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get()));
Schema schema = ConvertUtil.toIcebergSchema(table);
Assertions.assertEquals(
Collections.singleton(schema.findField("id").fieldId()), schema.identifierFieldIds());
}

@Test
public void testToIcebergSchemaWithCompositePrimaryKey() {
IcebergTable table =
primaryKeyTable(
new String[][] {{"id"}, {"region"}},
requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()),
requiredColumn("region", org.apache.gravitino.rel.types.Types.StringType.get()),
nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get()));
Schema schema = ConvertUtil.toIcebergSchema(table);
Assertions.assertEquals(
com.google.common.collect.ImmutableSet.of(
schema.findField("id").fieldId(), schema.findField("region").fieldId()),
schema.identifierFieldIds());
Comment on lines +135 to +138
}

@Test
public void testToIcebergSchemaWithNullablePrimaryKeyThrows() {
IcebergTable table =
primaryKeyTable(
new String[][] {{"id"}},
nullableColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()));
// Iceberg's Schema constructor requires identifier fields to be NOT NULL.
Assertions.assertThrows(
IllegalArgumentException.class, () -> ConvertUtil.toIcebergSchema(table));
}

@Test
public void testConstructIndexesFromIdentifierFields() {
IcebergTable table =
primaryKeyTable(
new String[][] {{"id"}, {"region"}},
requiredColumn("id", org.apache.gravitino.rel.types.Types.LongType.get()),
requiredColumn("region", org.apache.gravitino.rel.types.Types.StringType.get()),
nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get()));
Schema schema = ConvertUtil.toIcebergSchema(table);

Index[] indexes = ConvertUtil.constructIndexesFromIdentifierFields(schema);
Assertions.assertEquals(1, indexes.length);
Assertions.assertEquals(Index.IndexType.PRIMARY_KEY, indexes[0].type());
Assertions.assertEquals(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, indexes[0].name());
// Reconstructed primary key columns follow the schema column order, not the index input order.
Assertions.assertArrayEquals(new String[][] {{"id"}, {"region"}}, indexes[0].fieldNames());
}

@Test
public void testConstructIndexesFromIdentifierFieldsWhenAbsent() {
Schema schema =
ConvertUtil.toIcebergSchema(
new Column[] {
nullableColumn("name", org.apache.gravitino.rel.types.Types.StringType.get())
});
Assertions.assertEquals(0, ConvertUtil.constructIndexesFromIdentifierFields(schema).length);
}

private static IcebergColumn requiredColumn(
String name, org.apache.gravitino.rel.types.Type type) {
return IcebergColumn.builder()
.withName(name)
.withType(type)
.withNullable(false)
.withComment(TEST_COMMENT)
.build();
}

private static IcebergColumn nullableColumn(
String name, org.apache.gravitino.rel.types.Type type) {
return IcebergColumn.builder()
.withName(name)
.withType(type)
.withNullable(true)
.withComment(TEST_COMMENT)
.build();
}

private static IcebergTable primaryKeyTable(String[][] primaryKeyFieldNames, Column... columns) {
Index[] indexes =
new Index[] {
Indexes.primary(IcebergTable.ICEBERG_PRIMARY_KEY_INDEX_NAME, primaryKeyFieldNames)
};
return IcebergTable.builder()
.withName(TEST_NAME)
.withAuditInfo(
AuditInfo.builder().withCreator(TEST_NAME).withCreateTime(Instant.now()).build())
.withProperties(Maps.newHashMap())
.withColumns(columns)
.withIndexes(indexes)
.withComment(TEST_COMMENT)
.build();
}

@Test
public void testToPrimitiveType() {
ByteType byteType = ByteType.get();
Expand Down