diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java index f470254142ede..697ca7da58fb0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.errors.StreamsException; +import java.util.Optional; import java.util.Set; public interface GlobalStateManager extends StateManager { @@ -25,8 +26,11 @@ public interface GlobalStateManager extends StateManager { void setGlobalProcessorContext(final InternalProcessorContext processorContext); /** + * Bootstraps all global state stores. Returns the set of registered store names on success, + * or {@link Optional#empty()} if bootstrap was interrupted by a shutdown request. + * * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - Set initialize(); + Optional> initialize(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 8e7347e6fa472..4b5ecd190226c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Time; @@ -64,6 +65,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.BooleanSupplier; import java.util.function.Supplier; import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; @@ -123,6 +125,7 @@ private static class StateStoreMetadata { private DeserializationExceptionHandler deserializationExceptionHandler; private ProcessingExceptionHandler processingExceptionHandler; private Sensor droppedRecordsSensor; + private BooleanSupplier shouldStopBootstrappingSupplier; public GlobalStateManagerImpl(final LogContext logContext, final Time time, @@ -130,7 +133,8 @@ public GlobalStateManagerImpl(final LogContext logContext, final Consumer globalConsumer, final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, - final StreamsConfig config) { + final StreamsConfig config, + final BooleanSupplier shouldStopBootstrappingSupplier) { this.time = time; this.topology = topology; this.stateDirectory = stateDirectory; @@ -147,6 +151,7 @@ public GlobalStateManagerImpl(final LogContext logContext, logPrefix = logContext.logPrefix(); this.globalConsumer = globalConsumer; this.stateRestoreListener = stateRestoreListener; + this.shouldStopBootstrappingSupplier = shouldStopBootstrappingSupplier; final Map consumerProps = config.getGlobalConsumerConfigs("dummy"); // need to add mandatory configs; otherwise `QuietConsumerConfig` throws @@ -173,66 +178,78 @@ public void setGlobalProcessorContext(final InternalProcessorContext globa } @Override - public Set initialize() { + public Optional> initialize() { droppedRecordsSensor = droppedRecordsSensor( Thread.currentThread().getName(), globalProcessorContext.taskId().toString(), globalProcessorContext.metrics() ); - final Map wrappedStores = new HashMap<>(); - for (final StateStore stateStore : topology.globalStateStores()) { - final List storePartitions = topicPartitionsForStore(stateStore); - final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore( - stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix); - try { - maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore); - } catch (final ProcessorStateException e) { - if (eosEnabled) { - log.warn("{}Detected unclean shutdown for global store {}. " + - "Wiping global state directory.", logPrefix, stateStore.name(), e); - try { - Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile()); - } catch (final IOException ioe) { - e.addSuppressed(ioe); + try { + final Map wrappedStores = new HashMap<>(); + for (final StateStore stateStore : topology.globalStateStores()) { + final List storePartitions = topicPartitionsForStore(stateStore); + final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore( + stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix); + try { + maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore); + } catch (final ProcessorStateException e) { + if (eosEnabled) { + log.warn("{}Detected unclean shutdown for global store {}. " + + "Wiping global state directory.", logPrefix, stateStore.name(), e); + try { + Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile()); + } catch (final IOException ioe) { + e.addSuppressed(ioe); + } } + throw e; } - throw e; - } - for (final TopicPartition storePartition : storePartitions) { - wrappedStores.put(storePartition, maybeWrappedStore); + for (final TopicPartition storePartition : storePartitions) { + wrappedStores.put(storePartition, maybeWrappedStore); + } } - } - // migrate offsets from legacy checkpoint file into the stores - LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores); - - for (final StateStoreMetadata metadata : storeMetadata.values()) { - // load the committed offsets from the store - final StateStore store = metadata.stateStore; - if (store.persistent()) { - for (final TopicPartition partition : metadata.changelogPartitions) { - final Long offset = store.committedOffset(partition); - if (offset != null) { - currentOffsets.put(partition, offset); + // migrate offsets from legacy checkpoint file into the stores + LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores); + + for (final StateStoreMetadata metadata : storeMetadata.values()) { + if (shouldStopBootstrappingSupplier.getAsBoolean()) { + log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name()); + return Optional.empty(); + } + // load the committed offsets from the store + final StateStore store = metadata.stateStore; + if (store.persistent()) { + for (final TopicPartition partition : metadata.changelogPartitions) { + final Long offset = store.committedOffset(partition); + if (offset != null) { + currentOffsets.put(partition, offset); + } + } + } + + // restore or reprocess each registered store using the now-populated currentOffsets + try { + if (metadata.reprocessFactory.isPresent()) { + reprocessState(metadata); + } else { + restoreState(metadata); } + } finally { + globalConsumer.unsubscribe(); } } - // restore or reprocess each registered store using the now-populated currentOffsets - try { - if (metadata.reprocessFactory.isPresent()) { - reprocessState(metadata); - } else { - restoreState(metadata); - } - } finally { - globalConsumer.unsubscribe(); + return Optional.of(Collections.unmodifiableSet(globalStoreNames)); + } catch (final WakeupException e) { + if (!shouldStopBootstrappingSupplier.getAsBoolean()) { + throw e; } + log.info("Global store bootstrap interrupted by shutdown"); + return Optional.empty(); } - - return Collections.unmodifiableSet(globalStoreNames); } public StateStore globalStore(final String name) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 3717859845155..68c0202db94f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -29,8 +29,10 @@ import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; @@ -79,9 +81,14 @@ public GlobalStateUpdateTask(final LogContext logContext, */ @Override public Map initialize() { - final Set storeNames = stateMgr.initialize(); + final Optional> storeNames = stateMgr.initialize(); + if (storeNames.isEmpty()) { + // bootstrap was interrupted by shutdown; skip topology/processor init to avoid + // opening user resources via Processor#init() during a shutdown. + return Collections.emptyMap(); + } final Map storeNameToTopic = topology.storeToChangelogTopic(); - for (final String storeName : storeNames) { + for (final String storeName : storeNames.get()) { final String sourceTopic = storeNameToTopic.get(storeName); final SourceNode source = topology.source(sourceTopic); deserializers.put( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index bede888525ad9..292d369630db5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -382,7 +382,8 @@ private StateConsumer initialize() { globalConsumer, stateDirectory, stateRestoreListener, - config + config, + this::inErrorState ); final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl( @@ -429,6 +430,11 @@ private StateConsumer initialize() { ); } + if (inErrorState()) { + closeStateConsumer(stateConsumer, false); + return null; + } + setState(RUNNING); return stateConsumer; } catch (final StreamsException fatalException) { @@ -475,8 +481,15 @@ public synchronized void start() { public void shutdown() { // one could call shutdown() multiple times, so ignore subsequent calls // if already shutting down or dead - setState(PENDING_SHUTDOWN); + final boolean wakeupBootstrap; + synchronized (stateLock) { + wakeupBootstrap = (state == State.CREATED); + setState(PENDING_SHUTDOWN); + } initializationLatch.countDown(); + if (wakeupBootstrap) { + globalConsumer.wakeup(); + } } public Map consumerMetrics() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 4759bdc050841..3333f5ce444e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; @@ -63,6 +64,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; @@ -168,7 +170,8 @@ public void before() { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig); stateManager.setGlobalProcessorContext(processorContext); @@ -237,8 +240,8 @@ public void shouldInitializeStateStores() { @Test public void shouldReturnInitializedStoreNames() { initializeConsumer(0, 0, t1, t2, t3, t4, t5); - final Set storeNames = stateManager.initialize(); - assertEquals(Set.of(storeName1, storeName2, storeName3, storeName4, storeName5), storeNames); + final Optional> storeNames = stateManager.initialize(); + assertEquals(Optional.of(Set.of(storeName1, storeName2, storeName3, storeName4, storeName5)), storeNames); } @Test @@ -639,7 +642,8 @@ public synchronized Map endOffsets(final Collection false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -682,7 +686,8 @@ public synchronized Map endOffsets(final Collection false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -723,7 +728,8 @@ public synchronized Map endOffsets(final Collection false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -771,7 +777,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -805,7 +812,8 @@ public List partitionsFor(final String topic) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -848,7 +856,8 @@ public List partitionsFor(final String topic) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -889,7 +898,8 @@ public List partitionsFor(final String topic) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -937,7 +947,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -971,7 +982,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -1014,7 +1026,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -1055,7 +1068,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -1098,7 +1112,8 @@ public synchronized long position(final TopicPartition partition) { consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -1139,7 +1154,8 @@ public synchronized ConsumerRecords poll(final Duration timeout) consumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); processorContext.setStateManger(stateManager); stateManager.setGlobalProcessorContext(processorContext); @@ -1214,7 +1230,8 @@ public void shouldWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsPre43() throws consumer, downgradeStateDir, stateRestoreListener, - downgradeConfig + downgradeConfig, + () -> false ); final InternalMockProcessorContext downgradeContext = @@ -1252,6 +1269,131 @@ public void shouldNotWriteDowngradeCheckpointOnCloseWhenUpgradeFromIsNull() { assertFalse(legacyGlobalFile.exists()); } + @Test + public void shouldAbortRestoreWhenSupplierFlipsToShutdown() { + final AtomicBoolean inErrorState = new AtomicBoolean(false); + stateManager = new GlobalStateManagerImpl( + new LogContext("test"), + time, + topology, + consumer, + stateDirectory, + stateRestoreListener, + streamsConfig, + inErrorState::get + ); + processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); + + initializeConsumer(6, 1, t1); + initializeConsumer(0, 0, t2, t3, t4, t5); + + inErrorState.set(true); + + stateManager.initialize(); + + // Nothing should have been restored + assertEquals(0L, stateRestoreListener.totalNumRestored); + } + + @Test + public void shouldExitCleanlyOnWakeupDuringBootstrapWhenShuttingDown() { + final AtomicBoolean inErrorState = new AtomicBoolean(false); + final AtomicInteger pollCount = new AtomicInteger(0); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()) { + @Override + public ConsumerRecords poll(final Duration timeout) { + pollCount.incrementAndGet(); + inErrorState.set(true); + throw new WakeupException(); + } + }; + initializeConsumer(6, 1, t1); + initializeConsumer(0, 0, t2, t3, t4, t5); + + stateManager = new GlobalStateManagerImpl( + new LogContext("test"), + time, + topology, + consumer, + stateDirectory, + stateRestoreListener, + streamsConfig, + inErrorState::get + ); + processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); + + assertEquals(Optional.empty(), stateManager.initialize()); + + assertEquals(1, pollCount.get()); + assertTrue(inErrorState.get()); + } + + @Test + public void shouldPropagateWakeupDuringBootstrapWhenNotShuttingDown() { + final AtomicBoolean inErrorState = new AtomicBoolean(false); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()) { + @Override + public ConsumerRecords poll(final Duration timeout) { + throw new WakeupException(); + } + }; + initializeConsumer(6, 1, t1); + initializeConsumer(0, 0, t2, t3, t4, t5); + + stateManager = new GlobalStateManagerImpl( + new LogContext("test"), + time, + topology, + consumer, + stateDirectory, + stateRestoreListener, + streamsConfig, + inErrorState::get + ); + processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); + + assertThrows(WakeupException.class, () -> stateManager.initialize()); + } + + @Test + public void shouldExitCleanlyOnWakeupDuringReprocessingWhenShuttingDown() { + setUpReprocessing(); + + final AtomicBoolean inErrorState = new AtomicBoolean(false); + final AtomicInteger pollCount = new AtomicInteger(0); + consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()) { + @Override + public ConsumerRecords poll(final Duration timeout) { + pollCount.incrementAndGet(); + inErrorState.set(true); + throw new WakeupException(); + } + }; + initializeConsumer(0, 0, t1, t2, t3, t4); + initializeConsumer(6, 1, t5); + + stateManager = new GlobalStateManagerImpl( + new LogContext("test"), + time, + topology, + consumer, + stateDirectory, + stateRestoreListener, + streamsConfig, + inErrorState::get + ); + processorContext.setStateManger(stateManager); + stateManager.setGlobalProcessorContext(processorContext); + + assertEquals(Optional.empty(), stateManager.initialize()); + + assertEquals(1, pollCount.get()); + assertTrue(inErrorState.get()); + } + private void writeCorruptCheckpoint() throws IOException { final File checkpointFile = new File(stateManager.baseDir(), StateManagerUtil.CHECKPOINT_FILE_NAME); try (final OutputStream stream = Files.newOutputStream(checkpointFile.toPath())) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTaskTest.java new file mode 100644 index 0000000000000..b3ef500c1d128 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTaskTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.internals.LogContext; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProcessingExceptionHandler; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +public class GlobalStateUpdateTaskTest { + + @Mock + private ProcessorTopology topology; + @Mock + private InternalProcessorContext processorContext; + @Mock + private GlobalStateManager stateMgr; + @Mock + private DeserializationExceptionHandler deserializationExceptionHandler; + @Mock + private ProcessingExceptionHandler processingExceptionHandler; + + @Test + public void shouldSkipTopologyAndProcessorInitWhenBootstrapInterrupted() { + when(stateMgr.initialize()).thenReturn(Optional.empty()); + + final GlobalStateUpdateTask task = new GlobalStateUpdateTask( + new LogContext("test"), + topology, + processorContext, + stateMgr, + deserializationExceptionHandler, + processingExceptionHandler, + new MockTime(), + 0L + ); + + final Map offsets = task.initialize(); + + verify(topology, never()).processors(); + verify(processorContext, never()).initialize(); + verify(stateMgr, never()).changelogOffsets(); + assertTrue(offsets.isEmpty()); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java index 7c0216cd07f58..95688374b2894 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -56,6 +57,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD; @@ -243,6 +247,64 @@ public void shouldTransitionToRunningOnStart() throws Exception { globalStreamThread.shutdown(); } + @Test + @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS) + public void shouldShutdownDuringBootstrap() throws Exception { + initializeConsumer(); + mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1_000_000L)); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future shutdownFuture = executor.submit(() -> { + try { + TestUtils.waitForCondition( + () -> stateRestoreListener.storeNameCalledStates.containsKey(MockStateRestoreListener.RESTORE_START), + 10 * 1000L, + "Bootstrap restore never started."); + } catch (final Exception e) { + throw new RuntimeException(e); + } + globalStreamThread.shutdown(); + }); + + startAndSwallowError(); + shutdownFuture.get(); + globalStreamThread.join(); + } finally { + executor.shutdown(); + } + + assertEquals(DEAD, globalStreamThread.state()); + } + + @Test + public void shouldThrowStreamsExceptionOnStartupIfWakeupOccursWithoutShutdown() throws Exception { + final MockConsumer wakeupOnPartitionsFor = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name()) { + @Override + public List partitionsFor(final String topic) { + throw new WakeupException(); + } + }; + globalStreamThread = new GlobalStreamThread( + builder.rewriteTopology(config).buildGlobalStateTopology(), + config, + wakeupOnPartitionsFor, + new StateDirectory(config, time, true, false), + 0, + new StreamsMetricsImpl(new Metrics(), "test-client", time), + time, + "clientId", + stateRestoreListener, + e -> { } + ); + + final StreamsException e = assertThrows(StreamsException.class, () -> globalStreamThread.start()); + assertThat(e.getCause(), instanceOf(WakeupException.class)); + + globalStreamThread.join(); + assertFalse(globalStreamThread.stillRunning()); + } + @Test public void shouldDieOnInvalidOffsetExceptionDuringStartup() throws Exception { final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME); diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 1237deacf9b92..af7fe5f0459d9 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -26,6 +26,7 @@ import java.io.File; import java.util.Map; +import java.util.Optional; import java.util.Set; public class GlobalStateManagerStub implements GlobalStateManager { @@ -49,9 +50,9 @@ public GlobalStateManagerStub(final Set storeNames, public void setGlobalProcessorContext(final InternalProcessorContext processorContext) {} @Override - public Set initialize() { + public Optional> initialize() { initialized = true; - return storeNames; + return Optional.of(storeNames); } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2738458062a58..424124bebf919 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -441,7 +441,8 @@ private void setupGlobalTask(final Time mockWallClockTime, globalConsumer, stateDirectory, stateRestoreListener, - streamsConfig + streamsConfig, + () -> false ); final GlobalProcessorContextImpl globalProcessorContext =