[Fix][Connector-v2][CDC] Fix job hang caused by checkpoint failure during schema evolution under multi-parallelism on Flink1.13#10951
Conversation
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for working on this. I re-reviewed the latest head from scratch and traced the full schema-evolution control path through SchemaOperator and the Flink sink writers.
What this PR fixes
- User pain: on Flink 1.13 multi-parallel CDC jobs, schema changes can stall forever after some source subtasks finish and checkpoints stop progressing.
- Fix approach: buffer schema changes in
SchemaOperator, add a checkpoint-stall fallback timer, and force sink-side flush / commit when schema-change control messages arrive. - One-line summary: the problem is real, but the current fix crosses an unsafe boundary by moving commit behavior into the writer path itself.
Runtime chain I checked
schema-change detection
-> SchemaOperator.processElement(...) [SchemaOperator.java:143-173]
-> buffer schema event and later trigger fallback handling [175-199]
sink control path
-> FlinkSinkWriter.handleControlMessage(...) [FlinkSinkWriter.java:159-177]
-> schema_change_event / flush_signal
-> flushBeforeSchemaChange(...) [248-265]
-> sinkWriter.prepareCommit(checkpointId)
-> commitPreparedData(...)
stalled-write path
-> write(...) [131-157]
-> if checkpointStalled => prepareCommit(checkpointId)
-> immediateCommitIfStalled(...)
Findings
Issue 1: the writer now commits prepared data directly, bypassing the normal Flink commit lifecycle
- Location:
FlinkSinkWriter.java:149-156,179-200,248-286 - Problem: the writer path now calls
prepareCommit(...)and then immediately invokessinkCommitter/sinkAggregatedCommitteritself. - Risk: this breaks the usual Flink writer -> committer / aggregated-committer boundary and can invalidate exactly-once assumptions for sinks that rely on the normal checkpoint lifecycle.
- Better fix: keep the stall workaround in a control path that still preserves the standard Flink commit contract instead of committing directly inside the writer.
- Severity: High
Issue 2: flush_signal / stalled mode changes the main write semantics for subsequent data rows
- Location:
FlinkSinkWriter.java:171-190and149-156 - Problem: once
checkpointStalledis enabled, later writes auto-flush and attempt immediate commit on the hot write path. - Risk: data-commit semantics now depend on an internal fallback mode rather than only on checkpoint boundaries, which is too risky for a core Flink sink path.
- Better fix: keep the fallback scoped to schema-change progress handling without converting the normal data write path into ad hoc commit mode.
- Severity: High
Merge conclusion
Conclusion: not ready to merge
- Blocking items
- Issue 1: restore the normal Flink commit boundary instead of committing directly inside the writer.
- Issue 2: do not let checkpoint-stalled fallback mode redefine the normal data write semantics.
- Suggested non-blocking follow-up
- Once the commit-boundary issue is addressed, the fallback-timer idea itself is worth preserving because the original stall condition is real.
Overall, I agree with the problem statement, but the current revision is not safe enough on the sink commit path to merge as-is.
davidzollo
left a comment
There was a problem hiding this comment.
Thanks for working on this. I re-reviewed the latest head from scratch and traced the current Flink 1.13 schema-evolution path again, including SourceExecuteProcessor -> FlinkSourceReader keep-alive -> SchemaOperator checkpoint/fallback -> BroadcastSchemaSinkOperator -> FlinkSinkWriter.
The direct-commit blocker from the previous head is gone, but I still found one blocking correctness gap in the current fallback path.
Runtime path I checked:
DDL detection
-> SchemaOperator.processElement(...)
-> schema change enters pendingQueue and starts fallback timer
Normal safe path
-> notifyCheckpointComplete(...)
-> firstSeenCheckpointId is recorded first
-> after the extra completed checkpoint round, applyNextPendingSchemaChange(...)
Current fallback path
-> handleFallbackTimerOnTaskThread(...)
-> when no checkpoint completes in time, it calls applyNextPendingSchemaChange(...) directly
-> this happens even if the post-DDL checkpoint safety round never completed
Problem 1: the fallback timer now bypasses the checkpoint-completion safety fence that the operator itself relies on to avoid XA/MDL conflicts
- Location:
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:225-238seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java:296-321seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java:179-201
- Severity: High
- Raised before by others: No
- Why this is a real bug:
The operator comment explicitly says the DDL must wait for an extra completed checkpoint round so the earlier XA commit can finish before ALTER TABLE runs. But the new fallback path skips that fence completely: if no checkpoint completes after the schema event,handleFallbackTimerOnTaskThread(...)jumps straight intoapplyNextPendingSchemaChange(...). The new unit test even locks in that behavior by asserting the fallback succeeds without any checkpoint completion. - Risk:
This reopens the exact safety boundary the operator comment is trying to protect. Under the stalled-checkpoint scenario, the DDL can now run while the prior transactional commit / MDL-sensitive work may still be in flight. - Better fix:
Keep the Flink 1.13 keep-alive improvement, but do not let the fallback timer bypass the checkpoint-based safety fence. If a true fallback is still needed, it should preserve the same post-checkpoint ordering guarantee instead of applying the DDL immediately.
Conclusion: fix required before merge
Blocking items:
- Problem 1
Non-blocking suggestions:
- The keep-alive direction itself is reasonable and worth preserving once the fallback ordering is made safe.
|
Re-posting this under Daniel because the previous review submission from my environment was attributed incorrectly. Please treat this Daniel-authored comment as the authoritative maintainer review for the current head. Thanks for working on this. I re-reviewed the latest head from scratch and traced the current Flink 1.13 schema-evolution path again, including The direct-commit blocker from the previous head is gone, but I still found one blocking correctness gap in the current fallback path. Runtime path I checked: Problem 1: the fallback timer now bypasses the checkpoint-completion safety fence that the operator itself relies on to avoid XA/MDL conflicts
Conclusion: fix required before mergeBlocking items:
Non-blocking suggestions:
|
|
Re-posting this under Daniel because the previous submission from my environment was attributed incorrectly. Please treat this Daniel-authored comment as the authoritative maintainer note for the current head. Thanks for working on this. I re-reviewed the latest head from scratch and traced the current Flink 1.13 schema-evolution path again, including The direct-commit blocker from the previous head is gone, but I still found one blocking correctness gap in the current fallback path. Runtime path I checked: Problem 1: the fallback timer now bypasses the checkpoint-completion safety fence that the operator itself relies on to avoid XA/MDL conflicts
Conclusion: fix required before mergeBlocking items:
Non-blocking suggestions:
|
3e38391 to
ba39f9a
Compare
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the follow-up. I re-reviewed the latest head from scratch and focused on the full current schema-evolution path again, especially the sink commit boundary that I blocked earlier.
What this PR solves
- User pain: in Flink 1.13 multi-parallel CDC jobs, schema changes can stall when checkpoint progress stops and the evolution path cannot advance.
- Fix approach: coordinate schema-change progress through the operator / source-reader / sink-writer path, and send schema-change acknowledgements back without forcing an ad hoc sink commit path.
- One-line summary: the earlier sink-commit blocker is fixed on the latest head; I do not see a new source-level blocker, though I would still tighten one E2E observability pattern.
Runtime path I checked
schema change production
-> SchemaOperator.processElement(...)
-> buffer / coordinate schema-change progress
source-side liveness
-> SourceExecuteProcessor
-> FlinkSourceReader.isAvailable() / keep-alive
sink-side handling
-> FlinkSinkWriter.write(...)
-> handleSchemaChangeEvent(...) [163-205]
-> applySchemaChange(...)
-> sendSchemaChangeAck(...)
normal commit boundary
-> FlinkSinkWriter.prepareCommit(...) [240-262]
-> FlinkSinkWriter.snapshotState() [265-283]
-> stays on the normal Flink lifecycle
Re-review result
- The old blocker is gone: on the latest head I no longer see the writer directly committing prepared data on the schema-change hot path.
FlinkSinkWriterTestalso now protects that boundary explicitly.- So from the source-level side, the current revision is much safer than the earlier direct-commit approach.
Test stability
- Rating: Has risk
- Why:
- In
MysqlCDCWithFlinkSchemaChangeIT.java:122-130,156-164,175-183,199-207, the new E2E still launchescontainer.executeJob(...)via fire-and-forgetCompletableFuture.runAsync(...)and only logs the exception inside that async task. - If the job thread fails early, the test usually surfaces it later as an Awaitility timeout instead of failing immediately with the original exception.
- In
- I would treat this as a non-blocking hardening item, not a merge blocker.
Conclusion: can merge
- Blocking items
- None from my side on the latest head. The commit-boundary issue I blocked earlier is fixed now.
- Suggested non-blocking follow-up
- Keep the async future and fold exceptional completion into the test assertions, so CI failures surface earlier and more clearly.
Overall, the latest head is in much better shape. The high-risk sink-commit semantic issue is no longer present in the current code path.
|
@yzeng1618 Please help review this PR, thx |
|
Thanks for the ping. I rechecked the PR metadata before replying: the current head is still My latest source-level conclusion on this exact revision is unchanged: the earlier sink-commit-boundary blocker I raised is fixed on the current head, and I do not have a new code-path blocker to add on top of the latest implementation. At this point the remaining gate is operational rather than a new source review issue from me:
|
|
One clarification: the issue targets Flink 1.13, whereas this bug has already been fixed in Flink 1.14 and later versions. The planned code changes will be made in the common module. Is this placement appropriate? Would putting the changes into common introduce unnecessary overhead from fallback timers and undesirable reflection coupling? |
| if (fallbackTimerFired) { | ||
| handleFallbackTimerOnTaskThread(); | ||
| } |
There was a problem hiding this comment.
One entry of the fallback logic in SchemaOperator relies on continuous invocations of processElement. However, this PR addresses cases where no more data flows from the source and processElement is no longer triggered, rendering this path invalid. Consequently, fallback processing entirely depends on the timer registered via reflection.
If reflective registration fails, the logic falls back to a flag-based implementation that also requires processElement to run. When there is no incoming data, neither fallback path can be executed, the fallback mechanism fails to work, and the task hanging issue remains unresolved under reflection failure scenarios.
There was a problem hiding this comment.
okay, I plan to remove the fallbackTimerFired flag and the if check in processElement. handleFallbackTimerOnTaskThread() will now be called directly from the ProcessingTimeService timer callback function, so it will fire on the Flink task thread regardless of whether source data arrives. This avoids the blocking fallback path of "reflection failure -> setting flags -> waiting for processElement".
| TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| private void registerProcessingTimeCallback() { |
There was a problem hiding this comment.
registerProcessingTimeCallback() uses reflection throughout to infer the method names of Flink’s internal APIs. Does this introduce potential risks? For instance, any change to method names or parameters would break the logic. Could we instead register the ProcessingTimeCallback via the strongly-typed public API exposed by getProcessingTimeService()?
There was a problem hiding this comment.
okay, thank you for reviewing. I plan to remove registerProcessingTimeCallback() and all its reflection code. SchemaOperator13 overrides the scheduleFallbackTimer() method, and ProcessingTimeService and ProcessingTimeCallback are directly imported as compile-time types into the seatunnel-translation-flink-13 module, so they will not be broken by method renaming in future Flink versions.
| // Schema changes (DDL) are database-level operations that only need to execute once. | ||
| // Due to Flink's partitioning, only one subtask receives the schema change event, | ||
| // so we only need 1 ACK to confirm the DDL was applied successfully. | ||
| int expectedAcks = 1; |
There was a problem hiding this comment.
Completing the request on a single ACK is correct for "the DDL runs once on the DB", but it implicitly assumes every sink subtask's local schema view is refreshed by the broadcast/State path rather than by receiving the event. Could we add a comment stating this precondition, and a multi-table (table-names with ≥2 tables) regression test so the "view refresh covers all subtasks" invariant is guarded? Current E2E only covers single-table.
There was a problem hiding this comment.
The multi-table E2E test itself is a follow-up item and has been noted in the comment for tracking.
Okay, this fix has been moved to seatunnel-translation-flink-13. I plan to implement a new overridden SchemaOperator later; Flink 1.13 will have its own SchemaOperator13, and SourceExecuteProcessor will instantiate SchemaOperator13, which will not affect other versions. |
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the follow-up. I re-reviewed the latest head from scratch and focused on the full current schema-evolution path again, especially the sink commit boundary that I blocked earlier.
What this PR solves
- User pain: in Flink 1.13 multi-parallel CDC jobs, schema changes can stall when checkpoint progress stops and the evolution path cannot advance.
- Fix approach: coordinate schema-change progress through the operator / source-reader / sink-writer path, and send schema-change acknowledgements back without forcing an ad hoc sink commit path.
- One-line summary: the earlier sink-commit blocker is fixed on the latest head; I do not see a new source-level blocker, though I would still tighten one E2E observability pattern.
Runtime path I checked
schema change production
-> SchemaOperator.processElement(...)
-> buffer / coordinate schema-change progress
source-side liveness
-> SourceExecuteProcessor
-> FlinkSourceReader.isAvailable() / keep-alive
sink-side handling
-> FlinkSinkWriter.write(...)
-> handleSchemaChangeEvent(...) [163-205]
-> applySchemaChange(...)
-> sendSchemaChangeAck(...)
normal commit boundary
-> FlinkSinkWriter.prepareCommit(...) [240-262]
-> FlinkSinkWriter.snapshotState() [265-283]
-> stays on the normal Flink lifecycle
Re-review result
- The old blocker is gone: on the latest head I no longer see the writer directly committing prepared data on the schema-change hot path.
FlinkSinkWriterTestalso now protects that boundary explicitly.- So from the source-level side, the current revision is much safer than the earlier direct-commit approach.
Test stability
- Rating: Has risk
- Why:
- In
MysqlCDCWithFlinkSchemaChangeIT.java:122-130,156-164,175-183,199-207, the new E2E still launchescontainer.executeJob(...)via fire-and-forgetCompletableFuture.runAsync(...)and only logs the exception inside that async task. - If the job thread fails early, the test usually surfaces it later as an Awaitility timeout instead of failing immediately with the original exception.
- In
- I would treat this as a non-blocking hardening item, not a merge blocker.
CI
- The current GitHub
Buildis still red on the latest head. - The failing fork run I checked contains failures in
unit-test (11, windows-latest),kafka-connector-it (8, ubuntu-latest),all-connectors-it-1 (11, ubuntu-latest), andengine-v2-it (11, ubuntu-latest), so the branch still needs a clean CI signal before merge even though I do not have a new source-level blocker in this PR path.
Conclusion: can merge after fixes
- Blocking items
- No new source-level blocker from my side on the latest head.
- Please get the current
Buildgreen before merge.
- Suggested non-blocking follow-up
- Keep the async future and fold exceptional completion into the test assertions, so CI failures surface earlier and more clearly.
Overall, the latest head is in much better shape. The high-risk sink-commit semantic issue is no longer present in the current code path.
Purpose of this pull request
This PR fixes a Flink schema-evolution deadlock path when checkpoints stop progressing, notably seen on Flink 1.13-style behavior where some source subtasks finish and checkpoint completion stalls.
Fix: #10787
Main changes in seatunnel-translation-flink-common:
Also in this branch:
E2E Flink schema-change test configs raise parallelism from 1 to 5 in:
Does this PR introduce any user-facing change?
Yes.
Previous behavior:
On Flink1.13 With schema evolution enabled, if checkpoint completion stalled, pending schema changes could remain blocked indefinitely and data flow could hang.
New behavior:
Compatibility:
How was this patch tested?
The above configuration file has been adjusted. For specific testing, please see: MysqlCDCWithFlinkSchemaChangeIT
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.