Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -75,6 +76,13 @@ public class KubernetesOptimizerContainer extends AbstractOptimizerContainer {

private static final String EXTRA_PROPERTY_PREFIX = "extra.";

/**
* Buffer added on top of optimizer's shutdown-timeout when deriving K8s
* terminationGracePeriodSeconds, to leave room for thread join, best-effort task result
* reporting, and log flush before SIGKILL.
*/
static final long TERMINATION_GRACE_BUFFER_SECONDS = 30L;

private static final Map<String, String> EXTRA_PROPERTY_DEFAULTS = new HashMap<>();

static {
Expand All @@ -86,6 +94,30 @@ private String getExtraProperty(Map<String, String> properties, String key) {
EXTRA_PROPERTY_PREFIX + key, EXTRA_PROPERTY_DEFAULTS.getOrDefault(key, null));
}

static long parseShutdownTimeoutMs(String startUpArgs) {
if (startUpArgs == null) {
return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT;
}
String[] tokens = startUpArgs.split("\\s+");
String longFlag = "--" + OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS;
for (int i = 0; i < tokens.length - 1; i++) {
if ("-st".equals(tokens[i]) || longFlag.equals(tokens[i])) {
try {
return Long.parseLong(tokens[i + 1]);
} catch (NumberFormatException e) {
return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT;
}
}
}
return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT;
}

static long resolveTerminationGracePeriodSeconds(String startUpArgs) {
long shutdownTimeoutMs = parseShutdownTimeoutMs(startUpArgs);
long shutdownTimeoutSeconds = (shutdownTimeoutMs + 999L) / 1000L;
return shutdownTimeoutSeconds + TERMINATION_GRACE_BUFFER_SECONDS;
}

private KubernetesClient client;

@Override
Expand Down Expand Up @@ -220,6 +252,8 @@ public Deployment initPodTemplateWithoutConfig(
long memory,
List<LocalObjectReference> imagePullSecretsList) {

long terminationGraceSeconds = resolveTerminationGracePeriodSeconds(startUpArgs);

DeploymentBuilder deploymentBuilder =
new DeploymentBuilder()
.withNewMetadata()
Expand All @@ -234,11 +268,12 @@ public Deployment initPodTemplateWithoutConfig(
.addToLabels("AmoroResourceId", resourceId)
.endMetadata()
.withNewSpec()
.withTerminationGracePeriodSeconds(terminationGraceSeconds)
.addNewContainer()
.withName("optimizer")
.withImage(image)
.withImagePullPolicy(pullPolicy)
.withCommand("sh", "-c", startUpArgs)
.withCommand("sh", "-c", "exec " + startUpArgs)
.withResources(
new ResourceRequirementsBuilder()
.withLimits(
Expand Down Expand Up @@ -305,7 +340,7 @@ public Deployment initPodTemplateFromFrontEnd(
container.setName("optimizer");
container.setImage(image);
container.setImagePullPolicy(pullPolicy);
container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", startUpArgs)));
container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", "exec " + startUpArgs)));

ResourceRequirements resourceRequirements = new ResourceRequirements();
resourceRequirements.setLimits(
Expand All @@ -324,6 +359,13 @@ public Deployment initPodTemplateFromFrontEnd(
podTemplate.getTemplate().getSpec().setImagePullSecrets(imagePullSecretsList);
}

if (podTemplate.getTemplate().getSpec().getTerminationGracePeriodSeconds() == null) {
podTemplate
.getTemplate()
.getSpec()
.setTerminationGracePeriodSeconds(resolveTerminationGracePeriodSeconds(startUpArgs));
}

DeploymentSpec deploymentSpec = new DeploymentSpec();
deploymentSpec.setTemplate(podTemplate.getTemplate());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,187 @@ public void testBuildPodTemplateConfig() {
.toString());
}

@Test
public void testContainerCommandUsesExecForSignalForwarding() {
ResourceType resourceType = ResourceType.OPTIMIZER;
Map<String, String> properties = Maps.newHashMap();
properties.put("memory", "1024");
Resource resource =
new Resource.Builder("KubernetesContainer", "k8s", resourceType)
.setMemoryMb(1024)
.setThreadCount(1)
.setProperties(properties)
.build();
groupProperties.putAll(resource.getProperties());

Map<String, Object> argsList =
kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties);
String image = argsList.get(IMAGE).toString();
String pullPolicy = argsList.get(PULL_POLICY).toString();
List<LocalObjectReference> imagePullSecretsList =
(List<LocalObjectReference>) argsList.get(PULL_SECRETS);
int cpuLimit = (int) argsList.get("cpuLimit");
long memory = (long) argsList.get(MEMORY_PROPERTY);
String groupName = argsList.get("groupName").toString();
String resourceId = argsList.get("resourceId").toString();
String startUpArgs = argsList.get("startUpArgs").toString();

Deployment deployment =
kubernetesOptimizerContainer.initPodTemplateWithoutConfig(
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);

List<String> command =
deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand();
Assert.assertEquals(Arrays.asList("sh", "-c", "exec " + startUpArgs), command);
}

@Test
public void testContainerCommandUsesExecWithPodTemplate() {
PodTemplate podTemplate =
kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties);

ResourceType resourceType = ResourceType.OPTIMIZER;
Map<String, String> properties = Maps.newHashMap();
properties.put("memory", "1024");
Resource resource =
new Resource.Builder("KubernetesContainer", "k8s", resourceType)
.setMemoryMb(1024)
.setThreadCount(1)
.setProperties(properties)
.build();
groupProperties.putAll(resource.getProperties());

Map<String, Object> argsList =
kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties);
String image = argsList.get(IMAGE).toString();
String pullPolicy = argsList.get(PULL_POLICY).toString();
List<LocalObjectReference> imagePullSecretsList =
(List<LocalObjectReference>) argsList.get(PULL_SECRETS);
int cpuLimit = (int) argsList.get("cpuLimit");
long memory = (long) argsList.get(MEMORY_PROPERTY);
String groupName = argsList.get("groupName").toString();
String resourceId = argsList.get("resourceId").toString();
String startUpArgs = argsList.get("startUpArgs").toString();

Deployment deployment =
kubernetesOptimizerContainer.initPodTemplateFromFrontEnd(
podTemplate,
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);

List<String> command =
deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand();
Assert.assertEquals(Arrays.asList("sh", "-c", "exec " + startUpArgs), command);
}

@Test
public void testTerminationGracePeriodFromDefaultShutdownTimeout() {
ResourceType resourceType = ResourceType.OPTIMIZER;
Map<String, String> properties = Maps.newHashMap();
properties.put("memory", "1024");
Resource resource =
new Resource.Builder("KubernetesContainer", "k8s", resourceType)
.setMemoryMb(1024)
.setThreadCount(1)
.setProperties(properties)
.build();
groupProperties.putAll(resource.getProperties());

Map<String, Object> argsList =
kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties);
String image = argsList.get(IMAGE).toString();
String pullPolicy = argsList.get(PULL_POLICY).toString();
List<LocalObjectReference> imagePullSecretsList =
(List<LocalObjectReference>) argsList.get(PULL_SECRETS);
int cpuLimit = (int) argsList.get("cpuLimit");
long memory = (long) argsList.get(MEMORY_PROPERTY);
String groupName = argsList.get("groupName").toString();
String resourceId = argsList.get("resourceId").toString();
String startUpArgs = argsList.get("startUpArgs").toString();

Deployment deployment =
kubernetesOptimizerContainer.initPodTemplateWithoutConfig(
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);

Long grace = deployment.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds();
Assert.assertNotNull(grace);
// default shutdown-timeout = 600_000ms → 600s + 30s buffer
Assert.assertEquals(630L, grace.longValue());
}

@Test
public void testTerminationGracePeriodFromShutdownTimeoutArg() {
String startUpArgs = "/entrypoint.sh optimizer 1024 -a thrift://x:1261 -p 1 -st 120000";
long grace = KubernetesOptimizerContainer.resolveTerminationGracePeriodSeconds(startUpArgs);
// 120_000ms → 120s + 30s buffer
Assert.assertEquals(150L, grace);
}

@Test
public void testTerminationGracePeriodFromUserPodTemplateRespected() {
PodTemplate podTemplate =
kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties);
podTemplate.getTemplate().getSpec().setTerminationGracePeriodSeconds(900L);

ResourceType resourceType = ResourceType.OPTIMIZER;
Map<String, String> properties = Maps.newHashMap();
properties.put("memory", "1024");
Resource resource =
new Resource.Builder("KubernetesContainer", "k8s", resourceType)
.setMemoryMb(1024)
.setThreadCount(1)
.setProperties(properties)
.build();
groupProperties.putAll(resource.getProperties());

Map<String, Object> argsList =
kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties);
String image = argsList.get(IMAGE).toString();
String pullPolicy = argsList.get(PULL_POLICY).toString();
List<LocalObjectReference> imagePullSecretsList =
(List<LocalObjectReference>) argsList.get(PULL_SECRETS);
int cpuLimit = (int) argsList.get("cpuLimit");
long memory = (long) argsList.get(MEMORY_PROPERTY);
String groupName = argsList.get("groupName").toString();
String resourceId = argsList.get("resourceId").toString();
String startUpArgs = argsList.get("startUpArgs").toString();

Deployment deployment =
kubernetesOptimizerContainer.initPodTemplateFromFrontEnd(
podTemplate,
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);

Long grace = deployment.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds();
Assert.assertEquals(Long.valueOf(900L), grace);
}

