diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index efc1c375b7..6fc88fae25 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -45,6 +45,7 @@ import org.apache.amoro.server.optimizing.OptimizingQueue; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; +import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig; import org.apache.amoro.server.persistence.StatedPersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizerMapper; import org.apache.amoro.server.persistence.mapper.ResourceMapper; @@ -181,6 +182,18 @@ private void loadOptimizingQueues(List tableRuntimeList) { optimizerGroups.forEach( group -> { String groupName = group.getName(); + // Fail-safe: a persisted group carrying an invalid DRA config (e.g. manual DB edits) + // must not crash AMS. Surface it and fall back to DRA-disabled behavior. + DynamicAllocationConfig.warnDeprecatedMinParallelism(group); + try { + DynamicAllocationConfig.parse(group).validate(); + } catch (IllegalArgumentException e) { + LOG.warn( + "Resource group:{} has an invalid dynamic-allocation config, " + + "falling back to DRA-disabled mode: {}", + groupName, + e.getMessage()); + } List tableRuntimes = groupToTableRuntimes.remove(groupName); OptimizingQueue optimizingQueue = new OptimizingQueue( @@ -840,19 +853,7 @@ public int compareTo(@NotNull Delayed o) { } public int getMinParallelism(ResourceGroup resourceGroup) { - if (!resourceGroup - .getProperties() - .containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) { - return 0; - } - String minParallelism = - resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); - try { - return Integer.parseInt(minParallelism); - } catch (Throwable t) { - LOG.warn("Illegal minParallelism : {}, will use default value 0", minParallelism, t); - return 0; - } + return DynamicAllocationConfig.resolveMinParallelism(resourceGroup); } public int tryKeeping(ResourceGroup resourceGroup) { @@ -921,6 +922,17 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) { if (keepingTask.getAttempts() > groupMaxKeepingAttempts) { int minParallelism = keepingTask.getMinParallelism(resourceGroup); + if (DynamicAllocationConfig.isEffectivelyEnabled(resourceGroup)) { + // Dynamic allocation owns scale decisions for the group; never erode its + // min-parallelism floor automatically. + LOG.warn( + "Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}; dynamic allocation is enabled so min-parallel is kept", + resourceGroup.getName(), + keepingTask.getAttempts(), + minParallelism); + keepInTouch(resourceGroup.getName(), 1); + return; + } LOG.warn( "Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}", resourceGroup.getName(), @@ -930,7 +942,7 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) { resourceGroup .getProperties() .put( - OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, + DynamicAllocationConfig.effectiveMinParallelismKey(resourceGroup), String.valueOf(minParallelism - requiredCores)); updateResourceGroup(resourceGroup); optimizerManager.updateResourceGroup(resourceGroup); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java index fac5b89676..2362018b68 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java @@ -31,6 +31,7 @@ import org.apache.amoro.server.dashboard.utils.PropertiesUtil; import org.apache.amoro.server.manager.AbstractOptimizerContainer; import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.OptimizerInstance; @@ -242,7 +243,9 @@ public void createResourceGroup(Context ctx) { validateGroupName(name); ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container); builder.addProperties(properties); - optimizerManager.createResourceGroup(builder.build()); + ResourceGroup resourceGroup = builder.build(); + validateDynamicAllocation(resourceGroup); + optimizerManager.createResourceGroup(resourceGroup); ctx.json(OkResponse.of("The optimizer group has been successfully created.")); } @@ -257,7 +260,9 @@ public void updateResourceGroup(Context ctx) { Map properties = PropertiesUtil.sanitizeProperties((Map) map.get("properties")); ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container); builder.addProperties(properties); - optimizerManager.updateResourceGroup(builder.build()); + ResourceGroup resourceGroup = builder.build(); + validateDynamicAllocation(resourceGroup); + optimizerManager.updateResourceGroup(resourceGroup); ctx.json(OkResponse.of("The optimizer group has been successfully updated.")); } @@ -283,6 +288,15 @@ public void getContainers(Context ctx) { .collect(Collectors.toList()))); } + private void validateDynamicAllocation(ResourceGroup resourceGroup) { + DynamicAllocationConfig.warnDeprecatedMinParallelism(resourceGroup); + try { + DynamicAllocationConfig.parse(resourceGroup).validate(); + } catch (IllegalArgumentException e) { + throw new BadRequestException(e.getMessage()); + } + } + private void validateGroupName(String groupName) { if (StringUtils.isEmpty(groupName)) { throw new BadRequestException( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/dra/DynamicAllocationConfig.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/dra/DynamicAllocationConfig.java new file mode 100644 index 0000000000..cd8fe88713 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/dra/DynamicAllocationConfig.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.dra; + +import org.apache.amoro.Constants; +import org.apache.amoro.OptimizerProperties; +import org.apache.amoro.config.ConfigHelpers; +import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.utils.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Map; + +/** + * Dynamic resource allocation (DRA) configuration of a resource group (AIP-5). Parsed from the + * group's properties; {@link #validate()} enforces the AIP-5 constraints. + */ +public class DynamicAllocationConfig { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicAllocationConfig.class); + + private final String groupName; + private final String container; + private final boolean enabled; + private final int minParallelism; + private final String minParallelismRaw; + private final Integer maxParallelism; + private final Duration schedulerBacklogTimeout; + private final Duration sustainedBacklogTimeout; + private final Duration executorIdleTimeout; + private final Duration scaleDownCooldown; + private final Duration drainTimeout; + + private DynamicAllocationConfig( + String groupName, + String container, + boolean enabled, + int minParallelism, + String minParallelismRaw, + Integer maxParallelism, + Duration schedulerBacklogTimeout, + Duration sustainedBacklogTimeout, + Duration executorIdleTimeout, + Duration scaleDownCooldown, + Duration drainTimeout) { + this.groupName = groupName; + this.container = container; + this.enabled = enabled; + this.minParallelism = minParallelism; + this.minParallelismRaw = minParallelismRaw; + this.maxParallelism = maxParallelism; + this.schedulerBacklogTimeout = schedulerBacklogTimeout; + this.sustainedBacklogTimeout = sustainedBacklogTimeout; + this.executorIdleTimeout = executorIdleTimeout; + this.scaleDownCooldown = scaleDownCooldown; + this.drainTimeout = drainTimeout; + } + + /** + * Parse the DRA configuration of a resource group. Malformed numeric or duration values throw + * {@link IllegalArgumentException}; semantic constraints are checked by {@link #validate()}. + */ + public static DynamicAllocationConfig parse(ResourceGroup group) { + Map properties = group.getProperties(); + boolean enabled = + PropertyUtil.propertyAsBoolean( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, + OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED_DEFAULT); + int minParallelism = resolveMinParallelism(group); + String minParallelismRaw = rawMinParallelism(group); + + Integer maxParallelism = + PropertyUtil.propertyAsNullableInt( + properties, OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM); + + return new DynamicAllocationConfig( + group.getName(), + group.getContainer(), + enabled, + minParallelism, + minParallelismRaw, + maxParallelism, + parseDuration( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT, + OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT_DEFAULT), + parseDuration( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_SUSTAINED_BACKLOG_TIMEOUT, + OptimizerProperties.DYNAMIC_ALLOCATION_SUSTAINED_BACKLOG_TIMEOUT_DEFAULT), + parseDuration( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, + OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT_DEFAULT), + parseDuration( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_SCALE_DOWN_COOLDOWN, + OptimizerProperties.DYNAMIC_ALLOCATION_SCALE_DOWN_COOLDOWN_DEFAULT), + parseDuration( + properties, + OptimizerProperties.DYNAMIC_ALLOCATION_DRAIN_TIMEOUT, + OptimizerProperties.DYNAMIC_ALLOCATION_DRAIN_TIMEOUT_DEFAULT)); + } + + /** + * Resolve the effective min-parallelism of a group, honoring the deprecated flat {@code + * min-parallelism} as a fallback. Resolution order: {@link + * OptimizerProperties#DYNAMIC_ALLOCATION_MIN_PARALLELISM} → {@link + * OptimizerProperties#OPTIMIZER_GROUP_MIN_PARALLELISM} → {@code 0}. Lenient: an unparsable value + * falls back to {@code 0} rather than throwing, preserving legacy behavior. This is on the keeper + * hot path and therefore stays silent; deprecation is reported by {@link + * #warnDeprecatedMinParallelism(ResourceGroup)} at config-entry points instead. + */ + public static int resolveMinParallelism(ResourceGroup group) { + Map properties = group.getProperties(); + String namespaced = properties.get(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM); + if (namespaced != null) { + return parseIntOrDefault(group.getName(), namespaced); + } + String legacy = properties.get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + if (legacy != null) { + return parseIntOrDefault(group.getName(), legacy); + } + return OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM_DEFAULT; + } + + /** + * The raw, unparsed min-parallelism string the group relies on, following the same resolution + * order as {@link #resolveMinParallelism(ResourceGroup)} ({@link + * OptimizerProperties#DYNAMIC_ALLOCATION_MIN_PARALLELISM} → {@link + * OptimizerProperties#OPTIMIZER_GROUP_MIN_PARALLELISM}), or {@code null} when neither is set. + * Retained so {@link #validate()} can distinguish "explicitly configured but unparsable" from + * "unset", which the lenient resolved {@code int} collapses to the same {@code 0}. + */ + private static String rawMinParallelism(ResourceGroup group) { + Map properties = group.getProperties(); + String namespaced = properties.get(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM); + if (namespaced != null) { + return namespaced; + } + return properties.get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + } + + /** + * Log a one-off deprecation warning when a group still relies on the flat {@code + * min-parallelism}. Intended for config-entry points (startup load, REST create/update), not the + * keeper hot path. + */ + public static void warnDeprecatedMinParallelism(ResourceGroup group) { + Map properties = group.getProperties(); + boolean hasNamespaced = + properties.containsKey(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM); + boolean hasLegacy = properties.containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + if (hasNamespaced && hasLegacy) { + LOG.warn( + "Resource group:{} sets both '{}' and the deprecated '{}'; the namespaced value wins.", + group.getName(), + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + } else if (hasLegacy) { + LOG.warn( + "Resource group:{} uses the deprecated '{}'; please migrate to '{}'.", + group.getName(), + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM); + } + } + + /** + * Whether dynamic allocation is effectively enabled for the group: opted in and the configuration + * is valid. An invalid configuration counts as disabled, mirroring the startup fail-safe + * behavior. + */ + public static boolean isEffectivelyEnabled(ResourceGroup group) { + try { + DynamicAllocationConfig config = parse(group); + config.validate(); + return config.isEnabled(); + } catch (IllegalArgumentException e) { + return false; + } + } + + /** + * The min-parallelism property key that {@link #resolveMinParallelism(ResourceGroup)} actually + * reads for this group. Writers updating the effective value (e.g. the keeper's auto-reset) must + * target this key; writing the deprecated flat key while the namespaced one is present would be + * shadowed. + */ + public static String effectiveMinParallelismKey(ResourceGroup group) { + return group.getProperties().containsKey(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM) + ? OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM + : OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM; + } + + /** Enforce the AIP-5 constraints. No-op when DRA is disabled. */ + public void validate() { + if (!enabled) { + return; + } + if (Constants.EXTERNAL_RESOURCE_CONTAINER.equals(container)) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s cannot enable dynamic allocation on an externally-registered " + + "optimizer (container '%s'); AMS cannot scale optimizers it did not launch.", + groupName, container)); + } + if (maxParallelism == null) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s enables dynamic allocation but '%s' is not set; it is required.", + groupName, OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM)); + } + // min-parallelism: resolveMinParallelism() stays lenient for the keeper hot path and legacy + // compatibility, but on an opted-in group an explicitly configured value must be strict — + // otherwise a typo silently degrades to 0 and a negative value slips through unchecked. + if (minParallelismRaw != null) { + int parsedMin; + try { + parsedMin = Integer.parseInt(minParallelismRaw.trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%s) is not a valid integer.", + groupName, + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, + minParallelismRaw)); + } + if (parsedMin < 0) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%d) must not be negative.", + groupName, OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, parsedMin)); + } + } + if (maxParallelism < minParallelism) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%d) must be >= '%s'(%d).", + groupName, + OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, + maxParallelism, + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, + minParallelism)); + } + if (maxParallelism > OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM_LIMIT) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%d) must not exceed the hard limit %d.", + groupName, + OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, + maxParallelism, + OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM_LIMIT)); + } + Duration idleMin = + ConfigHelpers.TimeUtils.parseDuration( + OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT_MIN); + if (executorIdleTimeout.compareTo(idleMin) < 0) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%s) must be >= %s.", + groupName, + OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, + executorIdleTimeout, + OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT_MIN)); + } + // TODO AIP-5 Phase 2: these polling-oriented timeouts only enforce a positive lower bound; a + // value like "1ns" passes here but would busy-spin the scale loop. Add realistic minimums once + // the scaling loop that consumes them lands. + requirePositive( + OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT, schedulerBacklogTimeout); + requirePositive( + OptimizerProperties.DYNAMIC_ALLOCATION_SUSTAINED_BACKLOG_TIMEOUT, sustainedBacklogTimeout); + requirePositive(OptimizerProperties.DYNAMIC_ALLOCATION_SCALE_DOWN_COOLDOWN, scaleDownCooldown); + requirePositive(OptimizerProperties.DYNAMIC_ALLOCATION_DRAIN_TIMEOUT, drainTimeout); + } + + private void requirePositive(String property, Duration value) { + if (value.isZero() || value.isNegative()) { + throw new IllegalArgumentException( + String.format( + "Resource group:%s '%s'(%s) must be a positive duration.", + groupName, property, value)); + } + } + + private static Duration parseDuration( + Map properties, String property, String defaultValue) { + String value = properties.getOrDefault(property, defaultValue); + return ConfigHelpers.TimeUtils.parseDuration(value); + } + + private static int parseIntOrDefault(String groupName, String value) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + LOG.warn( + "Resource group:{} has an illegal min-parallelism value '{}', using default {}.", + groupName, + value, + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM_DEFAULT); + return OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM_DEFAULT; + } + } + + public boolean isEnabled() { + return enabled; + } + + public int getMinParallelism() { + return minParallelism; + } + + /** + * The configured max-parallelism. Precondition: only call on a config that has passed {@link + * #validate()} with DRA enabled — {@code max-parallelism} is nullable and only mandatory when + * enabled, so calling this on a disabled or unvalidated config may throw {@link + * NullPointerException} on unboxing. + */ + public int getMaxParallelism() { + return maxParallelism; + } + + public Duration getSchedulerBacklogTimeout() { + return schedulerBacklogTimeout; + } + + public Duration getSustainedBacklogTimeout() { + return sustainedBacklogTimeout; + } + + public Duration getExecutorIdleTimeout() { + return executorIdleTimeout; + } + + public Duration getScaleDownCooldown() { + return scaleDownCooldown; + } + + public Duration getDrainTimeout() { + return drainTimeout; + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupController.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupController.java index 6e40077c7f..a4dfa42908 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupController.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupController.java @@ -19,14 +19,19 @@ package org.apache.amoro.server; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.javalin.http.Context; +import org.apache.amoro.OptimizerProperties; +import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.dashboard.controller.OptimizerGroupController; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.table.TableManager; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -77,4 +82,43 @@ void createInvalidGroupName(String groupName) { when(ctx.bodyAsClass(Map.class)).thenReturn(requestBody); assertThrows(BadRequestException.class, () -> controller.createResourceGroup(ctx)); } + + private Map groupRequest(String name, Map properties) { + Map requestBody = new HashMap<>(); + requestBody.put("name", name); + requestBody.put("container", "flink"); + requestBody.put("properties", properties); + return requestBody; + } + + @Test + void createWithInvalidDynamicAllocationIsRejected() { + Map properties = new HashMap<>(); + // enabled without the required max-parallelism + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + + when(ctx.bodyAsClass(Map.class)).thenReturn(groupRequest("group1", properties)); + assertThrows(BadRequestException.class, () -> controller.createResourceGroup(ctx)); + } + + @Test + void updateWithInvalidDynamicAllocationIsRejected() { + Map properties = new HashMap<>(); + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "2048"); + + when(ctx.bodyAsClass(Map.class)).thenReturn(groupRequest("group1", properties)); + assertThrows(BadRequestException.class, () -> controller.updateResourceGroup(ctx)); + } + + @Test + void createWithValidDynamicAllocationSucceeds() { + Map properties = new HashMap<>(); + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "16"); + + when(ctx.bodyAsClass(Map.class)).thenReturn(groupRequest("group1", properties)); + controller.createResourceGroup(ctx); + verify(optimizerManager).createResourceGroup(any(ResourceGroup.class)); + } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java index 889fb02cc7..20d4504cc0 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java @@ -313,6 +313,67 @@ public void testMinParallelismResetToOptimizerParallelWhenNoMoreResource() + ":min-parallelism should be reset to optimizer's current total cores (1) when no more resources available"); } + /** + * Test scenario 5: min-parallelism auto-reset is skipped when dynamic allocation is enabled. + * + *

When a group opts into dynamic allocation, DRA owns scale decisions for the group; the + * keeper must not erode its min-parallelism floor even after exhausting creation attempts. + */ + @Test + public void testAutoResetSkippedWhenDynamicAllocationEnabled() throws InterruptedException { + resourceAvailable.set(false); + scaleOutCallCount.set(0); + ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-5", 2); + resourceGroup.getProperties().put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + resourceGroup.getProperties().put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "8"); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + Thread.sleep(200); + + ResourceGroup updatedGroup = optimizerManager().getResourceGroup(resourceGroup.getName()); + Assertions.assertEquals( + "2", + updatedGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM), + resourceGroup.getName() + + ":min-parallelism must not be auto-reset when dynamic allocation is enabled"); + } + + /** + * Test scenario 6: auto-reset writes the namespaced min-parallelism key when the group uses it. + * + *

A group configured with dynamic-allocation.min-parallelism (DRA not enabled) must have the + * auto-reset applied to that key; writing the deprecated flat key would be shadowed by the + * namespaced one, turning the reset into an endless no-op loop. + */ + @Test + public void testAutoResetWritesNamespacedKeyWhenUsed() throws InterruptedException { + resourceAvailable.set(false); + scaleOutCallCount.set(0); + String groupName = TEST_GROUP_NAME + "-6"; + this.currentGroupName = groupName; + Map properties = Maps.newHashMap(); + properties.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "2"); + properties.put("memory", "1024"); + ResourceGroup resourceGroup = + new ResourceGroup.Builder(groupName, MOCK_CONTAINER_NAME).addProperties(properties).build(); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + Thread.sleep(200); + + ResourceGroup updatedGroup = optimizerManager().getResourceGroup(resourceGroup.getName()); + Assertions.assertEquals( + "0", + updatedGroup.getProperties().get(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM), + groupName + ":auto-reset must write the namespaced min-parallelism key the group uses"); + Assertions.assertNull( + updatedGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM), + groupName + ":auto-reset must not introduce the deprecated flat min-parallelism key"); + } + private static OptimizerRegisterInfo buildRegisterInfo(String groupName, int threadCount) { OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo(); Map registerProperties = Maps.newHashMap(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/dra/TestDynamicAllocationConfig.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/dra/TestDynamicAllocationConfig.java new file mode 100644 index 0000000000..31438aa849 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/dra/TestDynamicAllocationConfig.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing.dra; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import org.apache.amoro.OptimizerProperties; +import org.apache.amoro.resource.ResourceGroup; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +public class TestDynamicAllocationConfig { + + private static final String CONTAINER = "flink"; + + private ResourceGroup group(Map properties) { + return new ResourceGroup.Builder("group1", CONTAINER).addProperties(properties).build(); + } + + private Map enabledProps() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "16"); + return props; + } + + private void parseAndValidate(ResourceGroup group) { + DynamicAllocationConfig.parse(group).validate(); + } + + @Test + void enabledWithoutMaxParallelismIsRejected() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void maxParallelismBelowMinIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "32"); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "16"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void maxParallelismAboveHardLimitIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MAX_PARALLELISM, "2048"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void enabledWithUnparsableMinParallelismIsRejected() { + // resolveMinParallelism() is lenient (legacy/keeper path), but an opted-in group must not + // silently degrade an unparsable min-parallelism to 0. + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "abc"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void enabledWithNegativeMinParallelismIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "-3"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void executorIdleTimeoutBelowMinimumIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, "10s"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void unparsableDurationIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT, "not-a-duration"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void zeroDurationIsRejected() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT, "0s"); + Assertions.assertThrows(IllegalArgumentException.class, () -> parseAndValidate(group(props))); + } + + @Test + void externalContainerWithEnabledIsRejected() { + ResourceGroup external = + new ResourceGroup.Builder("group1").addProperties(enabledProps()).build(); + Assertions.assertThrows( + IllegalArgumentException.class, () -> DynamicAllocationConfig.parse(external).validate()); + } + + @Test + void validConfigIsParsedCorrectly() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "4"); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT, "2min"); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, "10min"); + + DynamicAllocationConfig config = DynamicAllocationConfig.parse(group(props)); + assertDoesNotThrow(config::validate); + + Assertions.assertTrue(config.isEnabled()); + Assertions.assertEquals(4, config.getMinParallelism()); + Assertions.assertEquals(16, config.getMaxParallelism()); + Assertions.assertEquals(Duration.ofMinutes(2), config.getSchedulerBacklogTimeout()); + Assertions.assertEquals(Duration.ofMinutes(10), config.getExecutorIdleTimeout()); + Assertions.assertEquals(Duration.ofSeconds(30), config.getSustainedBacklogTimeout()); + Assertions.assertEquals(Duration.ofMinutes(1), config.getScaleDownCooldown()); + Assertions.assertEquals(Duration.ofMinutes(15), config.getDrainTimeout()); + } + + @Test + void minParallelismResolutionPrefersNamespacedValue() { + Map props = enabledProps(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "8"); + props.put(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, "2"); + Assertions.assertEquals(8, DynamicAllocationConfig.resolveMinParallelism(group(props))); + } + + @Test + void minParallelismResolutionFallsBackToLegacyValue() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, "5"); + Assertions.assertEquals(5, DynamicAllocationConfig.resolveMinParallelism(group(props))); + } + + @Test + void minParallelismResolutionDefaultsToZero() { + Assertions.assertEquals( + 0, DynamicAllocationConfig.resolveMinParallelism(group(new HashMap<>()))); + } + + @Test + void disabledConfigSkipsValidation() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "false"); + // max-parallelism absent and an external container would fail if validated, but disabled. + ResourceGroup external = new ResourceGroup.Builder("group1").addProperties(props).build(); + assertDoesNotThrow(() -> DynamicAllocationConfig.parse(external).validate()); + } + + @Test + void effectivelyEnabledWhenValidConfig() { + Assertions.assertTrue(DynamicAllocationConfig.isEffectivelyEnabled(group(enabledProps()))); + } + + @Test + void notEffectivelyEnabledWhenDisabled() { + Assertions.assertFalse(DynamicAllocationConfig.isEffectivelyEnabled(group(new HashMap<>()))); + } + + @Test + void notEffectivelyEnabledWhenConfigInvalid() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_ENABLED, "true"); + // enabled without max-parallelism is invalid, so DRA falls back to disabled. + Assertions.assertFalse(DynamicAllocationConfig.isEffectivelyEnabled(group(props))); + } + + @Test + void effectiveMinParallelismKeyPrefersNamespacedWhenPresent() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, "2"); + props.put(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, "2"); + Assertions.assertEquals( + OptimizerProperties.DYNAMIC_ALLOCATION_MIN_PARALLELISM, + DynamicAllocationConfig.effectiveMinParallelismKey(group(props))); + } + + @Test + void effectiveMinParallelismKeyFallsBackToLegacy() { + Map props = new HashMap<>(); + props.put(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, "2"); + Assertions.assertEquals( + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, + DynamicAllocationConfig.effectiveMinParallelismKey(group(props))); + Assertions.assertEquals( + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, + DynamicAllocationConfig.effectiveMinParallelismKey(group(new HashMap<>()))); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java index dd4a6f624e..bbf9a79d01 100644 --- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java +++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java @@ -32,7 +32,13 @@ public class OptimizerProperties { public static final String OPTIMIZER_EXECUTION_PARALLEL = "execution-parallel"; public static final String OPTIMIZER_MEMORY_SIZE = "memory-size"; public static final String OPTIMIZER_GROUP_NAME = "group-name"; - public static final String OPTIMIZER_GROUP_MIN_PARALLELISM = "min-parallelism"; + + /** + * @deprecated since 0.9.0, use {@link #DYNAMIC_ALLOCATION_MIN_PARALLELISM} instead. Still honored + * as a fallback when the namespaced property is absent. + */ + @Deprecated public static final String OPTIMIZER_GROUP_MIN_PARALLELISM = "min-parallelism"; + public static final String OPTIMIZER_HEART_BEAT_INTERVAL = "heart-beat-interval"; public static final String OPTIMIZER_EXTEND_DISK_STORAGE = "extend-disk-storage"; public static final boolean OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT = false; @@ -49,4 +55,54 @@ public class OptimizerProperties { public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout"; public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min"; public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED = "master-slave-mode-enabled"; + + // Dynamic resource allocation (DRA) properties (AIP-5), configured at the resource group level. + // Semantics and validation rules are documented in DynamicAllocationConfig (amoro-ams). + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_ENABLED = "dynamic-allocation.enabled"; + + public static final boolean DYNAMIC_ALLOCATION_ENABLED_DEFAULT = false; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_MIN_PARALLELISM = + "dynamic-allocation.min-parallelism"; + + public static final int DYNAMIC_ALLOCATION_MIN_PARALLELISM_DEFAULT = 0; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_MAX_PARALLELISM = + "dynamic-allocation.max-parallelism"; + + public static final int DYNAMIC_ALLOCATION_MAX_PARALLELISM_LIMIT = 1024; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT = + "dynamic-allocation.scheduler-backlog-timeout"; + + public static final String DYNAMIC_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT_DEFAULT = "1min"; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_SUSTAINED_BACKLOG_TIMEOUT = + "dynamic-allocation.sustained-backlog-timeout"; + + public static final String DYNAMIC_ALLOCATION_SUSTAINED_BACKLOG_TIMEOUT_DEFAULT = "30s"; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = + "dynamic-allocation.executor-idle-timeout"; + + public static final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT_DEFAULT = "5min"; + public static final String DYNAMIC_ALLOCATION_EXECUTOR_IDLE_TIMEOUT_MIN = "30s"; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_SCALE_DOWN_COOLDOWN = + "dynamic-allocation.scale-down-cooldown"; + + public static final String DYNAMIC_ALLOCATION_SCALE_DOWN_COOLDOWN_DEFAULT = "1min"; + + /** @since 0.9.0 */ + public static final String DYNAMIC_ALLOCATION_DRAIN_TIMEOUT = "dynamic-allocation.drain-timeout"; + + public static final String DYNAMIC_ALLOCATION_DRAIN_TIMEOUT_DEFAULT = "15min"; }