KAFKA-10317: Global thread should honor shutdown signal during bootstrapping #22417

Open
lucliu1108 wants to merge 8 commits into apache:trunk from lucliu1108:KAFKA-10317

Contributor

@lucliu1108 lucliu1108 commented May 29, 2026

Summary

This PR introduces a shutdown-aware bootstrap loop in GlobalStateManagerImpl and a consumer.wakeup() call during GlobalStreamThread.shutdown() that together let KafkaStreams#close() interrupt global-store restoration in progress, instead of waiting for the entire changelog to be replayed.

Ticket: https://issues.apache.org/jira/browse/KAFKA-10317

Implementation

The global thread passes its inErrorState() predicate to the state manager, which checks it before each batch in the bootstrap poll loop and exits cleanly when shutdown is requested. The wakeup() call additionally interrupts any in-flight poll() so shutdown takes effect right away, even if the loop is currently blocked on a fetch. A matching WakeupException catch in the main update loop ensures clean shutdowns aren't reported through the uncaught-exception handler.
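
The between-batch check described above can be sketched as follows. This is a simplified model, not the PR's code: `BootstrapLoopSketch`, `restore`, and the list-of-batches input are hypothetical stand-ins for the poll loop in GlobalStateManagerImpl, and the `wakeup()` side of the change is not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

// Simplified model, not Kafka Streams code: each inner list stands in for one poll() result.
final class BootstrapLoopSketch {

    /**
     * Applies batches in order and checks the shutdown predicate before each one.
     * Returns the records that were restored before shutdown was observed.
     */
    static List<String> restore(final List<List<String>> batches,
                                final BooleanSupplier shutdownRequested) {
        final List<String> restored = new ArrayList<>();
        for (final List<String> batch : batches) {
            if (shutdownRequested.getAsBoolean()) {
                break; // exit cleanly instead of replaying the rest of the changelog
            }
            restored.addAll(batch);
        }
        return restored;
    }

    public static void main(final String[] args) {
        final List<List<String>> batches = Arrays.asList(
            Arrays.asList("a", "b"),
            Collections.singletonList("c"),
            Collections.singletonList("d"));
        final int[] checks = {0};
        // Hypothetical predicate: reports shutdown from the third check onward.
        final List<String> restored = restore(batches, () -> checks[0]++ >= 2);
        System.out.println(restored); // the last batch is skipped
    }
}
```

The predicate is only consulted between batches, which is why the summary pairs it with `wakeup()` to cover a poll that is already blocked.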

Tests

Added unit tests in GlobalStateManagerImplTest covering the supplier check and WakeupException handling in both restoreState and reprocessState, and end-to-end tests in GlobalStreamThreadTest for the close-during-bootstrap scenario.

@github-actions github-actions Bot added triage PRs from the community streams labels May 29, 2026
Contributor

@chickenchickenlove chickenchickenlove left a comment

Thanks for your hard work!
I left a comment.
When you get a chance, please take a look 🙇‍♂️

Comment on lines +519 to +521
if (inErrorStateSupplier.getAsBoolean()) {
    logBootstrapInterrupted(storeMetadata);
    return;
Contributor

Could we make the shutdown-interrupted bootstrap path explicit instead of returning normally from GlobalStateManagerImpl?

Currently, when inErrorStateSupplier.getAsBoolean() is true, restoreState() / reprocessState() just return, so GlobalStateManagerImpl#initialize() can also return as if bootstrap completed successfully. As a result, GlobalStateUpdateTask#initialize() may continue into initTopology(), processorContext.initialize(), and flushState() even though shutdown has already been requested.

Since initTopology() can invoke user-provided Processor#init(), this could unnecessarily open external resources during shutdown. Maybe this should use an explicit internal signal, such as a dedicated bootstrap-interrupted exception caught only on the clean shutdown path, or return an initialize status like completed/interrupted so the follow-up initialization can be skipped.

What do you think?

Contributor Author

Hi @chickenchickenlove , thanks for the review!
Good point, I refactored to make the interrupted path explicit by:

  • GlobalStateManager.initialize() now returns Optional<Set<String>> instead of Set<String>. Optional.empty() is the explicit "bootstrap was interrupted by shutdown" signal, set both when the supplier check fires between polls and when a WakeupException is caught during shutdown.
  • GlobalStateUpdateTask.initialize() checks the Optional first; if empty, it returns Collections.emptyMap() immediately and skips the rest of the process.
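
A minimal sketch of the caller side of that contract, under stated assumptions: `InterruptedInitSketch`, `initializeStores`, `initializeTask`, the store name, and the `Map<String, Long>` return type are hypothetical simplifications, not the actual GlobalStateUpdateTask signatures.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified model, not the Kafka Streams classes: names and shapes are assumptions.
final class InterruptedInitSketch {

    // Records whether the follow-up initialization ran.
    boolean topologyInitialized = false;

    /** Stand-in for the state manager: an empty Optional means "interrupted by shutdown". */
    static Optional<Set<String>> initializeStores(final boolean shutdownRequested) {
        if (shutdownRequested) {
            return Optional.empty();
        }
        return Optional.of(Collections.singleton("global-store"));
    }

    /** Stand-in for the update task: skips the remaining steps when bootstrap was interrupted. */
    Map<String, Long> initializeTask(final boolean shutdownRequested) {
        final Optional<Set<String>> stores = initializeStores(shutdownRequested);
        if (!stores.isPresent()) {
            return Collections.emptyMap(); // nothing after this point runs during shutdown
        }
        topologyInitialized = true; // in the real task this is where user init code could run
        final Map<String, Long> offsets = new HashMap<>();
        for (final String store : stores.get()) {
            offsets.put(store, 0L);
        }
        return offsets;
    }
}
```

Using an empty Optional rather than an empty set keeps "no stores were bootstrapped" distinguishable from "bootstrap was interrupted".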

@github-actions github-actions Bot removed the triage PRs from the community label May 31, 2026
Contributor

@UladzislauBlok UladzislauBlok left a comment

Made initial pass

Comment on lines +360 to +363
if (inErrorStateSupplier.getAsBoolean()) {
    logBootstrapInterrupted(storeMetadata);
    return;
}
Contributor

Looks like a lot of code duplication. Can we move it to a dedicated method, or keep the check at a higher level?
AFAIU, once restoration of the current store completes and we move on to the next one, we'll interrupt it anyway. It's a trade-off to avoid checking the same condition n times.

for (final StateStoreMetadata metadata : storeMetadata.values()) {
    if (inErrorStateSupplier.getAsBoolean()) {
        log.info("Global store bootstrap interrupted by shutdown before starting {}", metadata.stateStore.name());
        break;
    }
...
}

Contributor Author

Thanks for the advice!
I have removed the check inside restoreState and reprocessState and now have only the outer supplier check in the per-store loop in GlobalStateManagerImpl.initialize(), which returns empty directly instead of continuing to the next store.

Comment on lines +303 to +309
try {
    stateConsumer.pollAndUpdate();
} catch (final WakeupException e) {
    if (!inErrorState()) {
        throw e;
    }
}
Contributor

@UladzislauBlok UladzislauBlok May 31, 2026

I think this is not part of bootstrapping, is it?

Contributor Author

Good catch! The steady-state catch wasn't really about bootstrap. I removed it, and also tightened shutdown() to only trigger globalConsumer.wakeup() when the thread is still in the CREATED state (i.e., still bootstrapping). The steady-state main loop is now untouched by this PR.
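
A rough sketch of that state-gated wakeup, assuming a much simplified state machine: `ShutdownWakeupSketch`, its `State` enum, and the `wakeups` counter (standing in for calls to `globalConsumer.wakeup()`) are hypothetical and do not mirror the real GlobalStreamThread fields.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model: the counter stands in for calls to the consumer's wakeup().
final class ShutdownWakeupSketch {

    enum State { CREATED, RUNNING, PENDING_SHUTDOWN }

    private State state = State.CREATED;
    final AtomicInteger wakeups = new AtomicInteger();

    /** Called once bootstrapping has finished. */
    synchronized void markRunning() {
        state = State.RUNNING;
    }

    /** Requests shutdown; only a thread that is still bootstrapping gets a wakeup. */
    synchronized void shutdown() {
        final boolean stillBootstrapping = state == State.CREATED;
        state = State.PENDING_SHUTDOWN;
        if (stillBootstrapping) {
            wakeups.incrementAndGet(); // would be globalConsumer.wakeup() in the real thread
        }
    }
}
```

Gating on the state keeps the wakeup away from the steady-state poll loop, which therefore needs no matching catch.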

Comment on lines +440 to +454
    if (inErrorState()) {
        closeStateConsumer(stateConsumer, false);
        return null;
    }

    setState(RUNNING);
    return stateConsumer;
} catch (final WakeupException e) {
    closeStateConsumer(stateConsumer, false);
    if (inErrorState()) {
        log.info("Global thread initialization interrupted by shutdown");
    } else {
        startupException = new StreamsException(
            "Unexpected wakeup during initialization of GlobalStreamThread", e);
    }
Contributor

I think this part should be enough:

if (inErrorState()) {
    closeStateConsumer(stateConsumer, false);
    return null;
}

Do we need to catch WakeupException?
UPD: Overall idea is to break execution of GlobalStateManagerImpl and verify if it was interrupted (check inErrorState())

Contributor Author

Thanks for flagging this!

The catch block was originally there to cover the other blocking consumer calls during bootstrap that aren't inside the local poll catch: GlobalStateManagerImpl#partitionsFor(), endOffsets() and position() (also part of the bootstrapping path). If shutdown() fires during these calls, WakeupException propagates up directly.

After applying @chickenchickenlove's suggestion, this catch is no longer needed and has been removed. Every WakeupException during bootstrap is now caught inside GlobalStateManagerImpl.initialize() and converted to Optional.empty(), so no WakeupException reaches GlobalStreamThread.initialize().

Contributor

@UladzislauBlok UladzislauBlok left a comment

Left minor comments, but LGTM overall

LegacyCheckpointingStateStore.migrateLegacyOffsets(logPrefix, stateDirectory, null, wrappedStores);

for (final StateStoreMetadata metadata : storeMetadata.values()) {
    if (shouldStopBootstrappingSupplier.getAsBoolean()) {
Contributor

Just to confirm: so we either catch the "interrupted bootstrapping" fact on the next iteration or when handling WakeupException. That makes sense.

Contributor Author

Yes. The first path is the supplier check between stores; the other is a WakeupException thrown by methods like poll/partitionsFor/...
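
Both paths can be sketched in one place. This is an illustrative model only: `BootstrapInterruptSketch`, the `WakeupSignal` exception (a stand-in for Kafka's WakeupException), and the `restoreStore` callback are hypothetical, and converting every wakeup to an empty result follows the description in this thread rather than the actual diff.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Simplified model of the two ways bootstrap can end up reported as interrupted.
final class BootstrapInterruptSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException. */
    static final class WakeupSignal extends RuntimeException { }

    /**
     * Restores each store in turn. Returns the restored store names, or an empty
     * Optional if shutdown was observed between stores or a wakeup was thrown.
     */
    static Optional<Set<String>> initialize(final List<String> storeNames,
                                            final BooleanSupplier shouldStopBootstrapping,
                                            final Consumer<String> restoreStore) {
        final Set<String> restored = new LinkedHashSet<>();
        try {
            for (final String name : storeNames) {
                if (shouldStopBootstrapping.getAsBoolean()) {
                    return Optional.empty(); // path 1: supplier check between stores
                }
                restoreStore.accept(name); // may block in poll/partitionsFor/... in the real code
                restored.add(name);
            }
        } catch (final WakeupSignal e) {
            return Optional.empty(); // path 2: a blocking call was woken up
        }
        return Optional.of(restored);
    }
}
```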

Comment on lines +433 to +436
if (inErrorState()) {
    closeStateConsumer(stateConsumer, false);
    return null;
}
Contributor

For which scenario do we need this check?

Contributor Author

The preceding try { stateConsumer.initialize() } catch (...) block calls into GlobalStateUpdateTask, which returns directly if shutdown has been signaled. In that case, stateConsumer.initialize() returns normally without throwing.

This follow-up check handles the case where shutdown has already been requested: it routes to cleanup and sends the run() loop down the early-exit path.

Contributor

right, I see it now. thanks for clarification

Comment on lines +253 to +264
final Thread shutdownThread = new Thread(() -> {
    try {
        TestUtils.waitForCondition(
            () -> stateRestoreListener.storeNameCalledStates.containsKey(MockStateRestoreListener.RESTORE_START),
            10 * 1000L,
            "Bootstrap restore never started.");
    } catch (final Exception e) {
        throw new RuntimeException(e);
    }
    globalStreamThread.shutdown();
});
shutdownThread.start();
Contributor

Runnable + Executors is better imo, but this is super minor. You can ignore this comment if you like current approach more

Contributor Author

Good point. It seems the raw-thread approach would not surface exceptions thrown inside the lambda to the main test thread; shutdownThread.join() just returns normally. I switched to the executor + runnable approach.
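
The difference can be seen with the JDK alone. The sketch below is not the test from the PR; `FailureSurfacingSketch` and its two helper methods are hypothetical names used only to contrast `Thread.join()` with `Future.get()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Contrasts how a failure inside a background task reaches (or misses) the calling thread.
final class FailureSurfacingSketch {

    /** Runs the task on an executor; returns the exception it threw, or null if it succeeded. */
    static Throwable failureSeenViaExecutor(final Runnable task) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            final Future<?> future = executor.submit(task);
            future.get(); // rethrows the task's failure wrapped in ExecutionException
            return null;
        } catch (final ExecutionException e) {
            return e.getCause();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    /** Runs the task on a raw thread; join() gives the caller no failure to inspect. */
    static Throwable failureSeenViaRawThread(final Runnable task) {
        final Thread thread = new Thread(task);
        thread.setUncaughtExceptionHandler((t, e) -> { }); // suppress the default stack trace
        thread.start();
        try {
            thread.join(); // returns normally even if the task threw
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return null;
    }
}
```

In a test, calling `get()` on the returned Future is what turns a failed wait condition in the helper task into a test failure.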


startAndSwallowError();
shutdownThread.join();
globalStreamThread.join(5_000);
Contributor

There is already a timeout on the test, so we can remove this one, I guess.

Contributor Author

Thanks! Removed.