@Test
public void testAMSWithConfigMap() throws Exception {
ConfigMap configMap = buildConfigMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ public class OptimizerProperties {
public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED = "master-slave-mode-enabled";
public static final String OPTIMIZER_SHUTDOWN_TIMEOUT_MS = "shutdown-timeout-ms";
public static final long OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 600_000L; // 10 min
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ protected void waitAShortTime(long waitTime) {
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
// ignore
Thread.currentThread().interrupt();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class Optimizer {
private final OptimizerConfig config;
private final OptimizerToucher toucher;
private final OptimizerExecutor[] executors;
private volatile Thread[] executorThreads;

public Optimizer(OptimizerConfig config) {
this(config, () -> new OptimizerToucher(config), (i) -> new OptimizerExecutor(config, i));
Expand All @@ -54,20 +55,70 @@ protected Optimizer(

public void startOptimizing() {
LOG.info("Starting optimizer with configuration:{}", config);
Arrays.stream(executors)
executorThreads = new Thread[executors.length];
IntStream.range(0, executors.length)
.forEach(
optimizerExecutor -> {
new Thread(
optimizerExecutor::start,
String.format("Optimizer-executor-%d", optimizerExecutor.getThreadId()))
.start();
i -> {
executorThreads[i] =
new Thread(
executors[i]::start,
String.format("Optimizer-executor-%d", executors[i].getThreadId()));
executorThreads[i].start();
});
toucher.withTokenChangeListener(new SetTokenToExecutors()).start();
}

public void stopOptimizing() {
toucher.stop();
LOG.info("Stopping optimizer, waiting for in-progress tasks to complete...");
// Stop executors first so they don't poll new tasks, but keep the toucher alive
// so it keeps sending heartbeats. Otherwise AMS hits its heartbeat-timeout during
// long-running in-flight tasks, unregisters this optimizer, and the subsequent
// best-effort completeTask fails with "Optimizer has not been authenticated".
Arrays.stream(executors).forEach(OptimizerExecutor::stop);

Thread[] threads = executorThreads;
if (threads == null) {
toucher.stop();
LOG.info("Optimizer stopped (no executor threads to wait for)");
return;
}

long shutdownTimeoutMs = config.getShutdownTimeoutMs();
long deadline = System.currentTimeMillis() + shutdownTimeoutMs;
for (Thread t : threads) {
if (t == null) {
continue;
}
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
break;
}
try {
t.join(remaining);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for executor thread {} to finish", t.getName());
Thread.currentThread().interrupt();
break;
}
}

for (Thread t : threads) {
if (t != null && t.isAlive()) {
LOG.warn(
"Executor thread {} did not terminate within {}ms timeout, force-interrupting",
t.getName(),
shutdownTimeoutMs);
t.interrupt();
try {
t.join(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
toucher.stop();
LOG.info("Optimizer stopped");
}

public OptimizerToucher getToucher() {
Expand Down
Loading
Loading