diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java b/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java deleted file mode 100644 index 45726f9dfc793..0000000000000 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/HarnessBackedClusterInstance.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tiered.storage; - -import kafka.integration.KafkaServerTestHarness; -import kafka.server.ControllerServer; -import kafka.server.KafkaBroker; - -import org.apache.kafka.common.network.ConnectionMode; -import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.api.ClusterConfig; -import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.server.fault.FaultHandlerException; -import org.apache.kafka.test.TestSslUtils; -import org.apache.kafka.test.TestUtils; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -import scala.jdk.javaapi.CollectionConverters; - -/** - * A {@link ClusterInstance} implementation backed by a {@link KafkaServerTestHarness}. - * This allows {@link TieredStorageTestContext} to depend only on {@link ClusterInstance} - * rather than on the concrete harness class. - */ -public class HarnessBackedClusterInstance implements ClusterInstance { - - private final KafkaServerTestHarness harness; - - public HarnessBackedClusterInstance(KafkaServerTestHarness harness) { - this.harness = harness; - } - - @Override - public Type type() { - return Type.KRAFT; - } - - @Override - public ClusterConfig config() { - return ClusterConfig.defaultBuilder() - .setBrokers(harness.brokers().size()) - .setControllers(harness.controllerServers().size()) - .setBrokerSecurityProtocol(harness.securityProtocol()) - .setBrokerListenerName(harness.listenerName()) - .setControllerListenerName(controllerListenerName()) - .setMetadataVersion(harness.metadataVersion()) - .build(); - } - - @Override - public Set controllerIds() { - return controllers().keySet(); - } - - @Override - public ListenerName clientListener() { - return harness.listenerName(); - } - - @Override - public ListenerName controllerListenerName() { - return controllers().values().stream() - .map(c -> new ListenerName(c.config().controllerListenerNames().get(0))) - .findFirst() - .orElseThrow(() -> new RuntimeException("No controllers available")); - } - - @Override - public String bootstrapServers() { - return harness.bootstrapServers(harness.listenerName()); - } - - @Override - public String bootstrapControllers() { - throw new UnsupportedOperationException("bootstrapControllers() is not supported in HarnessBackedClusterInstance"); - } - - @Override - public String clusterId() { - return brokers().values().stream() - .map(KafkaBroker::clusterId) - .findFirst() - .orElseThrow(() -> new RuntimeException("No brokers available")); - } - - @Override - public Map brokers() { - return CollectionConverters.asJava(harness.brokers()).stream() - .collect(Collectors.toMap(b -> b.config().brokerId(), b -> b)); - } - - @Override - public Map controllers() { - return CollectionConverters.asJava(harness.controllerServers()).stream() - .collect(Collectors.toMap(c -> c.config().nodeId(), c -> c)); - } - - @Override - public void shutdownBroker(int brokerId) { - harness.killBroker(brokerId); - } - - @Override - public void startBroker(int brokerId) { - harness.startBroker(brokerId); - } - - @Override - public void start() { - throw new UnsupportedOperationException("start() is managed by KafkaServerTestHarness"); - } - - @Override - public boolean started() { - return true; - } - - @Override - public void stop() { - throw new UnsupportedOperationException("stop() is managed by KafkaServerTestHarness"); - } - - @Override - public boolean stopped() { - return false; - } - - @Override - public void waitForReadyBrokers() throws InterruptedException { - Map brokerMap = brokers(); - - // Step 1: wait until a controller marks all brokers as registered and unfenced - ControllerServer controllerServer = controllers().values().iterator().next(); - try { - controllerServer.controller().waitForReadyBrokers(brokerMap.size()).get(); - } catch (ExecutionException e) { - throw new AssertionError("Failed while waiting for brokers to become ready", e); - } - - // Step 2: wait until each broker's metadata cache knows about all alive brokers - Set brokerIds = brokerMap.keySet(); - TestUtils.waitForCondition( - () -> brokerMap.values().stream().allMatch( - broker -> brokerIds.stream().allMatch(id -> broker.metadataCache().hasAliveBroker(id)) - ), - "Timed out waiting for metadata cache to reflect all alive brokers" - ); - } - - @Override - public Optional firstFatalException() { - return Optional.ofNullable(harness.faultHandler().firstException()); - } - - @Override - public Optional firstNonFatalException() { - return Optional.empty(); - } - - @Override - public Map setClientSslConfig(Map configs) { - if (harness.trustStoreFile().isEmpty()) { - return configs; - } - try { - Map props = new HashMap<>(configs); - props.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.CLIENT) - .useExistingTrustStore(harness.trustStoreFile().get()) - .build()); - return props; - } catch (Exception e) { - throw new RuntimeException("Failed to build client SSL config", e); - } - } -} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/README.md b/storage/src/test/java/org/apache/kafka/tiered/storage/README.md index ddfc10216720d..6d80d6e9872c1 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/README.md +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/README.md @@ -1,11 +1,11 @@ # The Test Flow -Step 1: For every test, setup is done via TieredStorageTestHarness which extends IntegrationTestHarness and sets up a cluster with TS enabled on it. +Step 1: Each test is a standalone class. It declares a `clusterConfig()` method that returns a `ClusterConfig` with tiered storage enabled (via `TieredStorageTestUtils.createServerPropsForRemoteStorage`), and test methods annotated with `@ClusterTemplate("clusterConfig")` that receive a `ClusterInstance` provided by the test framework. -Step 2: The test is written as a specification consisting of sequential actions and assertions. The spec for the complete test is written down first which creates "actions" to be executed. +Step 2: The test is written as a specification consisting of sequential actions and assertions. The spec for the complete test is built first using `TieredStorageTestBuilder`, which creates the "actions" to be executed. -Step 3: Once we have the test spec in-place (which includes assertion actions), we execute the test which will execute each action sequentially. +Step 3: A `TieredStorageTestContext` is created from the `ClusterInstance` (plus any extra consumer config). The test is then executed by running each action of the spec sequentially against the context. -Step 4: The test execution stops when any of the action throws an exception (or an assertion error). +Step 4: The test execution stops when any of the actions throws an exception (or an assertion error). -Step 5: Clean-up for the test is performed on test exit \ No newline at end of file +Step 5: Clean-up for the test is performed on test exit — the `TieredStorageTestContext` is closed (it is `AutoCloseable`, typically via try-with-resources) and the test report is printed. \ No newline at end of file diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 428cc5f766262..0e769492aab5f 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -46,6 +46,7 @@ import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; import org.apache.kafka.tiered.storage.specs.TopicSpec; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils; import java.io.FilenameFilter; import java.io.IOException; @@ -104,8 +105,8 @@ private void initClients() { } private void initContext() { - remoteStorageManagers = TieredStorageTestHarness.remoteStorageManagers(cluster.aliveBrokers().values()); - localStorages = TieredStorageTestHarness.localStorages(cluster.aliveBrokers().values()); + remoteStorageManagers = TieredStorageTestUtils.remoteStorageManagers(cluster.aliveBrokers().values()); + localStorages = TieredStorageTestUtils.localStorages(cluster.aliveBrokers().values()); } public void createTopic(TopicSpec spec) throws ExecutionException, InterruptedException { @@ -260,7 +261,7 @@ public void eraseBrokerStorage(int brokerId, boolean isStopped) throws IOException { BrokerLocalStorage brokerLocalStorage; if (isStopped) { - brokerLocalStorage = TieredStorageTestHarness.localStorages(cluster.brokers().values()) + brokerLocalStorage = TieredStorageTestUtils.localStorages(cluster.brokers().values()) .stream() .filter(bls -> bls.getBrokerId() == brokerId) .findFirst() diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java deleted file mode 100644 index a25bffa543933..0000000000000 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tiered.storage; - -import kafka.api.IntegrationTestHarness; -import kafka.server.KafkaBroker; - -import org.apache.kafka.common.replica.ReplicaSelector; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.config.ReplicationConfigs; -import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; -import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; -import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; -import org.apache.kafka.server.log.remote.storage.RemoteLogManager; -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; -import org.apache.kafka.test.TestUtils; -import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; - -import scala.collection.Seq; -import scala.jdk.javaapi.CollectionConverters; - -import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC; -import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage; -/** - * Base class for integration tests exercising the tiered storage functionality in Apache Kafka. - * This uses a {@link LocalTieredStorage} instance as the second-tier storage system and - * {@link TopicBasedRemoteLogMetadataManager} as the remote log metadata manager. - */ -@Tag("integration") -public abstract class TieredStorageTestHarness extends IntegrationTestHarness { - - private TieredStorageTestContext context; - private String testClassName = ""; - private String storageDirPath = ""; - - @Override - public void modifyConfigs(Seq props) { - for (Properties p : CollectionConverters.asJava(props)) { - p.putAll(overridingProps()); - } - } - - @Override - public Seq kraftControllerConfigs(TestInfo testInfo) { - return CollectionConverters.asScala(List.of(overridingProps())).toSeq(); - } - - protected int numRemoteLogMetadataPartitions() { - return 5; - } - - public Properties overridingProps() { - Properties overridingProps = createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(), - numRemoteLogMetadataPartitions(), new Properties()); - readReplicaSelectorClass() - .ifPresent(c -> overridingProps.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, c.getName())); - return overridingProps; - } - - protected Optional> readReplicaSelectorClass() { - return Optional.empty(); - } - - protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder); - - protected void overrideConsumerConfig(Map consumerConfig) { - } - - @BeforeEach - @Override - public void setUp(TestInfo testInfo) { - testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault()); - storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath(); - super.setUp(testInfo); - Map extraConsumerProps = new HashMap<>(); - overrideConsumerConfig(extraConsumerProps); - context = new TieredStorageTestContext(new HarnessBackedClusterInstance(this), extraConsumerProps); - } - - // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedGroupProtocolNames() in the ParameterizedTest name. - @ParameterizedTest(name = "{displayName}.groupProtocol={0}") - @MethodSource("getTestGroupProtocolParametersAll") - public void executeTieredStorageTest(String groupProtocol) { - TieredStorageTestBuilder builder = new TieredStorageTestBuilder(); - writeTestSpecifications(builder); - try { - for (TieredStorageTestAction action : builder.complete()) { - action.execute(context); - } - } catch (Exception ex) { - throw new AssertionError("Could not build test specifications. No test was executed.", ex); - } - } - - @AfterEach - @Override - public void tearDown() { - try { - Utils.closeQuietly(context, "TieredStorageTestContext"); - super.tearDown(); - context.printReport(System.out); - } catch (Exception ex) { - throw new AssertionError("Failed to close the tear down the test harness.", ex); - } - } - - public static List remoteStorageManagers(Collection brokers) { - List storages = new ArrayList<>(); - brokers.forEach(broker -> { - if (broker.remoteLogManagerOpt().isDefined()) { - RemoteLogManager remoteLogManager = broker.remoteLogManagerOpt().get(); - RemoteStorageManager storageManager = remoteLogManager.storageManager(); - if (storageManager instanceof ClassLoaderAwareRemoteStorageManager loaderAwareRSM) { - if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) { - storages.add((LocalTieredStorage) loaderAwareRSM.delegate()); - } - } else if (storageManager instanceof LocalTieredStorage) { - storages.add((LocalTieredStorage) storageManager); - } - } else { - throw new AssertionError("Broker " + broker.config().brokerId() - + " does not have a remote log manager."); - } - }); - return storages; - } - - public static List localStorages(Collection brokers) { - return brokers.stream() - .map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()), - STORAGE_WAIT_TIMEOUT_SEC)) - .collect(Collectors.toList()); - } -} diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index cd0d243da04e3..df34cd749eeb3 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tiered.storage.utils; +import kafka.server.KafkaBroker; import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.TopicDescription; @@ -27,21 +28,27 @@ import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.junit.jupiter.api.Assertions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG; @@ -66,6 +73,34 @@ public class TieredStorageTestUtils { private static final Integer RLM_TASK_INTERVAL_MS = 500; private static final Integer RLMM_INIT_RETRY_INTERVAL_MS = 300; + public static List remoteStorageManagers(Collection brokers) { + List storages = new ArrayList<>(); + brokers.forEach(broker -> { + if (broker.remoteLogManagerOpt().isDefined()) { + RemoteLogManager remoteLogManager = broker.remoteLogManagerOpt().get(); + RemoteStorageManager storageManager = remoteLogManager.storageManager(); + if (storageManager instanceof ClassLoaderAwareRemoteStorageManager loaderAwareRSM) { + if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) { + storages.add((LocalTieredStorage) loaderAwareRSM.delegate()); + } + } else if (storageManager instanceof LocalTieredStorage) { + storages.add((LocalTieredStorage) storageManager); + } + } else { + throw new AssertionError("Broker " + broker.config().brokerId() + + " does not have a remote log manager."); + } + }); + return storages; + } + + public static List localStorages(Collection brokers) { + return brokers.stream() + .map(b -> new BrokerLocalStorage(b.config().brokerId(), Set.copyOf(b.config().logDirs()), + STORAGE_WAIT_TIMEOUT_SEC)) + .collect(Collectors.toList()); + } + public static TopicDescription describeTopic(TieredStorageTestContext context, String topic) throws ExecutionException, InterruptedException { return describeTopics(context, List.of(topic)).get(topic);