[Feature][Zeta] Support sink partition strategy routing#10964
[Feature][Zeta] Support sink partition strategy routing#10964yzeng1618 wants to merge 5 commits into
Conversation
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for working on this. I re-reviewed the latest head from scratch against the actual Zeta runtime path, and the overall partition-routing direction makes sense. The blocker I still see is in the schema-evolution path: once a sink declares a partition strategy, SeaTunnelRow records can be routed to remote sink writers, but SchemaChangeEvent is still consumed only by the local writer.
Runtime chain I checked:
Source / transform emits SchemaChangeEvent
-> SeaTunnelSourceCollector.collect(event)
-> sendRecordToNext(new Record<>(event))
-> SinkPartitionExchange.received(record)
-> if Barrier: broadcast to all sink writers
-> if SchemaChangeEvent: drain batches, then consumeLocally(record) only
Subsequent SeaTunnelRow records
-> SinkPartitionRouter.route(record)
-> hash partition fields to a remote sink writer
-> dispatcher.dispatch(batch)
-> remote writer handles the row in SinkFlowLifeCycle.handleRecord()
-> writer.write(row)
-> but that writer may never have received applySchemaChange(event)
The key paths are:
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java:119-133seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchange.java:104-129seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionRouter.java:64-80seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java:327-343
Issue 1: SchemaChangeEvent is not broadcast to all participating sink writers
- Location:
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchange.java:112 - Why this is a real blocker:
the new exchange layer correctly broadcasts barriers, but schema changes stay local. After that, normal data rows may still hash to a remote writer. That remote writer then writes post-DDL data before it has applied the schema change. - Risk:
this can break the main write path for any sink that combines partition routing with schema evolution. - Better fix:
either broadcastSchemaChangeEventto everypartitionTargetTask, or explicitly reject the combination of partition strategy + schema evolution until the runtime support is complete. - Severity: High
Issue 2: the new tests still miss the schema-change + remote-routing regression case
- Location:
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/task/flow/SinkPartitionExchangeTest.java:44 - Why it matters:
the current tests cover same-key routing and barrier alignment, but not the failing path above, so Issue 1 can slip through with all tests green. - Severity: Medium
Compatibility:
- This is backward compatible for sinks that do not declare a partition strategy.
- For sinks that do declare one, the current runtime contract is incomplete once schema evolution is involved.
Performance / side effects:
- The extra hashing and writer-to-writer dispatch cost looks acceptable.
- The bigger concern here is correctness, not CPU or memory.
Test stability:
- The newly added UTs are structurally stable: they are in-memory tests without
Thread.sleep, fixed ports, or container timing races. - The gap is coverage, not flakiness.
CI:
- I did not run this locally.
- The current
Buildcheck was still queued when I reviewed, but Issue 1 is already a source-level merge blocker independent of CI.
Merge conclusion: can merge after fixes
- Blocking items
- Issue 1 must be fixed first, because the current schema-evolution path is incorrect once rows are routed to remote sink writers.
- Suggested follow-up
- Add the missing regression test from Issue 2 so this runtime contract stays protected.
Overall, the feature direction is good and the planner/runtime split is clean. The one thing we should not merge as-is is the local-only schema-change handling.
|
This seems to conflict somewhat with STIP‑23. I think we should first come up with a design document. The backpressure PR also needs to be taken into consideration |
|
Thanks for raising the STIP-23 angle. I agree this needs to line up with the broader partition-routing and backpressure design rather than being patched in isolation. From Daniel's side, the current blocker from the latest review is still the same runtime gap: |
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head from scratch against the real Zeta sink routing path.
What this PR solves
- User pain: today sink partitioning is mostly local to a writer task, so users cannot declare connector-level partition routing that redistributes rows across sink writers.
- Fix approach: add planner/runtime support for
SinkPartitionStrategy, remote writer exchange, and translator guards for unsupported engines. - One-line summary: the previous schema-change blocker is fixed on the latest head, and my remaining merge concern is CI rather than a new source-level correctness issue.
Runtime path I checked
upstream row / schema event / barrier
-> SinkFlowLifeCycle.received(...)
-> SinkPartitionExchange.received(...)
-> data row: route to target writer
-> schema change: drain batches, broadcast to all target writers
-> barrier: drain batches, broadcast, align once per target writer
-> remote target writer
-> receivedSinkPartitionExchange(...)
-> handleRecord(...)
Re-review result
- The earlier blocker is fixed:
SchemaChangeEventis now broadcast throughcreateBroadcastEnvelopes(...), and the newSinkPartitionExchangeTest.schemaChangeIsBroadcastBeforeRemoteData()covers the exact runtime path that was missing before. - I did not find a new source-level blocker in the current partition-routing logic.
CI
- The current GitHub
Buildcheck is still red for this PR. - From the accessible metadata here I can confirm the red status, but I could not retrieve a detailed failing job log from the fork run via the available GitHub API surface in this environment, so I do not want to invent a root cause.
Conclusion: can merge after fixes
- Blocking items
- Please bring the current
Buildback to green, or provide the failing job details if the red status is from unrelated fork/infra noise.
- Suggested non-blocking follow-up
- None more important than closing the CI loop now that the source-level schema-change gap is fixed.
I rechecked this with STIP-23 in mind.For this PR, hash routing is limited to data rows. Barriers and schema changes are treated as control events, so they are flushed/broadcast instead of being routed by hash. I also updated the design notes to make this explicit. The backpressure/timer signal part is not handled here and should remain in the backpressure PR. |
|
Thanks for circling back with the STIP-23 clarification and the updated design note. That explanation lines up with the latest Daniel re-review on head Since there is still no new commit after that latest Daniel review, I am keeping this as a reply-only follow-up on the unchanged head instead of reopening a full source review. On my side the remaining merge gate is still CI rather than a new runtime-path blocker. |
|
Apologies, my earlier comments may not have been clear enough. Let me reorganize my thoughts here. I've identified a few points in the current approach that I think are worth discussing:
Based on the above considerations, my suggestion is to produce a detailed design first, covering at least the following:
The above is solely my personal opinion and I welcome any corrections @yzeng1618 @davidzollo @zhangshenghang . |
|
Thanks for reorganizing the concerns here, and I appreciate the level of detail. These are thoughtful architecture questions, especially around how a Sink->Sink side channel relates to checkpoint alignment, backpressure propagation, and shared HZ operation threads. I want to be careful not to pretend a brand-new full Daniel re-review happened on the same unchanged head, though. The current PR head is still That said, I do think your discussion points are important input for the next revision. If the author decides to address them with either a design write-up or a code update, please push that as a new commit and I will re-review the latest head from scratch again against the full Zeta runtime path. Until then, from Daniel's side this stays in reply-only / awaiting-update state rather than a new full review on the unchanged head. |
|
I re-checked the current implementation against these concerns. The current SinkPartitionExchange path does introduce a sink-side point-to-point channel: remote delivery goes through Hazelcast task operations, the dispatch side waits on the operation future, and the receive side can synchronously enter the target SinkWriter path from the operation call stack. The exchange also has local buffering/alignment state that is not part of checkpoint state. So I agree this should not be treated as the production Knowledge Sync routing direction without a fuller design. I have drafted a design document first, focusing on the complete document_id flow: assignment, routing, delivery, landing, and the relationship between route buckets, parallel subtasks, and vector-store partitions/logical buckets. The proposed direction is to move the first production routing path toward SourceSplitEnumerator/source-side document assignment, and defer both the sink-side point-to-point channel and a standalone keyed exchange operator until checkpoint, back-pressure, and recovery semantics are designed as first-class engine behavior. Checkpoint alignment and recovery consistency are called out as a follow-up iteration with explicit state boundaries. |
BackgroundKnowledge Sync requires document-scoped lifecycle decisions. A vector lifecycle sink needs to read the old chunks for one document, compare hashes, delete stale chunks, upsert changed chunks, and skip unchanged chunks. If chunks for the same The current implementation direction added a sink-declared partition strategy and a Zeta runtime component named That path is outside the normal Zeta source/transform/sink data exchange boundary. It uses Hazelcast task operations for cross-task delivery, blocks the sending task while waiting for remote dispatch completion, and executes the receiving write path synchronously inside the Hazelcast operation call stack. It also does not snapshot or restore exchange-local buffers and sequence state. This design proposes a safer first production direction: keep document routing on the source side through Goals
Non-Goals
Current Sink-Side Exchange AssessmentThe current sink-side exchange has already addressed some local ordering issues: checkpoint barriers and schema-change events are flushed and broadcast instead of being hashed as ordinary data records. This is important, but it does not make the exchange equivalent to an engine-native data channel. Observed risks:
Therefore this design treats sink-side point-to-point routing as unsuitable for the first production Knowledge Sync routing path. Proposed DirectionUse source-side document routing controlled by The core rule is:
For the first lifecycle implementation, the job should use a pointwise topology where the source reader subtask, transform subtask chain, and sink writer subtask for the same route bucket stay aligned. If the planner cannot guarantee this alignment, the lifecycle sink must fail fast instead of falling back to best-effort routing. This keeps routing in the same place where SeaTunnel already reasons about parallelism, split assignment, checkpointed source state, and failover. Routing ModelRouting KeyThe primary routing key is the physical field The Connectors may provide stronger source-native identities when available, such as database primary keys, object version IDs, or external document IDs. The selected identity must be included in the source/enumerator checkpoint state when it affects pending work assignment. Route BucketThe route bucket is deterministic:
AssignmentThe enumerator discovers documents and assigns each document-level split to the reader that owns the route bucket. The split must represent a whole document or a whole document lifecycle event. Chunk-level split assignment is not allowed in lifecycle mode because it can split one document across multiple decision domains. Complete Data Flow1. DiscoveryThe source enumerator discovers source objects, files, or source records. It normalizes the source address and resolves the document identity. Required output from discovery:
2. Routing AssignmentThe enumerator computes If no reader is registered for a bucket, the split remains pending in enumerator state until the reader is available. On failover, pending and assigned-but-not checkpointed splits are reconstructed from the latest checkpoint and reassigned using the same deterministic rule. 3. Source DeliveryThe source reader reads the whole document split and emits one or more rows. Every emitted row for that document must contain the physical routing and lifecycle fields required downstream:
For a delete event, the reader may emit a compact tombstone row with 4. Transform DeliveryTransforms must preserve the physical routing fields. Document parse, chunking, metadata projection, and embedding transforms may add fields, but they must not drop or rename If a transform can expand one document into many rows, all expanded rows remain inside the same route bucket because expansion happens after the source reader has been assigned the document split. 5. Sink LandingThe lifecycle sink writer receives all rows for one document through the route bucket assigned to its subtask. The writer performs document-scoped lifecycle logic: The sink writer must use idempotent target identifiers: For target systems with physical partitions, such as Milvus, the writer may map the route bucket to a configured partition: For target systems without a matching physical partition concept, such as a single Qdrant collection, the route bucket is a SeaTunnel ownership boundary and Addressing RelationshipExample with
The invariant is not that each bucket has only one document. The invariant is that each document has only one bucket. Back-Pressure and Hot KeysThis design relies on the existing dataflow back-pressure path instead of adding Hazelcast data operations from sink writer to sink writer. A slow lifecycle sink writer slows its normal upstream chain. Hot keys are still possible. A single large document or a small number of very large documents can overload one bucket. The first production version should handle this as an explicit lifecycle trade-off:
Dynamic hot-key migration is out of scope because moving a live document between writers would require a checkpointed rebalance protocol. Checkpoint and Recovery BoundariesCheckpoint alignment can be implemented in a follow-up iteration, but the state boundaries should be fixed now. Source Enumerator StateThe enumerator owns:
Source Reader StateEach reader owns:
If the reader cannot resume inside a document, it may replay the whole document from the last checkpoint. The sink lifecycle path must therefore be idempotent by Sink Writer StateThe sink writer owns:
Visible external side effects before checkpoint completion must be either idempotent or staged. A sink that cannot satisfy this must document at-least-once visibility and pass tests for duplicate replay. Recovery Scenarios
Planner RequirementsFor a lifecycle sink that requires document routing, the planner should validate:
Alternatives ConsideredA. Keep Sink-Side Point-to-Point ExchangeThis preserves the current implementation direction and can route rows even when source and sink parallelism differ. It is not recommended for production Knowledge Sync because it adds a sink-local data channel outside the normal DAG, uses Hazelcast operations for data transfer, and lacks exchange-level checkpoint state. B. Add a Standalone Engine Keyed ExchangeThis is the most general long-term engine solution. It could support C. Source-Side Document AssignmentThis is the recommended first production path. It reuses the existing Implementation Slices
Verification Plan
|
|
Thanks for taking the time to write this up. The source-side document assignment direction is much clearer to me than continuing to harden the current sink-side point-to-point exchange as the first production path. In particular, spelling out the route-bucket ownership, checkpoint boundaries, and the reason to defer a standalone keyed exchange directly addresses the architecture concerns that were raised earlier. Since the PR head is still the same If you decide to keep this PR as the implementation vehicle, the next step needs to be a code update that matches the design direction and makes the intended scope explicit in the PR itself. Once there is a new commit, I am happy to re-review the latest head from scratch against the full Zeta runtime path. |
Purpose of this pull request
This PR adds a sink-declared partition routing contract for RAG / Knowledge Sync scenarios.
Related issues:
Main changes:
SinkPartitionStrategyAPI withNONEandHASH_BY_FIELDSmodes.SeaTunnelSink#getPartitionStrategy()so existing sinks keep current behavior.SeaTunnelRowType.SeaTunnelRow.optionswhen rows are copied.document_id;This is the PR-B0 style foundation for Knowledge Sync: sink partition strategy API plus Zeta-first routing support. It does not implement Qdrant/Milvus document lifecycle behavior in this PR.
Does this PR introduce any user-facing change?
Yes.
This PR introduces a new sink developer API,
SeaTunnelSink#getPartitionStrategy(), that allows a sink to declare field-based routing requirements.Existing sinks are not affected because the default implementation returns an empty strategy.
For Zeta, when a sink declares
HASH_BY_FIELDS(["document_id"]), data rows with the samedocument_idare routed to the same sink writer.For Flink and Spark, non-empty sink partition strategies now fail fast with an explicit unsupported-engine error instead of being silently ignored.
How was this patch tested?
Added tests
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.