KAFKA-20602: Bound DLQ topic creation, fix log level and restore interrupt in DeadLetterQueueReporter#22424
Open
PavelZeger wants to merge 2 commits into
Open
KAFKA-20602: Bound DLQ topic creation, fix log level and restore interrupt in DeadLetterQueueReporter#22424PavelZeger wants to merge 2 commits into
PavelZeger wants to merge 2 commits into
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
DeadLetterQueueReporter.createAndSetupverifies/creates the dead letter queue topic during sink task startup. The current implementation has three issues:Indefinite hang on broker outage. Both
admin.listTopics().names().get()andadmin.createTopics(...).all().get()are unbounded blocking calls. When the broker is unreachable, each call blocks until the AdminClient's internaldefault.api.timeout.mselapses (default 60s). Because tasks are started serially, a worker with N DLQ-enabled connectors can stall for up to N x 60s on a cold-broker scenario (DR failover, outage recovery), which surfaces to operators as "all connectors stuck Initializing".Wrong log level. The expected "topic does not exist yet" case on first startup was logged at ERROR, even though the very next line creates the topic. This produces spurious ERROR alerts on every first start of a connector with DLQ enabled.
Interrupt status not restored. The
InterruptedExceptionhandler threw aConnectExceptionwithout callingThread.currentThread().interrupt(), clearing the interrupt signal for any upstream blocking call.Changes:
get(30, TimeUnit.SECONDS)so startup fails fast on an unreachable broker regardless of the AdminClient's configured api timeout, and add acatch (TimeoutException)that surfaces a clearConnectException.InterruptedException.The 30s timeout is hardcoded rather than exposed as a new config to avoid a KIP; it covers the overwhelming majority of clusters and can be revisited if a user reports needing longer.
Testing strategy:
Added three unit tests to
ErrorReporterTest.createAndSetupconstructs its ownAdminandKafkaProducerinternally, so the tests usemockStatic(Admin.class)to inject a mock admin (andmockConstructionforthe producer), then stub the returned
KafkaFuture.get(timeout, unit)to drive each path:createAndSetupTimesOutWhenBrokerUnreachable- stubs aTimeoutExceptionand asserts aConnectExceptionis thrown and the admin client is closed. Done with a mocked timeout rather than a real unreachable broker so the test is deterministic and does not wait the full 30s.createAndSetupRestoresInterruptOnInterruptedException- stubs anInterruptedException, runs on a dedicated thread (so the restored interrupt flag does not leak into other tests), and asserts the flag is set after theConnectExceptionpropagates.firstTimeDlqTopicCreationLogsAtInfoNotError- usesLogCaptureAppenderto assert the first-time creation path logs at INFO and not ERROR.No new integration or system tests: the change is a localized fix to an existing code path with no new public surface or cross-component behavior.