Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import org.apache.kafka.streams.errors.StreamsException;

import java.util.Optional;
import java.util.Set;

public interface GlobalStateManager extends StateManager {

void setGlobalProcessorContext(final InternalProcessorContext<?, ?> processorContext);

/**
* Bootstraps all global state stores. Returns the set of registered store names on success,
* or {@link Optional#empty()} if bootstrap was interrupted by a shutdown request.
*
* @throws IllegalStateException if a store gets registered after initialization has already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
Set<String> initialize();
Optional<Set<String>> initialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
Expand Down Expand Up @@ -123,14 +125,16 @@ private static class StateStoreMetadata {
private DeserializationExceptionHandler deserializationExceptionHandler;
private ProcessingExceptionHandler processingExceptionHandler;
private Sensor droppedRecordsSensor;
private BooleanSupplier shouldStopBootstrappingSupplier;

public GlobalStateManagerImpl(final LogContext logContext,
final Time time,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
final StreamsConfig config,
final BooleanSupplier shouldStopBootstrappingSupplier) {
this.time = time;
this.topology = topology;
this.stateDirectory = stateDirectory;
Expand All @@ -147,6 +151,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
logPrefix = logContext.logPrefix();
this.globalConsumer = globalConsumer;
this.stateRestoreListener = stateRestoreListener;
this.shouldStopBootstrappingSupplier = shouldStopBootstrappingSupplier;

final Map<String, Object> consumerProps = config.getGlobalConsumerConfigs("dummy");
// need to add mandatory configs; otherwise `QuietConsumerConfig` throws
Expand All @@ -173,66 +178,78 @@ public void setGlobalProcessorContext(final InternalProcessorContext<?, ?> globa
}

@Override
public Set<String> initialize() {
public Optional<Set<String>> initialize() {
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
globalProcessorContext.taskId().toString(),
globalProcessorContext.metrics()
);

final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
for (final StateStore stateStore : topology.globalStateStores()) {
final List<TopicPartition> storePartitions = topicPartitionsForStore(stateStore);
final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore(
stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix);
try {
maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
} catch (final ProcessorStateException e) {
if (eosEnabled) {
log.warn("{}Detected unclean shutdown for global store {}. " +
"Wiping global state directory.", logPrefix, stateStore.name(), e);
try {
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
} catch (final IOException ioe) {
e.addSuppressed(ioe);
try {
final Map<TopicPartition, StateStore> wrappedStores = new HashMap<>();
for (final StateStore stateStore : topology.globalStateStores()) {
final List<TopicPartition> storePartitions = topicPartitionsForStore(stateStore);
final StateStore maybeWrappedStore = LegacyCheckpointingStateStore.maybeWrapStore(
stateStore, eosEnabled, new HashSet<>(storePartitions), stateDirectory, null, logPrefix);
try {
maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
} catch (final ProcessorStateException e) {
if (eosEnabled) {
log.warn("{}Detected unclean shutdown for global store {}. " +
"Wiping global state directory.", logPrefix, stateStore.name(), e);
try {
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
}
throw e;
}
throw e;
}

for (final TopicPartition storePartition : storePartitions) {
wrappedStores.put(storePartition, maybeWrappedStore);
for (final TopicPartition storePartition : storePartitions) {
wrappedStores.put(storePartition, maybeWrappedStore);
}
}
}

// migrate offsets from legacy checkpoint file into the stores
LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores);

for (final StateStoreMetadata metadata : storeMetadata.values()) {
// load the committed offsets from the store
final StateStore store = metadata.stateStore;
if (store.persistent()) {
for (final TopicPartition partition : metadata.changelogPartitions) {
final Long offset = store.committedOffset(partition);
if (offset != null) {
currentOffsets.put(partition, offset);
// migrate offsets from legacy checkpoint file into the stores
LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores);

for (final StateStoreMetadata metadata : storeMetadata.values()) {
if (shouldStopBootstrappingSupplier.getAsBoolean()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm: so we either catch the "interrupted bootstrapping" condition on the next iteration or when handling WakeupException. That makes sense.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The first path is the supplier check between stores; the other one is the WakeupException thrown by methods like poll/partitionsFor/...

log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name());
return Optional.empty();
}
// load the committed offsets from the store
final StateStore store = metadata.stateStore;
if (store.persistent()) {
for (final TopicPartition partition : metadata.changelogPartitions) {
final Long offset = store.committedOffset(partition);
if (offset != null) {
currentOffsets.put(partition, offset);
}
}
}

// restore or reprocess each registered store using the now-populated currentOffsets
try {
if (metadata.reprocessFactory.isPresent()) {
reprocessState(metadata);
} else {
restoreState(metadata);
}
} finally {
globalConsumer.unsubscribe();
}
}

// restore or reprocess each registered store using the now-populated currentOffsets
try {
if (metadata.reprocessFactory.isPresent()) {
reprocessState(metadata);
} else {
restoreState(metadata);
}
} finally {
globalConsumer.unsubscribe();
return Optional.of(Collections.unmodifiableSet(globalStoreNames));
} catch (final WakeupException e) {
if (!shouldStopBootstrappingSupplier.getAsBoolean()) {
throw e;
}
log.info("Global store bootstrap interrupted by shutdown");
return Optional.empty();
}

return Collections.unmodifiableSet(globalStoreNames);
}

public StateStore globalStore(final String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
Expand Down Expand Up @@ -79,9 +81,14 @@ public GlobalStateUpdateTask(final LogContext logContext,
*/
@Override
public Map<TopicPartition, Long> initialize() {
final Set<String> storeNames = stateMgr.initialize();
final Optional<Set<String>> storeNames = stateMgr.initialize();
if (storeNames.isEmpty()) {
// bootstrap was interrupted by shutdown; skip topology/processor init to avoid
// opening user resources via Processor#init() during a shutdown.
return Collections.emptyMap();
}
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
for (final String storeName : storeNames) {
for (final String storeName : storeNames.get()) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode<?, ?> source = topology.source(sourceTopic);
deserializers.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ private StateConsumer initialize() {
globalConsumer,
stateDirectory,
stateRestoreListener,
config
config,
this::inErrorState
);

final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
Expand Down Expand Up @@ -429,6 +430,11 @@ private StateConsumer initialize() {
);
}

if (inErrorState()) {
closeStateConsumer(stateConsumer, false);
return null;
}
Comment on lines +433 to +436
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For which scenario do we need this check?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous try { stateConsumer.initialize() } catch (...) block, stateConsumer.initialize() calls into GlobalStateUpdateTask, which returns directly if shutdown has been signalled. In that case, stateConsumer.initialize() returns normally with no exception thrown.

This follow-up check handles the situation where shutdown has already been requested: it routes to cleanup, which causes the run() loop to take the early-exit path.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I see it now. thanks for clarification


setState(RUNNING);
return stateConsumer;
} catch (final StreamsException fatalException) {
Expand Down Expand Up @@ -475,8 +481,15 @@ public synchronized void start() {
public void shutdown() {
// one could call shutdown() multiple times, so ignore subsequent calls
// if already shutting down or dead
setState(PENDING_SHUTDOWN);
final boolean wakeupBootstrap;
synchronized (stateLock) {
wakeupBootstrap = (state == State.CREATED);
setState(PENDING_SHUTDOWN);
}
initializationLatch.countDown();
if (wakeupBootstrap) {
globalConsumer.wakeup();
}
}

public Map<MetricName, Metric> consumerMetrics() {
Expand Down
Loading