Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
Expand Down Expand Up @@ -181,6 +182,18 @@ private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
// Fail-safe: a persisted group carrying an invalid DRA config (e.g. manual DB edits)
// must not crash AMS. Surface it and fall back to DRA-disabled behavior.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a more implicit way to disable it? It doesn’t seem appropriate to validate the group's configuration when loading optimizing queues.

DynamicAllocationConfig.warnDeprecatedMinParallelism(group);
try {
DynamicAllocationConfig.parse(group).validate();
} catch (IllegalArgumentException e) {
LOG.warn(
"Resource group:{} has an invalid dynamic-allocation config, "
+ "falling back to DRA-disabled mode: {}",
groupName,
e.getMessage());
}
List<DefaultTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
Expand Down Expand Up @@ -840,19 +853,7 @@ public int compareTo(@NotNull Delayed o) {
}

public int getMinParallelism(ResourceGroup resourceGroup) {
if (!resourceGroup
.getProperties()
.containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) {
return 0;
}
String minParallelism =
resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM);
try {
return Integer.parseInt(minParallelism);
} catch (Throwable t) {
LOG.warn("Illegal minParallelism : {}, will use default value 0", minParallelism, t);
return 0;
}
return DynamicAllocationConfig.resolveMinParallelism(resourceGroup);
}

public int tryKeeping(ResourceGroup resourceGroup) {
Expand Down Expand Up @@ -921,6 +922,17 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {

if (keepingTask.getAttempts() > groupMaxKeepingAttempts) {
int minParallelism = keepingTask.getMinParallelism(resourceGroup);
if (DynamicAllocationConfig.isEffectivelyEnabled(resourceGroup)) {
// Dynamic allocation owns scale decisions for the group; never erode its
// min-parallelism floor automatically.
LOG.warn(
"Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}; dynamic allocation is enabled so min-parallel is kept",
resourceGroup.getName(),
keepingTask.getAttempts(),
minParallelism);
keepInTouch(resourceGroup.getName(), 1);
return;
}
LOG.warn(
"Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}",
resourceGroup.getName(),
Expand All @@ -930,7 +942,7 @@ protected void processTask(OptimizerGroupKeepingTask keepingTask) {
resourceGroup
.getProperties()
.put(
OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM,
DynamicAllocationConfig.effectiveMinParallelismKey(resourceGroup),
String.valueOf(minParallelism - requiredCores));
updateResourceGroup(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.server.dashboard.utils.PropertiesUtil;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.dra.DynamicAllocationConfig;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.OptimizerInstance;
Expand Down Expand Up @@ -242,7 +243,9 @@ public void createResourceGroup(Context ctx) {
validateGroupName(name);
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
ResourceGroup resourceGroup = builder.build();
validateDynamicAllocation(resourceGroup);
optimizerManager.createResourceGroup(resourceGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}

Expand All @@ -257,7 +260,9 @@ public void updateResourceGroup(Context ctx) {
Map<String, String> properties = PropertiesUtil.sanitizeProperties((Map) map.get("properties"));
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
ResourceGroup resourceGroup = builder.build();
validateDynamicAllocation(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

Expand All @@ -283,6 +288,15 @@ public void getContainers(Context ctx) {
.collect(Collectors.toList())));
}

private void validateDynamicAllocation(ResourceGroup resourceGroup) {
DynamicAllocationConfig.warnDeprecatedMinParallelism(resourceGroup);
try {
DynamicAllocationConfig.parse(resourceGroup).validate();
} catch (IllegalArgumentException e) {
throw new BadRequestException(e.getMessage());
}
}

private void validateGroupName(String groupName) {
if (StringUtils.isEmpty(groupName)) {
throw new BadRequestException(
Expand Down
Loading
Loading