diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 0bd67da17d..1273c12172 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -37,6 +37,7 @@ import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.FsPathAndFileName; import org.apache.fluss.fs.token.ObtainedSecurityToken; +import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PartitionInfo; @@ -653,6 +654,15 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) { if (addColumn.getComment() != null) { pbAddColumn.setComment(addColumn.getComment()); } + if (addColumn.getAggFunction().isPresent()) { + AggFunction aggFunction = addColumn.getAggFunction().get(); + pbAddColumn.setAggFunctionType(aggFunction.getType().toString()); + aggFunction + .getParameters() + .forEach( + (key, value) -> + pbAddColumn.addAggFunctionParam().setKey(key).setValue(value)); + } return pbAddColumn; } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 65cca8195a..a12a43cde4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -568,6 +568,62 @@ void testAlterTableColumn() throws Exception { .hasMessageContaining("Column nested_row already exists"); } + @Test + void testAlterAggregationTableColumnWithAggFunction() throws Exception { + TablePath tablePath = TablePath.of("test_db", "alter_aggregation_table_column"); + Map properties = new HashMap<>(); + properties.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("value", DataTypes.BIGINT(), AggFunctions.SUM()) + .primaryKey("id") + .build()) + .distributedBy(3, "id") + .properties(properties) + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "new_value", + DataTypes.BIGINT(), + "new aggregate column", + TableChange.ColumnPosition.last(), + AggFunctions.SUM())), + false) + .get(); + + SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get(); + assertThat(schemaInfo.getSchema().getAggFunction("new_value")).hasValue(AggFunctions.SUM()); + } + + @Test + void testAlterNonAggregationTableColumnWithAggFunction() throws Exception { + TablePath tablePath = TablePath.of("test_db", "alter_non_aggregation_table_column"); + admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "new_value", + DataTypes.BIGINT(), + "new aggregate column", + TableChange.ColumnPosition.last(), + AggFunctions.SUM())), + false) + .get()) + .hasMessageContaining( + "Aggregation function is only supported for aggregation merge engine table"); + } + @Test void testCreateInvalidDatabaseAndTable() throws Exception { assertThatThrownBy( diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java index bb027d6905..0edb5e31bc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import java.util.Objects; +import java.util.Optional; /** {@link TableChange} represents the modification of the Fluss Table. */ public interface TableChange { @@ -42,7 +43,21 @@ static AddColumn addColumn( DataType dataType, @Nullable String comment, ColumnPosition position) { - return new AddColumn(columnName, dataType, comment, position); + return new AddColumn(columnName, dataType, comment, position, null); + } + + /** + * A table change to add the column with specified position and aggregation function. + * + * @return a TableChange represents the modification. + */ + static AddColumn addColumn( + String columnName, + DataType dataType, + @Nullable String comment, + ColumnPosition position, + @Nullable AggFunction aggFunction) { + return new AddColumn(columnName, dataType, comment, position, aggFunction); } /** @@ -230,15 +245,21 @@ class AddColumn implements SchemaChange { private final String name; private final DataType dataType; private final @Nullable String comment; + private final @Nullable AggFunction aggFunction; private final ColumnPosition position; private AddColumn( - String name, DataType dataType, @Nullable String comment, ColumnPosition position) { + String name, + DataType dataType, + @Nullable String comment, + ColumnPosition position, + @Nullable AggFunction aggFunction) { this.name = name; this.dataType = dataType; this.comment = comment; this.position = position; + this.aggFunction = aggFunction; } public String getName() { @@ -258,6 +279,10 @@ public ColumnPosition getPosition() { return position; } + public Optional getAggFunction() { + return Optional.ofNullable(aggFunction); + } + @Override public String toString() { return "AddColumn{" @@ -269,6 +294,8 @@ public String toString() { + ", comment='" + comment + '\'' + + ", aggFunction=" + + aggFunction + ", position=" + position + '}'; diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..356913ccc1 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -1154,6 +1154,8 @@ message PbAddColumn { required bytes data_type_json = 2; optional string comment = 3; required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 + optional string agg_function_type = 5; + repeated PbKeyValue agg_function_params = 6; } message PbDropColumn { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 106d4ef66f..65b2ee24bd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -76,6 +76,7 @@ import java.util.concurrent.Callable; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableSchema; /** A manager for metadata. */ public class MetadataManager { @@ -440,6 +441,7 @@ public void alterTableSchema( if (!schemaChanges.isEmpty()) { Schema newSchema = SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges); + validateAlterTableSchema(table, newSchema); LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( false, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index d1ad0c16c2..e85219a3b3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -82,7 +82,12 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { } // Delegate the actual addition to the builder - builder.column(addColumn.getName(), addColumn.getDataType()); + if (addColumn.getAggFunction().isPresent()) { + builder.column( + addColumn.getName(), addColumn.getDataType(), addColumn.getAggFunction().get()); + } else { + builder.column(addColumn.getName(), addColumn.getDataType()); + } // Fixed: Use null check for the String comment String comment = addColumn.getComment(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..4dfd2a62d1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -27,9 +27,13 @@ import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.config.cluster.ColumnPositionType; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.token.ObtainedSecurityToken; import org.apache.fluss.lake.committer.LakeCommitResult; +import org.apache.fluss.metadata.AggFunction; +import org.apache.fluss.metadata.AggFunctionType; +import org.apache.fluss.metadata.AggFunctions; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PartitionSpec; @@ -355,17 +359,40 @@ public static List toAddColumns(List addColumns) { return addColumns.stream() .filter(Objects::nonNull) .map( - pbAddColumn -> - TableChange.addColumn( - pbAddColumn.getColumnName(), - JsonSerdeUtils.readValue( - pbAddColumn.getDataTypeJson(), - DataTypeJsonSerde.INSTANCE), - pbAddColumn.hasComment() ? pbAddColumn.getComment() : null, - toColumnPosition(pbAddColumn.getColumnPositionType()))) + pbAddColumn -> { + AggFunction aggFunction = toAggFunction(pbAddColumn); + return TableChange.addColumn( + pbAddColumn.getColumnName(), + JsonSerdeUtils.readValue( + pbAddColumn.getDataTypeJson(), + DataTypeJsonSerde.INSTANCE), + pbAddColumn.hasComment() ? pbAddColumn.getComment() : null, + toColumnPosition(pbAddColumn.getColumnPositionType()), + aggFunction); + }) .collect(Collectors.toList()); } + private static AggFunction toAggFunction(PbAddColumn pbAddColumn) { + if (!pbAddColumn.hasAggFunctionType()) { + return null; + } + + AggFunctionType type = AggFunctionType.fromString(pbAddColumn.getAggFunctionType()); + if (type == null) { + throw new InvalidConfigException( + String.format( + "Unknown aggregation function type: %s", + pbAddColumn.getAggFunctionType())); + } + + Map parameters = new HashMap<>(); + for (PbKeyValue parameter : pbAddColumn.getAggFunctionParamsList()) { + parameters.put(parameter.getKey(), parameter.getValue()); + } + return AggFunctions.of(type, parameters); + } + public static List toDropColumns(List dropColumns) { return dropColumns.stream() .filter(Objects::nonNull) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 949afd8c67..6b60a94099 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.utils; +import org.apache.fluss.annotation.Internal; import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -129,6 +130,19 @@ public static void validateTableDescriptor( checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); } + /** Validates the schema after altering table columns. */ + @Internal + public static void validateAlterTableSchema(TableInfo table, Schema newSchema) { + if (table.getTableConfig() + .getMergeEngineType() + .map(MergeEngineType.AGGREGATION::equals) + .orElse(false)) { + validateAggregationFunctionParameters(newSchema); + } else { + validateNoAggregationFunctions(newSchema); + } + } + private static void checkTableLakeFormatMatchesCluster( Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { if (clusterDataLakeFormat == null) { @@ -323,6 +337,9 @@ private static void checkArrowCompression(Configuration tableConf) { private static void checkMergeEngine( Configuration tableConf, boolean hasPrimaryKey, Schema schema) { MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE); + if (mergeEngine != MergeEngineType.AGGREGATION) { + validateNoAggregationFunctions(schema); + } if (mergeEngine != null) { if (!hasPrimaryKey) { throw new InvalidConfigException( @@ -377,6 +394,20 @@ private static void checkMergeEngine( } } + /** Validates that the schema doesn't contain any aggregation functions. */ + private static void validateNoAggregationFunctions(Schema schema) { + for (Schema.Column column : schema.getColumns()) { + Optional aggFunction = column.getAggFunction(); + if (aggFunction.isPresent()) { + throw new InvalidConfigException( + String.format( + "Aggregation function is only supported for aggregation merge engine table, " + + "but column '%s' has aggregation function '%s'.", + column.getName(), aggFunction.get())); + } + } + } + /** * Validates aggregation function parameters in the schema. *