diff --git a/docs/docusaurus/sidebars.json b/docs/docusaurus/sidebars.json index cd174d59523c1e..7adfb1d2eb0aba 100644 --- a/docs/docusaurus/sidebars.json +++ b/docs/docusaurus/sidebars.json @@ -186,6 +186,7 @@ "loading/Stream_Load_transaction_interface", "loading/Flink_cdc_load", "loading/Flink-connector-starrocks", + "loading/Flink_multi_table_transaction", "loading/load_from_pulsar", "loading/Load_to_Primary_Key_tables", "loading/Etl_in_loading", diff --git a/docs/en/ecosystem_release/flink_connector.md b/docs/en/ecosystem_release/flink_connector.md index d84945f412739f..43fa569871a2b8 100644 --- a/docs/en/ecosystem_release/flink_connector.md +++ b/docs/en/ecosystem_release/flink_connector.md @@ -29,10 +29,10 @@ description: "Release notes and changelog for the StarRocks Connector for Apache | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 | > **NOTICE** > @@ -42,6 +42,24 @@ description: "Release notes and changelog for the StarRocks Connector for Apache ### 1.2 +#### 1.2.15 + +Release date: June 18, 2026 + +##### Features + +- Added multi-table transaction Stream Load support. [#487](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/487) + +##### Improvements + +- Merge Commit supports logging data quality error messages. [#484](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/484) + +##### BugFix + +- Fix multi-table transaction concurrency: serialize per-table loads and align cross-table commits. [#491](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/491) +- Fallback to FE cancel API when rollback fails for PREPARE-state lingering transactions. [#488](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/488) +- Do not quote CURRENT_TIMESTAMP in the DEFAULT clause when building column statements. [#486](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/486) + #### 1.2.14 Release date: February 11, 2026 diff --git a/docs/en/loading/Flink-connector-starrocks.md b/docs/en/loading/Flink-connector-starrocks.md index 995f14306fe88d..6ff47514047128 100644 --- a/docs/en/loading/Flink-connector-starrocks.md +++ b/docs/en/loading/Flink-connector-starrocks.md @@ -17,10 +17,10 @@ The Flink connector supports DataStream API, Table API & SQL, and Python API. It | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 | ## Obtain Flink connector @@ -77,13 +77,13 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency sh build.sh ``` - For example, if the Flink version in your environment is 1.15, you need to execute the following command: + For example, if the Flink version in your environment is 1.16, you need to execute the following command: ```bash - sh build.sh 1.15 + sh build.sh 1.16 ``` -3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`, generated upon compilation. +3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.16-SNAPSHOT.jar`, generated upon compilation. > **NOTE** > diff --git a/docs/en/loading/Flink_multi_table_transaction.md b/docs/en/loading/Flink_multi_table_transaction.md new file mode 100644 index 00000000000000..8dd54bc5ce89ca --- /dev/null +++ b/docs/en/loading/Flink_multi_table_transaction.md @@ -0,0 +1,667 @@ +--- +displayed_sidebar: docs +description: "Enable Multi-table Transaction for a Flink job to write to multiple tables within the same database in one processing cycle." +--- + +# Load data from Apache Flink® with Multi-table Transaction + +StarRocks Flink Connector supports Multi-table Transaction to load data from Flink into multiple tables atomically. + +## Use Cases + +When a single Flink job writes to multiple tables within the same StarRocks database in one processing cycle, enabling multi-table transaction guarantees: + +- **Cross-table atomic commit**: Data written to different table within the same commit cycle becomes visible atomically — all or nothing. +- **Source transaction integrity**: A complete upstream transaction (for example, from Kafka) is never split across two StarRocks transactions. +- **Sub-second data freshness**: Data continuously flows into StarRocks via `/api/transaction/load`, and is committed at the interval configured by `sink.buffer-flush.interval-ms`. + +Typical scenarios: + +- Synchronous writes to a general table and a detail table (for example, `orders` and `order_items`) +- Event routing to different partition tables (for example, `events_202601`, `events_202602`) +- A single job maintaining multiple interrelated downstream result tables + +:::tip[Prerequisites] +To enable Multi-table Transaction, you must running your cluster on StarRocks v4.0 and later (with the Multi-table Transaction Stream Load support), and StarRocks Flink Connector on v1.2.9 and later. +::: + +## Core Capabilities + +| Capability | Description | +| ---------------------------- | ------------------------------------------------------------ | +| Cross-table atomic commit | All tables within the same flush cycle share one StarRocks transaction label. The Prepare and Commit operations are unified. | +| Source transaction integrity | Commit timing is controlled by the `transactionEnd` flag. Commit only occurs at complete source transaction boundaries. | +| Sub-second data visibility | Data is periodically flushed to StarRocks (`/api/transaction/load`). It is committed when the `transactionEnd` and the timer conditions are met | +| N:1 transaction mapping | Multiple source transactions can accumulate in a single StarRocks transaction. They do not have to be mapped 1:1. | +| Within-partition ordering | `keyBy(sourcePartition)` ensures transactions from the same partition are processed in order within the same sink subtask. | + +## Configurations + +### Multi-table Transaction Configurations + +#### `sink.transaction.multi-table.enabled` + +- Type: Boolean +- Default: `false` +- Description: Whether to enable the Multi-table Atomic Transaction mode. + +#### `sink.transaction.multi-table.buffer-size` + +- Type: Long +- Default: `134217728` (128 MB) +- Unit: Bytes +- Description: Global buffer size in bytes for the Multi-table Transaction mode. When the total buffered data across all tables reaches this threshold, a flush is triggered. + +### Load-related Configurations + +#### `sink.version` + +- Recommended Value: `V2` +- Description: Required. `V1` does not support the transaction Stream Load interface. + +#### `sink.semantic` + +- Recommended Value: `at-least-once` +- Description: Multi-table mode currently supports `at-least-once` only. + +#### `database-name` + +- Recommended Value: `*` +- Description: Wildcard to enable dynamic multi-table routing. + +#### `table-name` + +- Recommended Value: `*` +- Description: Wildcard to enable dynamic multi-table routing. + +#### `sink.buffer-flush.interval-ms` + +- Recommended Value: `1000` +- Description: Controls the commit cycle. You can set it to `1000` to achieve the freshness of approximately one second. + +#### `sink.properties.format` + +- Recommended Value: `json` +- Description: The data format. + +#### `sink.properties.strip_outer_array` + +- Recommended Value: `true` +- Description: Whether to strip the outermost array structure. + +## Interfaces + +### `StarRocksRowData` + +```java +public interface StarRocksRowData { + String getUniqueKey(); // Region routing key (nullable; auto-derived from database.table) + String getDatabase(); // Target database + String getTable(); // Target table + String getRow(); // Row data in JSON format + + /** + * Indicates this is the last row of a source transaction batch. + * Used by multi-table transaction mode to determine safe commit points: + * the connector only commits when the most recent write had this flag set, + * ensuring no partial source transaction is committed. + */ + default boolean isTransactionEnd() { + return false; + } + + /** + * Returns the source partition ID for this row. + * Used by multi-table transaction mode to track per-partition transaction + * boundaries. Returns -1 when partition tracking is not applicable. + */ + default int getSourcePartition() { + return -1; + } +} +``` + +### `DefaultStarRocksRowData` + +```java +public class DefaultStarRocksRowData implements StarRocksRowData { + // Basic fields + private String uniqueKey; + private String database; + private String table; + private String row; + + // Multi-table transaction fields + private boolean transactionEnd; // Source transaction end marker + private int sourcePartition = -1; // Source partition ID (for keyBy ordering) + + // Constructors + public DefaultStarRocksRowData(); + public DefaultStarRocksRowData(String database, String table); + public DefaultStarRocksRowData(String uniqueKey, String database, String table, String row); + + // Setters + public void setUniqueKey(String uniqueKey); + public void setDatabase(String database); + public void setTable(String table); + public void setRow(String row); + public void setTransactionEnd(boolean transactionEnd); + public void setSourcePartition(int sourcePartition); + + // Getters (inherited from StarRocksRowData) + public String getUniqueKey(); + public String getDatabase(); + public String getTable(); + public String getRow(); + public boolean isTransactionEnd(); + public int getSourcePartition(); +} +``` + +### User-Implemented Component + +Users need to implement a `KeyedProcessFunction` (referred to as `TransactionAssembler` in this document) that: + +1. Keys by source partition and buffers data rows within a transaction +2. Emits all rows only when the source transaction is closed (for example, upon receiving `TXN_END`) +3. Sets `transactionEnd=true` on the last row +4. Sets `sourcePartition` on every row + +No custom `SinkFunction` is needed — the standard connector API (`SinkFunctionFactory.createSinkFunction()`) handles everything. + +## Complete Example + +### StarRocks Table DDL + +```sql +CREATE DATABASE `test`; + +CREATE TABLE `test`.`orders` ( + `order_id` BIGINT NOT NULL, + `customer_id` BIGINT NOT NULL, + `total_amount` DECIMAL(10,2) DEFAULT "0", + `order_status` VARCHAR(32) DEFAULT "" +) ENGINE=OLAP PRIMARY KEY(`order_id`) +DISTRIBUTED BY HASH(`order_id`) +PROPERTIES("replication_num" = "1"); + +CREATE TABLE `test`.`order_items` ( + `item_id` BIGINT NOT NULL, + `order_id` BIGINT NOT NULL, + `product_name` VARCHAR(128) DEFAULT "", + `quantity` INT DEFAULT "0", + `price` DECIMAL(10,2) DEFAULT "0" +) ENGINE=OLAP PRIMARY KEY(`item_id`) +DISTRIBUTED BY HASH(`item_id`) +PROPERTIES("replication_num" = "1"); +``` + +### Flink Job Code + +```java +import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData; +import com.starrocks.connector.flink.table.sink.SinkFunctionFactory; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class WriteMultipleTablesWithTransaction { + + // ===================================================== + // 1. Source Event Model (define per your business logic) + // ===================================================== + + enum EventType { TXN_BEGIN, DATA, TXN_END } + + static class TxnEvent implements Serializable { + private static final long serialVersionUID = 1L; + + int partition; + String txnId; + EventType type; + String database; + String table; + String json; + + TxnEvent() {} + + TxnEvent(int partition, String txnId, EventType type, + String database, String table, String json) { + this.partition = partition; + this.txnId = txnId; + this.type = type; + this.database = database; + this.table = table; + this.json = json; + } + + static TxnEvent begin(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_BEGIN, null, null, null); + } + + static TxnEvent data(int partition, String txnId, String db, String table, String json) { + return new TxnEvent(partition, txnId, EventType.DATA, db, table, json); + } + + static TxnEvent end(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_END, null, null, null); + } + } + + // ===================================================== + // 2. TransactionAssembler — Core User Component + // ===================================================== + + /** + * Buffers DATA events per partition; on TXN_END emits the complete + * transaction's rows as individual DefaultStarRocksRowData records. + * + * Only emits when a source transaction is fully closed (TXN_END received). + * All rows are emitted synchronously within one processElement() call, + * so they enter the downstream sink buffer without intervening checkpoint barriers. + * + * Multiple source transactions accumulate in the sink's buffer between + * flush cycles — the connector handles grouping them into StarRocks transactions. + */ + static class TransactionAssembler + extends KeyedProcessFunction { + + private transient ListState pendingRows; + + @Override + public void open(Configuration parameters) throws Exception { + ListStateDescriptor descriptor = new ListStateDescriptor<>( + "pending-txn-rows", Types.POJO(TxnEvent.class)); + pendingRows = getRuntimeContext().getListState(descriptor); + } + + @Override + public void processElement(TxnEvent event, Context ctx, + Collector out) throws Exception { + switch (event.type) { + case TXN_BEGIN: + pendingRows.clear(); + break; + + case DATA: + pendingRows.add(event); + break; + + case TXN_END: + List rows = new ArrayList<>(); + for (TxnEvent row : pendingRows.get()) { + rows.add(row); + } + int partition = ctx.getCurrentKey(); + for (int i = 0; i < rows.size(); i++) { + TxnEvent row = rows.get(i); + DefaultStarRocksRowData rowData = new DefaultStarRocksRowData( + null, row.database, row.table, row.json); + rowData.setSourcePartition(partition); + // Mark the last row as transaction end + if (i == rows.size() - 1) { + rowData.setTransactionEnd(true); + } + out.collect(rowData); + } + pendingRows.clear(); + break; + } + } + } + + // ===================================================== + // 3. Main Program + // ===================================================== + + public static void main(String[] args) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030"); + String loadUrl = params.get("loadUrl", "127.0.0.1:8030"); + String userName = params.get("userName", "root"); + String password = params.get("password", ""); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10_000); // Checkpoint is for recovery only; does not affect commit cycle + + // --- Source --- + // Replace with an actual Kafka Source that deserializes into TxnEvent + DataStream events = env.addSource(/* KafkaSource or MockTxnEventSource */); + + // --- Step 1: Assemble complete transactions per partition --- + // TransactionAssembler emits rows only after TXN_END. + // Multiple closed source txns accumulate in the sink buffer between flushes. + DataStream rows = events + .keyBy(e -> e.partition) + .process(new TransactionAssembler()); + + // --- Step 2: Partition-affinity routing to sink --- + // keyBy(sourcePartition) routes same-partition data to the same sink subtask. + // The connector uses per-partition regions internally, so even when multiple + // partitions land on the same sink subtask, transaction boundaries are tracked + // independently per partition via PartitionCommitTracker. + DataStream partitionedRows = rows + .keyBy(DefaultStarRocksRowData::getSourcePartition); + + // --- Step 3: Configure the connector --- + // sink.transaction.multi-table.enabled=true activates per-partition region + // tracking inside StreamLoadManagerV2: each partition's regions are switched + // independently when its txnEnd arrives, and commit triggers only when all + // active partitions have been switched. + StarRocksSinkOptions options = StarRocksSinkOptions.builder() + .withProperty("jdbc-url", jdbcUrl) + .withProperty("load-url", loadUrl) + .withProperty("database-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("table-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("username", userName) + .withProperty("password", password) + .withProperty("sink.version", "V2") // Required: V2 + .withProperty("sink.semantic", "at-least-once") + .withProperty("sink.transaction.multi-table.enabled", "true") // Enable multi-table txn + .withProperty("sink.buffer-flush.interval-ms", "1000") // ~1s data freshness + .withProperty("sink.properties.format", "json") + .withProperty("sink.properties.strip_outer_array", "true") + .build(); + + // Optional: per-table stream load properties + StreamLoadTableProperties orderItemsProps = StreamLoadTableProperties.builder() + .database("test") + .table("order_items") + .addProperty("format", "json") + .addProperty("strip_outer_array", "true") + .addProperty("ignore_json_size", "true") + .build(); + options.addTableProperties(orderItemsProps); + + // --- Step 4: Create and attach the sink --- + // Standard connector API — no custom SinkFunction needed. + // addSink on the keyBy'd stream ensures partition affinity. + SinkFunction sink = SinkFunctionFactory.createSinkFunction(options); + partitionedRows.addSink(sink); + + env.execute("WriteMultipleTablesWithTransaction"); + } +} +``` + +### Data Flow Topology + +``` +Kafka (60 partitions) + | + v +keyBy(partition) ———— Ensures same-partition events go to the same subtask + | + v +TransactionAssembler (KeyedProcessFunction) + | Buffers DATA events + | On TXN_END: emits all rows (last row has transactionEnd=true) + | Every row carries sourcePartition + | + v +keyBy(sourcePartition) ———— Ensures same-partition rows go to the same sink subtask + | + v +StarRocksDynamicSinkFunctionV2 (via SinkFunctionFactory.createSinkFunction) + | + | +——————————— StreamLoadManagerV2 (multi-table txn mode) —————————-——+ + | | | + | | Per-partition, per-table regions: | + | | Region(P0, orders), Region(P0, order_items) | + | | Region(P2, orders), Region(P2, order_items) | + | | | + | | Each region tracks: | + | | - activeChunk / inactiveChunks | + | | - lastSwitchTimeMs (for miniInterval batching) | + | | - activeChunkCleanBoundary (true iff last task event is txnEnd)| + | | | + | | write(partition, db, table, row) [task thread] | + | | -> routes to Region(partition, db, table) | + | | -> write0 sets activeChunkCleanBoundary = false | + | | -> In multi-table mode write0 does NOT switchChunk, so | + | | activeChunk only freezes at a txnEnd boundary | + | | | + | | setCommitAllowed(partition, txnEnd=true) [task thread] | + | | -> region.tryMiniIntervalSwitch(): | + | | sets activeChunkCleanBoundary = true | + | | if (now - lastSwitchTimeMs >= miniInterval | + | | && activeChunk has data): switchChunkForCommit | + | | else: data batches into activeChunk with subsequent | + | | completed source transactions (N:1 mapping) | + | | -> PartitionCommitTracker.onTxnEnd(partition) | + | | | + | | SharedTransactionCoordinator: | + | | -> eagerly opens shared txn before any flush | + | | -> all autonomous flushes use the shared label | + | | -> recycles idle txn at 80% of server timeout | + | | | + | | Manager thread (every scanningFrequency): | + | | -> tryForceCleanSwitch per region: | + | | if cleanBoundary && has data && miniInterval elapsed | + | | -> switchChunkForCommit (source-idle fallback) | + | | -> tryStartTimerDrivenCommit: | + | | if commitInterval elapsed && hasDataLoaded | + | | -> set commitInFlight = true | + | | -> autonomous flush: drain inactiveChunks via streamLoad | + | | (never touches activeChunk in multi-table mode) | + | | | + | | Manager thread (commitInFlight=true): | + | | -> triggerLoadIfNeeded per region (HTTP /api/transaction/load) | + | | -> wait all loads complete | + | | -> unified commit via SharedTransactionCoordinator | + | | -> reset tracker; open next shared txn | + | +———————————————————————————————————————————————————————————————————+ + | + v +StarRocks (test.orders + test.order_items) +``` + +## How It Works + +### Chunk Lifecycle and `miniInterval` Batching + +Each `(partition, table)` region has one `activeChunk` (currently accepting writes) and a FIFO of `inactiveChunks` (frozen data, pending HTTP load). In multi-table transaction mode, the **only** way data moves from `activeChunk` into `inactiveChunks` is via `switchChunkForCommit`, which is called from exactly three sites: + +1. **Task thread, on txnEnd** — via `region.tryMiniIntervalSwitch()` inside `setCommitAllowed(partition, true)`. This is the common path. + +2. **Manager thread, source-idle fallback** — via `region.tryForceCleanSwitch()` on every scan cycle, for regions whose `activeChunk` has been sitting idle at a clean transaction boundary. + +3. **Manager thread, savepoint/recycle** — force-switch all regions after verifying every region is at a clean boundary. + +To avoid one HTTP load per source transaction in high-throughput CDC, the task thread only performs a switch when at least `miniSwitchIntervalMs` has elapsed since the previous switch on the same region. `miniSwitchIntervalMs` is computed as `min(1000 ms, max(100 ms, commitInterval / 10))`, so a 1-second commit interval batches at 100 ms while a 30-second interval caps batching at 1 second. Within a miniInterval window, multiple completed source transactions accumulate into the same `activeChunk` (N:1 mapping) and are frozen together on the next switch. + +Each region carries two fields that drive these decisions: + +- `lastSwitchTimeMs` — epoch ms of the most recent switch. Initially 0, so the very first txnEnd after region creation always triggers a switch. + +- `activeChunkCleanBoundary` — `true` if the most recent task-thread event on this region was either an `onTxnEnd` or a `switchChunk`. `false` after any `write()`. The manager thread's `tryForceCleanSwitch` only runs when this flag is `true`, so it can never freeze partial source-transaction data. + +### Task Thread — Write and txnEnd + +``` +invoke(record) [Flink task thread] + | + | if record is a data row: + | write(partition, db, table, row) + | region.write(row) → addRow(); cleanBoundary = false + | + | if record carries transactionEnd=true: + | setCommitAllowed(partition, true) + | for each region owned by this partition: + | region.tryMiniIntervalSwitch(): + | cleanBoundary = true // always (txnEnd observed) + | if (now - lastSwitchTimeMs >= miniInterval + | && activeChunk has data): + | switchChunkForCommit() // freezes activeChunk + | partitionTracker.onTxnEnd(partition) // safety bookkeeping only +``` + +The task thread is the sole serializer of `write()` and `setCommitAllowed()` events, which is why both the `cleanBoundary` mark and the conditional switch must run here (not deferred to the manager thread): `cleanBoundary` must reflect the most recent task-thread event at all times, or the manager's idle-fallback could race a write and freeze partial data. + +### 6.3 Manager Thread — Time-Driven Commit + +The manager thread runs a scan loop at `scanningFrequency`. Each iteration: + +1. **Ensure shared transaction**: open a new one (eager) or proactively recycle the current one if it is approaching the StarRocks server-side timeout (80% of `timeout` header, default 480 s). + +2. **Source-idle fallback**: call `region.tryForceCleanSwitch()` on every region to freeze any `activeChunk` that is clean and has been idle for at least `miniInterval`. This handles the "source paused after a few txnEnds" case where the task thread stopped before issuing a fresh switch. + +3. **Time-driven commit trigger**: call `tryStartTimerDrivenCommit()`, which sets `commitInFlight=true` if **both** conditions hold: + + - `now - lastCommitTimeMs >= commitInterval` (the configured `sink.buffer-flush.interval-ms`). + + - There is data to commit — either `txnCoordinator.hasDataLoaded()` is true or at least one region still has pending inactiveChunks. + +4. **Autonomous flush**: drain any region whose `inactiveChunks` is non-empty via the `FlushAndCommitStrategy`. Multi-table mode's `flush()` only streams out already-frozen inactive chunks — it **never** touches `activeChunk`. This preserves the invariant that every chunk reaching StarRocks under the shared label comes from completed source transactions. + +When `commitInFlight=true` the main loop enters `processMultiTableCommit`, which waits for in-flight loads, triggers loads for any remaining inactive chunks, runs the unified commit via `SharedTransactionCoordinator`, updates `lastCommitTimeMs`, resets the tracker, and opens a new shared transaction for the next cycle. + +### Shared Transaction Coordination + +All tables within the same commit cycle share a single StarRocks transaction managed by `SharedTransactionCoordinator`: + +1. **Eager transaction opening**: A shared transaction is opened eagerly before any autonomous flush, so all HTTP loads use the shared label. This eliminates the data-loss window where an independent-label flush could be orphaned when a shared transaction later overwrites the label. + +2. **Unified commit**: After all regions' data is loaded, a single `commit` is executed for the shared label. Multi-table transactions skip the `prepare` step because StarRocks does not support `TXN_PREPARE` in multi-table mode. + +3. **Idle transaction recycling**: If the shared transaction remains open longer than 80% of the StarRocks server-side timeout (default: 480s for a 600s timeout), it is proactively recycled (commit-or-rollback + reopen) to prevent server-side timeout errors. Recycle fails fast if any region has in-progress transaction data (clean-boundary violation) or any partition has written data without ever receiving a txnEnd. + +### 6.5 PartitionCommitTracker (Safety Bookkeeping) + +In the current design, commit timing is driven entirely by the commit interval and the per-region clean-boundary flags. `PartitionCommitTracker` is reduced to an informational/safety aid: + +- `onWrite(partition)` registers a partition as `ACTIVE` on first write. + +- `onTxnEnd(partition)` transitions the partition to `TXN_END_SEEN` (sticky — subsequent writes do **not** demote it back to `ACTIVE`). + +- `getPartitionsWithoutTxnEnd()` lists partitions that have written data but never received a txnEnd. Used by savepoint and recycle to fail fast on upstream contract violations (a source transaction that never closed). + +- `reset()` clears all partitions at the end of a commit cycle. + +The tracker no longer drives switch/commit decisions, does not track a `SWITCHED` state, and does not manage pending txnEnd signals. Partitions that stop producing data after a commit are simply cleared by `reset()`; if they resume later, the next `onWrite` re-registers them. + +### Autonomous Flush with Shared Labels + +When a region's `inactiveChunks` becomes non-empty, the manager thread's autonomous-flush loop streams it to StarRocks via `/api/transaction/load` under the **current shared label**. Because the shared transaction is always opened before any data is loaded, there is no window where data could be loaded under an independent (orphaned) label. In multi-table mode, `flush()` never triggers a `switchChunk` — it only drains already-frozen inactive chunks — so the invariant "every chunk reaching StarRocks under the shared label comes from completed source transactions" holds unconditionally. + +### Safety Guarantees + +| Guarantee | Mechanism | +| ------------------------------------------------- | ----------------------------------------------------------------------------- | +| switchChunk does not split source transactions | `switchChunkForCommit` is only called at a clean transaction boundary (on txnEnd or when `activeChunkCleanBoundary` is `true`). | +| Commit never includes partial source transactions | Every chunk reaching StarRocks originates from `switchChunkForCommit`. Autonomous flush never switches `activeChunk` in multi-table mode. | +| Per-partition isolation | Each `(partition, table)` has its own region. One partition's switch never affects another's data. | +| Within-partition ordering | `keyBy(sourcePartition)` routes same-partition rows to the same sink subtask. | +| Task thread is non-blocking | `tryMiniIntervalSwitch` is O(regions-in-partition). HTTP work happens asynchronously on the manager thread. | +| Source-idle data visibility | Manager thread's `tryForceCleanSwitch` freezes clean `activeChunk`s after `miniInterval` of idleness, so data remains visible within `commitInterval + miniInterval` even if the source pauses. | +| Autonomous flushes are transaction-safe | Every load uses the shared label. Every frozen chunk is from completed source transactions. | +| Idle transactions do not timeout | Shared transactions are recycled at 80% of server timeout. Recycle fails fast on in-progress data. | +| Per-partition independent commit | A partition with a completed source transaction commits on the next commit interval, independent of other partitions' in-progress transactions. | + +### N:1 Transaction Mapping + +Multiple source transactions can accumulate in a single StarRocks transaction via the miniInterval batching mechanism: + +``` +Source txn K1 (3 rows) -> write -> activeChunk -> txnEnd (1st switch) +Source txn K2 (2 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) +Source txn K3 (4 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) + (miniInterval elapsed → next switch batches K2+K3) + -> ... +commitInterval elapsed + -> commit(label=A) + -> K1 + K2 + K3 atomically committed to StarRocks +``` + +Because the commit decision is time-driven (not tied to a specific txnEnd), the connector amortizes HTTP-load and begin/commit overhead across many small source transactions without any configuration changes. For a CDC source emitting 100 txnEnds per second with `commitInterval=1 s`, `miniInterval=100 ms`, the connector issues at most ~10 load calls per second instead of ~100. + +## Limitations + +- **Requires `sink.version=V2`**: V1 does not support transaction stream load. + +- **At-least-once only**: Failed retries may produce duplicate writes. Multi-table mode guarantees all tables within the same batch succeed or fail together, but does not provide global exactly-once. For PRIMARY KEY tables, duplicate writes are idempotent (upsert). + +- **All tables must be in the same database**: StarRocks multi-table transactions are database-scoped; cross-database transactions are not supported. + +- **Transaction scope is per sink subtask + per partition**: Each sink subtask maintains its own StarRocks transaction independently. Atomicity is guaranteed **within a single source transaction** (all rows for one txnEnd on one partition, across all tables that partition writes to). Data visibility across **different** source partitions can interleave: once partition P0's source transaction has fully arrived and the commit interval has elapsed, P0's data is committed even if partition P1's source transaction is still in progress. Applications that require cross-partition atomicity at the StarRocks level must either use a single source partition or coordinate commits upstream. + +- **Data visibility latency**: Governed by `sink.buffer-flush.interval-ms` and the internal `miniInterval = min(1000, max(100, commitInterval/10))`. In a continuously flowing CDC stream, an individual row becomes visible in StarRocks within roughly `commitInterval + miniInterval` of its source commit. During a source pause, previously-committed data remains visible while any row between the last switch and the pause becomes visible after one more `miniInterval` (handled by the manager-thread clean-boundary fallback). + +- **Depends on StarRocks cluster transaction settings**: Monitor running txn limits, prepared timeout (default 600s), and label retention. Ensure `sink.buffer-flush.interval-ms` is significantly shorter than the StarRocks transaction timeout. + +- **`activeChunk` memory growth under long source transactions**: Because multi-table mode disables chunk-size-triggered internal switching (to preserve the clean-transaction-boundary invariant), `activeChunk` can grow until the next txnEnd arrives. Memory is bounded by `sink.transaction.multi-table.buffer-size` (soft) and `2 × buffer-size` (hard via `blockIfCacheFull`). Exceptionally large source transactions will throttle the task thread via back-pressure; if this becomes routine, either split the source transactions upstream or increase `sink.transaction.multi-table.buffer-size`. + +- **Cross-database writes are rejected**: Multi-table transactions validate that all regions belong to the same database. Writing to tables in different databases within the same commit cycle will throw an error. + +- **Incompatible with merge commit**: `sink.properties.enable_merge_commit=true` cannot be combined with `sink.transaction.multi-table.enabled=true`. Merge commit routes writes through `MergeCommitManager`, which lacks the partition-aware `write(int, ...)` / `setCommitAllowed(int, ...)` hooks that multi-table mode relies on for transaction boundaries. The connector fails fast at validation time if both are enabled. + +## Monitoring and Troubleshooting + +Recommended metrics: + +- **On the Flink Side** + - Checkpoint success rate + - Checkpoint duration + - Sink flush/commit latency +- **On the StarRocks Side** + - Running/prepared txn count + - Txn timeout occurrences + - Label conflicts + +### Common issues + +#### `transaction not existed` + +- Cause: StarRocks transaction timeout +- Solution: The connector automatically recycles idle transactions at 80% of server timeout. If this still occurs, check if prepared timeout is too short or flush interval is too large. + +#### `too many running txns` + +- Cause: There are excessive concurrent transactions. +- Solution: Reduce sink parallelism or increase the value of StarRocks FE configuration `max_running_txn_num_per_db`. + +#### `Transaction start failed` + +- Cause: The `beginTransaction` HTTP call failed +- Solution: Verify load-url connectivity and the StarRocks version (v4.0 or later is required). + +#### High data visibility latency + +- Cause: Commit conditions are not met. +- Solution: Verify upstream data has correct `transactionEnd=true` markers; expect up to `commitInterval + miniInterval` latency per row. If latency exceeds this budget, check the manager thread is not stuck in a recycle or in-flight load (see `StarRocks-Sink-Manager` logs). + +#### Cross-database write error + +- Cause: Tables in different databases are in same commit cycle. +- Solution: Ensure all tables written in the same job belong to the same StarRocks database. + +## Best Practices + +1. **TransactionAssembler contract**: + + - Emit rows only after the source transaction is fully closed. + - The last row must have `setTransactionEnd(true)`. + - Every row must have `setSourcePartition(partition)`. + - All rows must be emitted synchronously within a single `processElement()` call. + +2. **keyBy before sink is mandatory**: `rows.keyBy(DefaultStarRocksRowData::getSourcePartition).addSink(sink)` — omitting this breaks within-partition transaction ordering. + +3. **Checkpoint is decoupled from commit**: Checkpoint interval can be set to a large value (for example, 60 seconds) for fault recovery. Data visibility is governed by `sink.buffer-flush.interval-ms` (for example, 1000ms). + +4. **Keep routing strategies stable**: Avoid single transactions writing to an excessive number of distinct tables, which increases transaction duration and failure probability. + +5. **Fault injection testing before production**: Kill TaskManagers / introduce network jitter and verify data correctness after checkpoint recovery. diff --git a/docs/en/unloading/Flink_connector.md b/docs/en/unloading/Flink_connector.md index 96a80b7da2c145..1615b98892953d 100644 --- a/docs/en/unloading/Flink_connector.md +++ b/docs/en/unloading/Flink_connector.md @@ -33,10 +33,10 @@ Unlike the JDBC connector provided by Flink, the Flink connector of StarRocks su | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 | ## Prerequisites diff --git a/docs/ja/ecosystem_release/flink_connector.md b/docs/ja/ecosystem_release/flink_connector.md index 5de2378eb5d228..1c2766053aa3ad 100644 --- a/docs/ja/ecosystem_release/flink_connector.md +++ b/docs/ja/ecosystem_release/flink_connector.md @@ -29,10 +29,10 @@ description: "StarRocks Connector for Apache Flink のリリースノート・ | コネクタ | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 以降 | 8 | 2.11,2.12 | > **注意** > @@ -42,6 +42,24 @@ description: "StarRocks Connector for Apache Flink のリリースノート・ ### 1.2 +#### 1.2.15 + +リリース日:2026年6月18日 + +##### 機能 + +- マルチテーブルトランザクションの Stream Load サポートを追加しました。[#487](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/487) + +##### 改善点 + +- Merge Commit でデータ品質エラーメッセージのログ記録に対応しました。[#484](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/484) + +##### バグ修正 + +- マルチテーブルトランザクションの並行処理を修正:テーブルごとのロードをシリアル化し、テーブル間のコミットを同期化しました。[#491](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/491) +- PREPARE状態の残留トランザクションでロールバックが失敗した場合、FEのキャンセルAPIにフォールバックするようにしました。[#488](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/488) +- カラム作成ステートメントを生成する際、DEFAULT句内のCURRENT_TIMESTAMPを引用符で囲まない。[#486](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/486) + #### 1.2.14 リリース日: 2026年2月11日 diff --git a/docs/ja/loading/Flink-connector-starrocks.md b/docs/ja/loading/Flink-connector-starrocks.md index 6eb3d59bd87851..26bf33ddd6d5ab 100644 --- a/docs/ja/loading/Flink-connector-starrocks.md +++ b/docs/ja/loading/Flink-connector-starrocks.md @@ -17,10 +17,10 @@ Flink コネクタは DataStream API、Table API & SQL、Python API をサポー | コネクタ | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 以降 | 8 | 2.11,2.12 | ## Flink コネクタの取得 @@ -77,13 +77,13 @@ Maven プロジェクトの `pom.xml` ファイルに、以下の形式で Flink sh build.sh ``` - 例えば、環境の Flink バージョンが 1.15 の場合、以下のコマンドを実行する必要があります。 + 例えば、環境の Flink バージョンが 1.16 の場合、以下のコマンドを実行する必要があります。 ```bash - sh build.sh 1.15 + sh build.sh 1.16 ``` -3. `target/` ディレクトリに移動し、コンパイルによって生成された Flink コネクタ JAR ファイル(例: `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`)を見つけます。 +3. `target/` ディレクトリに移動し、コンパイルによって生成された Flink コネクタ JAR ファイル(例: `flink-connector-starrocks-1.2.7_flink-1.16-SNAPSHOT.jar`)を見つけます。 > **注意** > diff --git a/docs/ja/loading/Flink_multi_table_transaction.md b/docs/ja/loading/Flink_multi_table_transaction.md new file mode 100644 index 00000000000000..07b1ec1c89319f --- /dev/null +++ b/docs/ja/loading/Flink_multi_table_transaction.md @@ -0,0 +1,667 @@ +--- +displayed_sidebar: docs +description: "Flinkジョブのマルチテーブルトランザクションを有効にして、1回の処理サイクルで同じデータベース内の複数のテーブルに書き込みます。" +--- + +# Apache Flink®からマルチテーブルトランザクションでデータをロードする + +StarRocks Flink Connectorはマルチテーブルトランザクションをサポートしており、FlinkからStarRocksの複数のテーブルにアトミックにデータをロードできます。 + +## ユースケース + +単一のFlinkジョブが1つの処理サイクルで同じStarRocksデータベース内の複数のテーブルに書き込む場合、マルチテーブルトランザクションを有効にすることで以下が保証されます: + +- **クロステーブルアトミックコミット**:同じコミットサイクル内で異なるテーブルに書き込まれたデータはアトミックに可視化されます — すべて成功するか、すべて失敗するかのどちらかです。 +- **ソーストランザクションの整合性**:完全なアップストリームトランザクション(例:Kafkaからのもの)が2つのStarRocksトランザクションに分割されることはありません。 +- **サブ秒のデータ鮮度**:データは`/api/transaction/load`を介してStarRocksに継続的に流れ込み、`sink.buffer-flush.interval-ms`で設定された間隔でコミットされます。 + +典型的なシナリオ: + +- 一般テーブルと詳細テーブルへの同期書き込み(例:`orders`と`order_items`) +- 異なるパーティションテーブルへのイベントルーティング(例:`events_202601`、`events_202602`) +- 複数の相互関連するダウンストリーム結果テーブルを管理する単一ジョブ + +:::tip[前提条件] +マルチテーブルトランザクションを有効にするには、StarRocks v4.0以降(マルチテーブルトランザクションStream Loadサポートを含む)でクラスターを実行し、StarRocks Flink Connectorをv1.2.9以降で使用する必要があります。 +::: + +## 主要機能 + +| 機能 | 説明 | +| ---------------------------- | ------------------------------------------------------------ | +| クロステーブルアトミックコミット | 同じフラッシュサイクル内のすべてのテーブルは1つのStarRocksトランザクションラベルを共有します。PrepareおよびCommit操作は統一されています。 | +| ソーストランザクションの整合性 | コミットタイミングは`transactionEnd`フラグによって制御されます。コミットは完全なソーストランザクション境界でのみ発生します。 | +| サブ秒のデータ可視性 | データは定期的にStarRocksにフラッシュされます(`/api/transaction/load`)。`transactionEnd`とタイマー条件が満たされたときにコミットされます。 | +| N:1トランザクションマッピング | 複数のソーストランザクションを単一のStarRocksトランザクションに蓄積できます。1:1でマッピングする必要はありません。 | +| パーティション内の順序保証 | `keyBy(sourcePartition)`は、同じパーティションからのトランザクションが同じシンクサブタスク内で順番に処理されることを保証します。 | + +## 設定 + +### マルチテーブルトランザクションの設定 + +#### `sink.transaction.multi-table.enabled` + +- 型:Boolean +- デフォルト:`false` +- 説明:マルチテーブルアトミックトランザクションモードを有効にするかどうか。 + +#### `sink.transaction.multi-table.buffer-size` + +- 型:Long +- デフォルト:`134217728`(128 MB) +- 単位:バイト +- 説明:マルチテーブルトランザクションモードのグローバルバッファサイズ(バイト単位)。すべてのテーブルにわたるバッファリングされたデータの合計がこのしきい値に達すると、フラッシュがトリガーされます。 + +### ロード関連の設定 + +#### `sink.version` + +- 推奨値:`V2` +- 説明:必須。`V1`はトランザクションStream Loadインターフェースをサポートしていません。 + +#### `sink.semantic` + +- 推奨値:`at-least-once` +- 説明:マルチテーブルモードは現在`at-least-once`のみをサポートしています。 + +#### `database-name` + +- 推奨値:`*` +- 説明:動的マルチテーブルルーティングを有効にするためのワイルドカード。 + +#### `table-name` + +- 推奨値:`*` +- 説明:動的マルチテーブルルーティングを有効にするためのワイルドカード。 + +#### `sink.buffer-flush.interval-ms` + +- 推奨値:`1000` +- 説明:コミットサイクルを制御します。約1秒の鮮度を実現するために`1000`に設定できます。 + +#### `sink.properties.format` + +- 推奨値:`json` +- 説明:データフォーマット。 + +#### `sink.properties.strip_outer_array` + +- 推奨値:`true` +- 説明: 最外部の配列構造を取り除くかどうか。 + +## インターフェース + +### `StarRocksRowData` + +```java +public interface StarRocksRowData { + String getUniqueKey(); // Region routing key (nullable; auto-derived from database.table) + String getDatabase(); // Target database + String getTable(); // Target table + String getRow(); // Row data in JSON format + + /** + * Indicates this is the last row of a source transaction batch. + * Used by multi-table transaction mode to determine safe commit points: + * the connector only commits when the most recent write had this flag set, + * ensuring no partial source transaction is committed. + */ + default boolean isTransactionEnd() { + return false; + } + + /** + * Returns the source partition ID for this row. + * Used by multi-table transaction mode to track per-partition transaction + * boundaries. Returns -1 when partition tracking is not applicable. + */ + default int getSourcePartition() { + return -1; + } +} +``` + +### `DefaultStarRocksRowData` + +```java +public class DefaultStarRocksRowData implements StarRocksRowData { + // 基本フィールド + private String uniqueKey; + private String database; + private String table; + private String row; + + // マルチテーブルトランザクションフィールド + private boolean transactionEnd; // Source transaction end marker + private int sourcePartition = -1; // Source partition ID (for keyBy ordering) + + // コンストラクタ + public DefaultStarRocksRowData(); + public DefaultStarRocksRowData(String database, String table); + public DefaultStarRocksRowData(String uniqueKey, String database, String table, String row); + + // セッター + public void setUniqueKey(String uniqueKey); + public void setDatabase(String database); + public void setTable(String table); + public void setRow(String row); + public void setTransactionEnd(boolean transactionEnd); + public void setSourcePartition(int sourcePartition); + + // ゲッター(StarRocksRowDataから継承) + public String getUniqueKey(); + public String getDatabase(); + public String getTable(); + public String getRow(); + public boolean isTransactionEnd(); + public int getSourcePartition(); +} +``` + +### ユーザー実装コンポーネント + +ユーザーは `KeyedProcessFunction`(本ドキュメントでは `TransactionAssembler` と呼ぶ)を実装する必要があります。これは以下の条件を満たすものです: + +1. ソースパーティションをキーとし、トランザクション内のデータ行をバッファリングする +2. ソーストランザクションがクローズされたとき(例: `TXN_END` を受信したとき)にのみ、すべての行を出力する +3. 最後の行に `transactionEnd=true` を設定する +4. すべての行に `sourcePartition` を設定する + +カスタム `SinkFunction` は不要です — 標準コネクタ API(`SinkFunctionFactory.createSinkFunction()`)がすべてを処理します。 + +## 完全な例 + +### StarRocks テーブル DDL + +```sql +CREATE DATABASE `test`; + +CREATE TABLE `test`.`orders` ( + `order_id` BIGINT NOT NULL, + `customer_id` BIGINT NOT NULL, + `total_amount` DECIMAL(10,2) DEFAULT "0", + `order_status` VARCHAR(32) DEFAULT "" +) ENGINE=OLAP PRIMARY KEY(`order_id`) +DISTRIBUTED BY HASH(`order_id`) +PROPERTIES("replication_num" = "1"); + +CREATE TABLE `test`.`order_items` ( + `item_id` BIGINT NOT NULL, + `order_id` BIGINT NOT NULL, + `product_name` VARCHAR(128) DEFAULT "", + `quantity` INT DEFAULT "0", + `price` DECIMAL(10,2) DEFAULT "0" +) ENGINE=OLAP PRIMARY KEY(`item_id`) +DISTRIBUTED BY HASH(`item_id`) +PROPERTIES("replication_num" = "1"); +``` + +### Flink ジョブコード + +```java +import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData; +import com.starrocks.connector.flink.table.sink.SinkFunctionFactory; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class WriteMultipleTablesWithTransaction { + + // ===================================================== + // 1. ソースイベントモデル(ビジネスロジックに応じて定義) + // ===================================================== + + enum EventType { TXN_BEGIN, DATA, TXN_END } + + static class TxnEvent implements Serializable { + private static final long serialVersionUID = 1L; + + int partition; + String txnId; + EventType type; + String database; + String table; + String json; + + TxnEvent() {} + + TxnEvent(int partition, String txnId, EventType type, + String database, String table, String json) { + this.partition = partition; + this.txnId = txnId; + this.type = type; + this.database = database; + this.table = table; + this.json = json; + } + + static TxnEvent begin(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_BEGIN, null, null, null); + } + + static TxnEvent data(int partition, String txnId, String db, String table, String json) { + return new TxnEvent(partition, txnId, EventType.DATA, db, table, json); + } + + static TxnEvent end(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_END, null, null, null); + } + } + + // ===================================================== + // 2. TransactionAssembler — コアユーザーコンポーネント + // ===================================================== + + /** + * Buffers DATA events per partition; on TXN_END emits the complete + * transaction's rows as individual DefaultStarRocksRowData records. + * + * Only emits when a source transaction is fully closed (TXN_END received). + * All rows are emitted synchronously within one processElement() call, + * so they enter the downstream sink buffer without intervening checkpoint barriers. + * + * Multiple source transactions accumulate in the sink's buffer between + * flush cycles — the connector handles grouping them into StarRocks transactions. + */ + static class TransactionAssembler + extends KeyedProcessFunction { + + private transient ListState pendingRows; + + @Override + public void open(Configuration parameters) throws Exception { + ListStateDescriptor descriptor = new ListStateDescriptor<>( + "pending-txn-rows", Types.POJO(TxnEvent.class)); + pendingRows = getRuntimeContext().getListState(descriptor); + } + + @Override + public void processElement(TxnEvent event, Context ctx, + Collector out) throws Exception { + switch (event.type) { + case TXN_BEGIN: + pendingRows.clear(); + break; + + case DATA: + pendingRows.add(event); + break; + + case TXN_END: + List rows = new ArrayList<>(); + for (TxnEvent row : pendingRows.get()) { + rows.add(row); + } + int partition = ctx.getCurrentKey(); + for (int i = 0; i < rows.size(); i++) { + TxnEvent row = rows.get(i); + DefaultStarRocksRowData rowData = new DefaultStarRocksRowData( + null, row.database, row.table, row.json); + rowData.setSourcePartition(partition); + // 最後の行をトランザクション終了としてマーク + if (i == rows.size() - 1) { + rowData.setTransactionEnd(true); + } + out.collect(rowData); + } + pendingRows.clear(); + break; + } + } + } + + // ===================================================== + // 3. メインプログラム + // ===================================================== + + public static void main(String[] args) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030"); + String loadUrl = params.get("loadUrl", "127.0.0.1:8030"); + String userName = params.get("userName", "root"); + String password = params.get("password", ""); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10_000); // Checkpoint is for recovery only; does not affect commit cycle + + // --- ソース --- + // TxnEventにデシリアライズする実際のKafkaソースに置き換えてください + DataStream events = env.addSource(/* KafkaSource or MockTxnEventSource */); + + // --- ステップ1:パーティションごとに完全なトランザクションを組み立てる --- + // TransactionAssemblerはTXN_END後にのみ行を出力します。 + // 複数のクローズされたソーストランザクションがフラッシュ間にシンクバッファに蓄積されます。 + DataStream rows = events + .keyBy(e -> e.partition) + .process(new TransactionAssembler()); + + // --- ステップ2:シンクへのパーティションアフィニティルーティング --- + // keyBy(sourcePartition)は同じパーティションのデータを同じシンクサブタスクにルーティングします。 + // コネクタは内部でパーティションごとのリージョンを使用するため、複数の + // パーティションが同じシンクサブタスクに配置された場合でも、トランザクション境界は + // PartitionCommitTrackerを介してパーティションごとに独立して追跡されます。 + DataStream partitionedRows = rows + .keyBy(DefaultStarRocksRowData::getSourcePartition); + + // --- ステップ3: コネクタを設定する --- + // sink.transaction.multi-table.enabled=true はパーティションごとのリージョンを有効にします + // StreamLoadManagerV2 内でのトラッキング: 各パーティションのリージョンは切り替えられます + // txnEnd が到着したときに独立して切り替わり、すべての + // アクティブなパーティションが切り替えられたときにのみコミットがトリガーされます。 + StarRocksSinkOptions options = StarRocksSinkOptions.builder() + .withProperty("jdbc-url", jdbcUrl) + .withProperty("load-url", loadUrl) + .withProperty("database-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("table-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("username", userName) + .withProperty("password", password) + .withProperty("sink.version", "V2") // Required: V2 + .withProperty("sink.semantic", "at-least-once") + .withProperty("sink.transaction.multi-table.enabled", "true") // Enable multi-table txn + .withProperty("sink.buffer-flush.interval-ms", "1000") // ~1s data freshness + .withProperty("sink.properties.format", "json") + .withProperty("sink.properties.strip_outer_array", "true") + .build(); + + // オプション: テーブルごとのストリームロードプロパティ + StreamLoadTableProperties orderItemsProps = StreamLoadTableProperties.builder() + .database("test") + .table("order_items") + .addProperty("format", "json") + .addProperty("strip_outer_array", "true") + .addProperty("ignore_json_size", "true") + .build(); + options.addTableProperties(orderItemsProps); + + // --- ステップ4: シンクを作成してアタッチする --- + // 標準コネクタ API — カスタム SinkFunction は不要です。 + // keyBy されたストリームに addSink を使用することで、パーティションアフィニティが確保されます。 + SinkFunction sink = SinkFunctionFactory.createSinkFunction(options); + partitionedRows.addSink(sink); + + env.execute("WriteMultipleTablesWithTransaction"); + } +} +``` + +### データフロートポロジー + +``` +Kafka (60 partitions) + | + v +keyBy(partition) ———— Ensures same-partition events go to the same subtask + | + v +TransactionAssembler (KeyedProcessFunction) + | Buffers DATA events + | On TXN_END: emits all rows (last row has transactionEnd=true) + | Every row carries sourcePartition + | + v +keyBy(sourcePartition) ———— Ensures same-partition rows go to the same sink subtask + | + v +StarRocksDynamicSinkFunctionV2 (via SinkFunctionFactory.createSinkFunction) + | + | +——————————— StreamLoadManagerV2 (multi-table txn mode) —————————-——+ + | | | + | | Per-partition, per-table regions: | + | | Region(P0, orders), Region(P0, order_items) | + | | Region(P2, orders), Region(P2, order_items) | + | | | + | | Each region tracks: | + | | - activeChunk / inactiveChunks | + | | - lastSwitchTimeMs (for miniInterval batching) | + | | - activeChunkCleanBoundary (true iff last task event is txnEnd)| + | | | + | | write(partition, db, table, row) [task thread] | + | | -> routes to Region(partition, db, table) | + | | -> write0 sets activeChunkCleanBoundary = false | + | | -> In multi-table mode write0 does NOT switchChunk, so | + | | activeChunk only freezes at a txnEnd boundary | + | | | + | | setCommitAllowed(partition, txnEnd=true) [task thread] | + | | -> region.tryMiniIntervalSwitch(): | + | | sets activeChunkCleanBoundary = true | + | | if (now - lastSwitchTimeMs >= miniInterval | + | | && activeChunk has data): switchChunkForCommit | + | | else: data batches into activeChunk with subsequent | + | | completed source transactions (N:1 mapping) | + | | -> PartitionCommitTracker.onTxnEnd(partition) | + | | | + | | SharedTransactionCoordinator: | + | | -> eagerly opens shared txn before any flush | + | | -> all autonomous flushes use the shared label | + | | -> recycles idle txn at 80% of server timeout | + | | | + | | Manager thread (every scanningFrequency): | + | | -> tryForceCleanSwitch per region: | + | | if cleanBoundary && has data && miniInterval elapsed | + | | -> switchChunkForCommit (source-idle fallback) | + | | -> tryStartTimerDrivenCommit: | + | | if commitInterval elapsed && hasDataLoaded | + | | -> set commitInFlight = true | + | | -> autonomous flush: drain inactiveChunks via streamLoad | + | | (never touches activeChunk in multi-table mode) | + | | | + | | Manager thread (commitInFlight=true): | + | | -> triggerLoadIfNeeded per region (HTTP /api/transaction/load) | + | | -> wait all loads complete | + | | -> unified commit via SharedTransactionCoordinator | + | | -> reset tracker; open next shared txn | + | +———————————————————————————————————————————————————————————————————+ + | + v +StarRocks (test.orders + test.order_items) +``` + +## 動作の仕組み + +### チャンクのライフサイクルと `miniInterval` バッチ処理 + +各 `(partition, table)` リージョンには、1 つの `activeChunk`(現在書き込みを受け付けている)と、`inactiveChunks`(凍結済みデータ、HTTP ロード待ち)の FIFO キューがあります。マルチテーブルトランザクションモードでは、**のみ** `activeChunk` から `inactiveChunks` へデータを移動する方法は `switchChunkForCommit` を介して行われ、これは正確に 3 か所から呼び出されます: + +1. **タスクスレッド、txnEnd 時** — `setCommitAllowed(partition, true)` 内の `region.tryMiniIntervalSwitch()` を介して。これが一般的なパスです。 + +2. **マネージャースレッド、ソースアイドルフォールバック** — すべてのスキャンサイクルで `region.tryForceCleanSwitch()` を介して、`activeChunk` がクリーンなトランザクション境界でアイドル状態になっているリージョンに対して実行されます。 + +3. **マネージャースレッド、セーブポイント/リサイクル** — すべてのリージョンがクリーンな境界にあることを確認した後、すべてのリージョンを強制切り替えします。 + +高スループット CDC において 1 ソーストランザクションごとに 1 回の HTTP ロードが発生するのを避けるため、タスクスレッドは同一リージョンで前回の切り替えから少なくとも `miniSwitchIntervalMs` が経過した場合にのみ切り替えを実行します。`miniSwitchIntervalMs` は `min(1000 ms, max(100 ms, commitInterval / 10))` として計算されるため、コミット間隔が 1 秒の場合は 100 ms でバッチ処理され、30 秒の場合は最大 1 秒でバッチ処理が上限となります。miniInterval ウィンドウ内では、複数の完了済みソーストランザクションが同じ `activeChunk` に蓄積され(N:1 マッピング)、次の切り替え時にまとめて凍結されます。 + +各リージョンには、これらの判断を駆動する 2 つのフィールドがあります: + +- `lastSwitchTimeMs` — 最後の切り替えのエポック ms。初期値は 0 であるため、リージョン作成後の最初の txnEnd は常に切り替えをトリガーします。 + +- `activeChunkCleanBoundary` — このリージョンに対する最新のタスクスレッドイベントが `onTxnEnd` または `switchChunk` のいずれかであった場合は `true`。`write()` の後は `false`。マネージャースレッドの `tryForceCleanSwitch` はこのフラグが `true` のときにのみ実行されるため、ソーストランザクションの部分的なデータを凍結することはありません。 + +### タスクスレッド — 書き込みと txnEnd + +``` +invoke(record) [Flink task thread] + | + | if record is a data row: + | write(partition, db, table, row) + | region.write(row) → addRow(); cleanBoundary = false + | + | if record carries transactionEnd=true: + | setCommitAllowed(partition, true) + | for each region owned by this partition: + | region.tryMiniIntervalSwitch(): + | cleanBoundary = true // always (txnEnd observed) + | if (now - lastSwitchTimeMs >= miniInterval + | && activeChunk has data): + | switchChunkForCommit() // freezes activeChunk + | partitionTracker.onTxnEnd(partition) // safety bookkeeping only +``` + +タスクスレッドは `write()` および `setCommitAllowed()` イベントの唯一のシリアライザーであるため、`cleanBoundary` マークと条件付き切り替えの両方をここで実行する必要があります(マネージャースレッドに委譲してはなりません): `cleanBoundary` は常に最新のタスクスレッドイベントを反映していなければならず、そうでなければマネージャーのアイドルフォールバックが書き込みと競合して部分的なデータを凍結する可能性があります。 + +### 6.3 マネージャースレッド — 時間駆動コミット + +マネージャースレッドは `scanningFrequency` でスキャンループを実行します。各イテレーションでは: + +1. **共有トランザクションの確保**: 新しいものを開く(先行)か、StarRocks サーバー側タイムアウト(`timeout` ヘッダーの 80%、デフォルト 480 秒)に近づいている場合は現在のものを積極的にリサイクルします。 + +2. **ソースアイドルフォールバック**: すべてのリージョンに対して `region.tryForceCleanSwitch()` を呼び出し、クリーンかつ少なくとも `miniInterval` の間アイドル状態にある `activeChunk` を凍結します。これは、タスクスレッドが新しい切り替えを発行する前に停止した「数回の txnEnd 後にソースが一時停止した」ケースを処理します。 + +3. **時間駆動コミットトリガー**: `tryStartTimerDrivenCommit()` を呼び出し、**両方** 条件が成立する場合に `commitInFlight=true` を設定します: + + - `now - lastCommitTimeMs >= commitInterval`(設定された`sink.buffer-flush.interval-ms`)。 + + - コミットするデータが存在します — `txnCoordinator.hasDataLoaded()`がtrueであるか、少なくとも1つのリージョンにまだ保留中のinactiveChunksがあります。 + +4. **自律フラッシュ**:`inactiveChunks`が空でないリージョンを`FlushAndCommitStrategy`経由でドレインします。マルチテーブルモードの`flush()`は、既にフリーズされた非アクティブチャンクのみをストリームアウトします — それは**決して**`activeChunk`に触れます。これにより、共有ラベルの下でStarRocksに到達するすべてのチャンクが完了したソーストランザクションから来るという不変条件が保持されます。 + +`commitInFlight=true`のとき、メインループは`processMultiTableCommit`に入り、インフライトのロードを待機し、残りの非アクティブチャンクのロードをトリガーし、`SharedTransactionCoordinator`を介して統合コミットを実行し、`lastCommitTimeMs`を更新し、トラッカーをリセットして、次のサイクルのために新しい共有トランザクションを開きます。 + +### 共有トランザクションの調整 + +同じコミットサイクル内のすべてのテーブルは、`SharedTransactionCoordinator`によって管理される単一のStarRocksトランザクションを共有します: + +1. **積極的なトランザクション開始**:共有トランザクションは、自律フラッシュの前に積極的に開かれるため、すべてのHTTPロードは共有ラベルを使用します。これにより、独立ラベルのフラッシュが後で共有トランザクションがラベルを上書きする際に孤立する可能性があるデータ損失ウィンドウが排除されます。 + +2. **統合コミット**:すべてのリージョンのデータがロードされた後、共有ラベルに対して単一の`commit`が実行されます。マルチテーブルトランザクションは、StarRocksがマルチテーブルモードで`TXN_PREPARE`をサポートしていないため、`prepare`ステップをスキップします。 + +3. **アイドルトランザクションのリサイクル**:共有トランザクションがStarRocksサーバー側タイムアウトの80%(デフォルト:600sタイムアウトに対して480s)を超えて開いたままの場合、サーバー側タイムアウトエラーを防ぐために積極的にリサイクル(コミットまたはロールバック+再オープン)されます。いずれかのリージョンに進行中のトランザクションデータがある場合(クリーン境界違反)、またはいずれかのパーティションがtxnEndを受け取ることなくデータを書き込んだ場合、リサイクルは即座に失敗します。 + +### 6.5 PartitionCommitTracker(安全性の記録管理) + +現在の設計では、コミットのタイミングはコミット間隔とリージョンごとのクリーン境界フラグによって完全に制御されます。`PartitionCommitTracker`は情報提供/安全補助に縮小されています: + +- `onWrite(partition)`は最初の書き込み時にパーティションを`ACTIVE`として登録します。 + +- `onTxnEnd(partition)`はパーティションを`TXN_END_SEEN`に遷移させます(スティッキー — 後続の書き込みは**しない**`ACTIVE`に降格しません)。 + +- `getPartitionsWithoutTxnEnd()`は、データを書き込んだがtxnEndを受け取ったことがないパーティションを一覧表示します。セーブポイントおよびリサイクルが、上流のコントラクト違反(クローズされなかったソーストランザクション)で即座に失敗するために使用されます。 + +- `reset()`はコミットサイクルの終了時にすべてのパーティションをクリアします。 + +トラッカーはスイッチ/コミットの決定を駆動しなくなり、`SWITCHED`状態を追跡せず、保留中のtxnEndシグナルを管理しません。コミット後にデータの生成を停止したパーティションは`reset()`によって単純にクリアされます。後で再開した場合、次の`onWrite`が再登録します。 + +### 共有ラベルを使用した自律フラッシュ + +リージョンの`inactiveChunks`が空でなくなると、マネージャースレッドの自律フラッシュループは、`/api/transaction/load`を介して**現在の共有ラベル**の下でStarRocksにストリームします。共有トランザクションは常にデータがロードされる前に開かれるため、データが独立した(孤立した)ラベルの下でロードされる可能性のあるウィンドウは存在しません。マルチテーブルモードでは、`flush()`は`switchChunk`をトリガーしません — 既にフリーズされた非アクティブチャンクのみをドレインします — そのため、「共有ラベルの下でStarRocksに到達するすべてのチャンクは完了したソーストランザクションから来る」という不変条件は無条件に成立します。 + +### 安全性の保証 + +| 保証 | メカニズム | +| ------------------------------------------------- | ----------------------------------------------------------------------------- | +| switchChunkはソーストランザクションを分割しない | `switchChunkForCommit`はクリーントランザクション境界(txnEnd時、または`activeChunkCleanBoundary`が`true`の場合)でのみ呼び出されます。 | +| コミットには部分的なソーストランザクションが含まれない | StarRocksに到達するすべてのチャンクは`switchChunkForCommit`から発生します。自律フラッシュはマルチテーブルモードで`activeChunk`を切り替えません。 | +| パーティションごとの分離 | 各`(partition, table)`は独自のリージョンを持ちます。あるパーティションのスイッチは別のパーティションのデータに影響しません。 | +| パーティション内の順序付け | `keyBy(sourcePartition)`は同じパーティションの行を同じシンクサブタスクにルーティングします。 | +| タスクスレッドはノンブロッキング | `tryMiniIntervalSwitch`はO(パーティション内のリージョン数)です。HTTPの作業はマネージャースレッドで非同期に行われます。 | +| ソースアイドル時のデータ可視性 | マネージャースレッドの`tryForceCleanSwitch`は、`miniInterval`のアイドル後にクリーンな`activeChunk`をフリーズするため、ソースが一時停止しても`commitInterval + miniInterval`内でデータが可視のままになります。 | +| 自律フラッシュはトランザクションセーフ | すべてのロードは共有ラベルを使用します。すべてのフリーズされたチャンクは完了したソーストランザクションからのものです。 | +| アイドルトランザクションはタイムアウトしない | 共有トランザクションはサーバータイムアウトの80%でリサイクルされます。リサイクルは進行中のデータで即座に失敗します。 | +| パーティションごとの独立したコミット | 完了したソーストランザクションを持つパーティションは、他のパーティションの進行中のトランザクションとは独立して、次のコミット間隔でコミットされます。 | + +### N:1 トランザクションマッピング + +複数のソーストランザクションは、miniIntervalバッチングメカニズムを介して単一のStarRocksトランザクションに蓄積できます: + +``` +Source txn K1 (3 rows) -> write -> activeChunk -> txnEnd (1st switch) +Source txn K2 (2 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) +Source txn K3 (4 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) + (miniInterval elapsed → next switch batches K2+K3) + -> ... +commitInterval elapsed + -> commit(label=A) + -> K1 + K2 + K3 atomically committed to StarRocks +``` + +コミットの決定は時間駆動(特定のtxnEndに紐付けられていない)であるため、コネクターは設定変更なしに多くの小さなソーストランザクション全体でHTTPロードおよびbegin/commitのオーバーヘッドを分散します。`commitInterval=1 s`、`miniInterval=100 ms`で毎秒100件のtxnEndを発行するCDCソースの場合、コネクターは毎秒約100回ではなく最大約10回のロード呼び出しを発行します。 + +## 制限事項 + +- **`sink.version=V2`が必要**:V1はトランザクションストリームロードをサポートしていません。 + +- **少なくとも1回のみ**:失敗した再試行により重複書き込みが発生する可能性があります。マルチテーブルモードは同じバッチ内のすべてのテーブルが一緒に成功または失敗することを保証しますが、グローバルな正確に1回の保証は提供しません。PRIMARY KEYテーブルの場合、重複書き込みはべき等(upsert)です。 + +- **すべてのテーブルは同じデータベースに存在する必要があります**:StarRocksのマルチテーブルトランザクションはデータベーススコープです。クロスデータベーストランザクションはサポートされていません。 + +- **トランザクションスコープはシンクサブタスクごと+パーティションごと**:各シンクサブタスクは、独自のStarRocksトランザクションを独立して維持します。アトミック性は保証されています**単一のソーストランザクション内**(1つのパーティション上の1つのtxnEndに対するすべての行、そのパーティションが書き込むすべてのテーブルにわたって)。**異なる**ソースパーティション間のデータ可視性はインターリーブする可能性があります:パーティションP0のソーストランザクションが完全に到着し、コミット間隔が経過すると、パーティションP1のソーストランザクションがまだ進行中であっても、P0のデータはコミットされます。StarRocksレベルでクロスパーティションのアトミック性を必要とするアプリケーションは、単一のソースパーティションを使用するか、上流でコミットを調整する必要があります。 + +- **データ可視性レイテンシ**:`sink.buffer-flush.interval-ms` および内部の `miniInterval = min(1000, max(100, commitInterval/10))` によって制御されます。継続的に流れるCDCストリームでは、個々の行はソースコミットからおよそ `commitInterval + miniInterval` 以内にStarRocksで可視になります。ソースが一時停止している間、以前にコミットされたデータは引き続き可視であり、最後のスイッチと一時停止の間の行は、さらに1回の `miniInterval` の後に可視になります(マネージャースレッドのクリーンバウンダリフォールバックによって処理されます)。 + +- **StarRocksクラスターのトランザクション設定に依存**:実行中のトランザクション制限、準備済みタイムアウト(デフォルト600秒)、およびラベルの保持を監視してください。`sink.buffer-flush.interval-ms` がStarRocksのトランザクションタイムアウトよりも大幅に短いことを確認してください。 + +- **長いソーストランザクション下での `activeChunk` メモリ増大**:マルチテーブルモードはチャンクサイズによる内部スイッチングを無効にするため(クリーントランザクション境界の不変条件を保持するため)、`activeChunk` は次のtxnEndが到着するまで増加し続ける可能性があります。メモリは `sink.transaction.multi-table.buffer-size`(ソフト)および `2 × buffer-size`(`blockIfCacheFull` によるハード)によって制限されます。非常に大きなソーストランザクションはバックプレッシャーによってタスクスレッドをスロットルします。これが常態化する場合は、上流でソーストランザクションを分割するか、`sink.transaction.multi-table.buffer-size` を増やしてください。 + +- **クロスデータベース書き込みは拒否されます**:マルチテーブルトランザクションは、すべてのリージョンが同じデータベースに属していることを検証します。同じコミットサイクル内で異なるデータベースのテーブルに書き込むとエラーが発生します。 + +- **マージコミットとは互換性がありません**:`sink.properties.enable_merge_commit=true` は `sink.transaction.multi-table.enabled=true` と組み合わせることができません。マージコミットは `MergeCommitManager` を通じて書き込みをルーティングしますが、これはマルチテーブルモードがトランザクション境界のために依存するパーティション対応の `write(int, ...)` / `setCommitAllowed(int, ...)` フックを持っていません。両方が有効になっている場合、コネクターは検証時に即座に失敗します。 + +## 監視とトラブルシューティング + +推奨メトリクス: + +- **Flink側** + - チェックポイント成功率 + - チェックポイント所要時間 + - シンクのフラッシュ/コミットレイテンシ +- **StarRocks側** + - 実行中/準備済みトランザクション数 + - トランザクションタイムアウトの発生 + - ラベルの競合 + +### よくある問題 + +#### `transaction not existed` + +- 原因:StarRocksのトランザクションタイムアウト +- 解決策:コネクターはサーバータイムアウトの80%でアイドルトランザクションを自動的にリサイクルします。それでも発生する場合は、準備済みタイムアウトが短すぎるか、フラッシュ間隔が大きすぎないか確認してください。 + +#### `too many running txns` + +- 原因:同時トランザクションが過剰です。 +- 解決策:シンクの並列度を下げるか、StarRocks FE設定 `max_running_txn_num_per_db` の値を増やしてください。 + +#### `Transaction start failed` + +- 原因:`beginTransaction` HTTPコールが失敗しました +- 解決策:load-urlの接続性とStarRocksのバージョン(v4.0以降が必要)を確認してください。 + +#### データ可視性レイテンシが高い + +- 原因:コミット条件が満たされていません。 +- 解決策:上流データに正しい `transactionEnd=true` マーカーがあることを確認してください。行ごとに最大 `commitInterval + miniInterval` のレイテンシを想定してください。レイテンシがこの予算を超える場合は、マネージャースレッドがリサイクルまたはインフライトロードで停止していないか確認してください(`StarRocks-Sink-Manager` ログを参照)。 + +#### クロスデータベース書き込みエラー + +- 原因:異なるデータベースのテーブルが同じコミットサイクルに含まれています。 +- 解決策:同じジョブで書き込まれるすべてのテーブルが同じStarRocksデータベースに属していることを確認してください。 + +## ベストプラクティス + +1. **TransactionAssemblerコントラクト**: + + - ソーストランザクションが完全にクローズされた後にのみ行を出力します。 + - 最後の行には`setTransactionEnd(true)`が必要です。 + - すべての行には`setSourcePartition(partition)`が必要です。 + - すべての行は単一の`processElement()`呼び出し内で同期的に出力される必要があります。 + +2. **シンク前のkeyByは必須**: `rows.keyBy(DefaultStarRocksRowData::getSourcePartition).addSink(sink)` — これを省略するとパーティション内のトランザクション順序が壊れます。 + +3. **チェックポイントはコミットから切り離されています**: チェックポイント間隔はフォールト回復のために大きな値(例:60秒)に設定できます。データの可視性は`sink.buffer-flush.interval-ms`(例:1000ms)によって制御されます。 + +4. **ルーティング戦略を安定させる**: 過剰な数の異なるテーブルへの書き込みを行う単一トランザクションを避けてください。これによりトランザクション時間と失敗確率が増加します。 + +5. **本番前のフォールトインジェクションテスト**: TaskManagersを強制終了したり、ネットワークジッターを導入し、チェックポイント回復後のデータ正確性を検証してください。 diff --git a/docs/ja/unloading/Flink_connector.md b/docs/ja/unloading/Flink_connector.md index 43dc647193745f..aee5f27e85d176 100644 --- a/docs/ja/unloading/Flink_connector.md +++ b/docs/ja/unloading/Flink_connector.md @@ -33,10 +33,10 @@ Flink が提供する JDBC コネクタとは異なり、StarRocks の Flink コ | コネクタ | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 以降 | 8 | 2.11,2.12 | ## 前提条件 diff --git a/docs/zh/ecosystem_release/flink_connector.md b/docs/zh/ecosystem_release/flink_connector.md index b7694c02366c7f..754a97cd43137f 100644 --- a/docs/zh/ecosystem_release/flink_connector.md +++ b/docs/zh/ecosystem_release/flink_connector.md @@ -31,10 +31,10 @@ description: "StarRocks Connector for Apache Flink 的发布说明和更新日 | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 | > **注意** > @@ -44,6 +44,24 @@ description: "StarRocks Connector for Apache Flink 的发布说明和更新日 ### 1.2 +#### 1.2.15 + +发布日期:2026年6月18日 + +##### 新增特性 + +- 新增对多表事务 Stream Load 的支持。[#487](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/487) + +##### 功能优化 + +- Merge Commit 支持记录数据质量错误消息。[#484](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/484) + +##### 错误修复 + +- 修复多表事务的并发性问题:对每张表的导入操作进行串行化,并协调跨表提交。[#491](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/491) +- 当处于 PREPARE 状态的滞留事务回滚失败时,回退到 FE 取消 API。[#488](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/488) +- 在构建列语句时,DEFAULT 子句中不再对 CURRENT_TIMESTAMP 进行引号处理。[#486](https://github.com/StarRocks/starrocks-connector-for-apache-flink/pull/486) + #### 1.2.14 发布日期:2026年2月11日 diff --git a/docs/zh/loading/Flink-connector-starrocks.md b/docs/zh/loading/Flink-connector-starrocks.md index 749cc63ad1d97e..1920ed86172920 100644 --- a/docs/zh/loading/Flink-connector-starrocks.md +++ b/docs/zh/loading/Flink-connector-starrocks.md @@ -21,10 +21,10 @@ StarRocks 提供的 Flink connector,相比于 Flink 提供的 [flink-connector | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及更高版本 | 8 | 2.11,2.12 | ## 获取 Flink connector @@ -80,13 +80,13 @@ Flink connector JAR 文件的命名格式如下: sh build.sh ``` - 例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令: + 例如,如果您的环境中的 Flink 版本为1.16,您需要执行以下命令: ```Bash - sh build.sh 1.15 + sh build.sh 1.16 ``` -3. 前往 `target/` 目录,找到编译完成的 Flink connector JAR 文件,例如 `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`,该文件在编译过程中生成。 +3. 前往 `target/` 目录,找到编译完成的 Flink connector JAR 文件,例如 `flink-connector-starrocks-1.2.7_flink-1.16-SNAPSHOT.jar`,该文件在编译过程中生成。 > **注意**: > diff --git a/docs/zh/loading/Flink_multi_table_transaction.md b/docs/zh/loading/Flink_multi_table_transaction.md new file mode 100644 index 00000000000000..914085649adf80 --- /dev/null +++ b/docs/zh/loading/Flink_multi_table_transaction.md @@ -0,0 +1,667 @@ +--- +displayed_sidebar: docs +description: "为 Flink 作业启用多表事务,以便在一个处理周期内向同一数据库中的多个表写入数据。" +--- + +# 使用 Apache Flink® 通过多表事务加载数据 + +StarRocks Flink Connector 支持多表事务,可将 Flink 中的数据原子性地加载到多个表中。 + +## 使用场景 + +当单个 Flink 作业在一个处理周期内向同一 StarRocks 数据库中的多个表写入数据时,启用多表事务可保证: + +- **跨表原子提交**:在同一提交周期内写入不同表的数据以原子方式可见——要么全部成功,要么全部失败。 +- **源事务完整性**:完整的上游事务(例如来自 Kafka 的事务)不会被拆分到两个 StarRocks 事务中。 +- **亚秒级数据新鲜度**:数据通过 `/api/transaction/load` 持续流入 StarRocks,并按 `sink.buffer-flush.interval-ms` 配置的间隔进行提交。 + +典型场景: + +- 同步写入汇总表和明细表(例如 `orders` 和 `order_items`) +- 将事件路由到不同的分区表(例如 `events_202601`、`events_202602`) +- 单个作业维护多个相互关联的下游结果表 + +:::tip[前提条件] +要启用多表事务,您必须在 StarRocks v4.0 及以上版本(支持多表事务 Stream Load)上运行集群,并使用 v1.2.9 及以上版本的 StarRocks Flink Connector。 +::: + +## 核心能力 + +| 能力 | 描述 | +| ---------------------------- | ------------------------------------------------------------ | +| 跨表原子提交 | 同一刷新周期内的所有表共享一个 StarRocks 事务标签,Prepare 和 Commit 操作统一执行。 | +| 源事务完整性 | 提交时机由 `transactionEnd` 标志控制,仅在完整的源事务边界处进行提交。 | +| 亚秒级数据可见性 | 数据定期刷新到 StarRocks(`/api/transaction/load`),当满足 `transactionEnd` 和定时器条件时进行提交。 | +| N:1 事务映射 | 多个源事务可以在单个 StarRocks 事务中累积,无需按 1:1 映射。 | +| 分区内有序性 | `keyBy(sourcePartition)` 确保来自同一分区的事务在同一 sink 子任务中按顺序处理。 | + +## 配置项 + +### 多表事务配置 + +#### `sink.transaction.multi-table.enabled` + +- 类型:Boolean +- 默认值:`false` +- 描述:是否启用多表原子事务模式。 + +#### `sink.transaction.multi-table.buffer-size` + +- 类型:Long +- 默认值:`134217728`(128 MB) +- 单位:字节 +- 描述:多表事务模式下的全局缓冲区大小(字节)。当所有表的缓冲数据总量达到此阈值时,触发刷新。 + +### 加载相关配置 + +#### `sink.version` + +- 推荐值:`V2` +- 描述:必填项。`V1` 不支持事务 Stream Load 接口。 + +#### `sink.semantic` + +- 推荐值:`at-least-once` +- 描述:多表模式当前仅支持 `at-least-once`。 + +#### `database-name` + +- 推荐值:`*` +- 描述:通配符,用于启用动态多表路由。 + +#### `table-name` + +- 推荐值:`*` +- 描述:通配符,用于启用动态多表路由。 + +#### `sink.buffer-flush.interval-ms` + +- 推荐值:`1000` +- 描述:控制提交周期。可将其设置为 `1000` 以实现约一秒的数据新鲜度。 + +#### `sink.properties.format` + +- 推荐值:`json` +- 描述:数据格式。 + +#### `sink.properties.strip_outer_array` + +- 推荐值:`true` +- 描述:是否去除最外层的数组结构。 + +## 接口 + +### `StarRocksRowData` + +```java +public interface StarRocksRowData { + String getUniqueKey(); // Region routing key (nullable; auto-derived from database.table) + String getDatabase(); // Target database + String getTable(); // Target table + String getRow(); // Row data in JSON format + + /** + * Indicates this is the last row of a source transaction batch. + * Used by multi-table transaction mode to determine safe commit points: + * the connector only commits when the most recent write had this flag set, + * ensuring no partial source transaction is committed. + */ + default boolean isTransactionEnd() { + return false; + } + + /** + * Returns the source partition ID for this row. + * Used by multi-table transaction mode to track per-partition transaction + * boundaries. Returns -1 when partition tracking is not applicable. + */ + default int getSourcePartition() { + return -1; + } +} +``` + +### `DefaultStarRocksRowData` + +```java +public class DefaultStarRocksRowData implements StarRocksRowData { + // 基本字段 + private String uniqueKey; + private String database; + private String table; + private String row; + + // 多表事务字段 + private boolean transactionEnd; // Source transaction end marker + private int sourcePartition = -1; // Source partition ID (for keyBy ordering) + + // 构造函数 + public DefaultStarRocksRowData(); + public DefaultStarRocksRowData(String database, String table); + public DefaultStarRocksRowData(String uniqueKey, String database, String table, String row); + + // Setter 方法 + public void setUniqueKey(String uniqueKey); + public void setDatabase(String database); + public void setTable(String table); + public void setRow(String row); + public void setTransactionEnd(boolean transactionEnd); + public void setSourcePartition(int sourcePartition); + + // Getter 方法(继承自 StarRocksRowData) + public String getUniqueKey(); + public String getDatabase(); + public String getTable(); + public String getRow(); + public boolean isTransactionEnd(); + public int getSourcePartition(); +} +``` + +### 用户实现的组件 + +用户需要实现一个 `KeyedProcessFunction`(在本文档中称为 `TransactionAssembler`),它: + +1. 按源分区作为键并在事务中缓冲数据行 +2. 仅在源事务关闭时(例如,收到 `TXN_END` 时)才发出所有行 +3. 在最后一行设置 `transactionEnd=true` +4. 在每一行上设置 `sourcePartition` + +无需自定义 `SinkFunction` — 标准连接器 API(`SinkFunctionFactory.createSinkFunction()`)可处理一切。 + +## 完整示例 + +### StarRocks 表 DDL + +```sql +CREATE DATABASE `test`; + +CREATE TABLE `test`.`orders` ( + `order_id` BIGINT NOT NULL, + `customer_id` BIGINT NOT NULL, + `total_amount` DECIMAL(10,2) DEFAULT "0", + `order_status` VARCHAR(32) DEFAULT "" +) ENGINE=OLAP PRIMARY KEY(`order_id`) +DISTRIBUTED BY HASH(`order_id`) +PROPERTIES("replication_num" = "1"); + +CREATE TABLE `test`.`order_items` ( + `item_id` BIGINT NOT NULL, + `order_id` BIGINT NOT NULL, + `product_name` VARCHAR(128) DEFAULT "", + `quantity` INT DEFAULT "0", + `price` DECIMAL(10,2) DEFAULT "0" +) ENGINE=OLAP PRIMARY KEY(`item_id`) +DISTRIBUTED BY HASH(`item_id`) +PROPERTIES("replication_num" = "1"); +``` + +### Flink 作业代码 + +```java +import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData; +import com.starrocks.connector.flink.table.sink.SinkFunctionFactory; +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.util.Collector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class WriteMultipleTablesWithTransaction { + + // ===================================================== + // 1. 源事件模型(根据业务逻辑自定义) + // ===================================================== + + enum EventType { TXN_BEGIN, DATA, TXN_END } + + static class TxnEvent implements Serializable { + private static final long serialVersionUID = 1L; + + int partition; + String txnId; + EventType type; + String database; + String table; + String json; + + TxnEvent() {} + + TxnEvent(int partition, String txnId, EventType type, + String database, String table, String json) { + this.partition = partition; + this.txnId = txnId; + this.type = type; + this.database = database; + this.table = table; + this.json = json; + } + + static TxnEvent begin(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_BEGIN, null, null, null); + } + + static TxnEvent data(int partition, String txnId, String db, String table, String json) { + return new TxnEvent(partition, txnId, EventType.DATA, db, table, json); + } + + static TxnEvent end(int partition, String txnId) { + return new TxnEvent(partition, txnId, EventType.TXN_END, null, null, null); + } + } + + // ===================================================== + // 2. TransactionAssembler — 核心用户组件 + // ===================================================== + + /** + * Buffers DATA events per partition; on TXN_END emits the complete + * transaction's rows as individual DefaultStarRocksRowData records. + * + * Only emits when a source transaction is fully closed (TXN_END received). + * All rows are emitted synchronously within one processElement() call, + * so they enter the downstream sink buffer without intervening checkpoint barriers. + * + * Multiple source transactions accumulate in the sink's buffer between + * flush cycles — the connector handles grouping them into StarRocks transactions. + */ + static class TransactionAssembler + extends KeyedProcessFunction { + + private transient ListState pendingRows; + + @Override + public void open(Configuration parameters) throws Exception { + ListStateDescriptor descriptor = new ListStateDescriptor<>( + "pending-txn-rows", Types.POJO(TxnEvent.class)); + pendingRows = getRuntimeContext().getListState(descriptor); + } + + @Override + public void processElement(TxnEvent event, Context ctx, + Collector out) throws Exception { + switch (event.type) { + case TXN_BEGIN: + pendingRows.clear(); + break; + + case DATA: + pendingRows.add(event); + break; + + case TXN_END: + List rows = new ArrayList<>(); + for (TxnEvent row : pendingRows.get()) { + rows.add(row); + } + int partition = ctx.getCurrentKey(); + for (int i = 0; i < rows.size(); i++) { + TxnEvent row = rows.get(i); + DefaultStarRocksRowData rowData = new DefaultStarRocksRowData( + null, row.database, row.table, row.json); + rowData.setSourcePartition(partition); + // 将最后一行标记为事务结束 + if (i == rows.size() - 1) { + rowData.setTransactionEnd(true); + } + out.collect(rowData); + } + pendingRows.clear(); + break; + } + } + } + + // ===================================================== + // 3. 主程序 + // ===================================================== + + public static void main(String[] args) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030"); + String loadUrl = params.get("loadUrl", "127.0.0.1:8030"); + String userName = params.get("userName", "root"); + String password = params.get("password", ""); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10_000); // Checkpoint is for recovery only; does not affect commit cycle + + // --- 数据源 --- + // 替换为实际的 Kafka Source,将数据反序列化为 TxnEvent + DataStream events = env.addSource(/* KafkaSource or MockTxnEventSource */); + + // --- 步骤 1:按分区组装完整事务 --- + // TransactionAssembler 仅在收到 TXN_END 后才发出行。 + // 多个已关闭的源事务在两次刷新之间累积在 sink 缓冲区中。 + DataStream rows = events + .keyBy(e -> e.partition) + .process(new TransactionAssembler()); + + // --- 步骤 2:按分区亲和性路由到 sink --- + // keyBy(sourcePartition) 将同一分区的数据路由到同一个 sink 子任务。 + // 连接器在内部使用按分区划分的 region,因此即使多个 + // 分区落在同一个 sink 子任务上,事务边界也会通过 PartitionCommitTracker + // 按分区独立跟踪。 + DataStream partitionedRows = rows + .keyBy(DefaultStarRocksRowData::getSourcePartition); + + // --- 步骤 3:配置连接器 --- + // sink.transaction.multi-table.enabled=true 激活每分区的 region + // 在 StreamLoadManagerV2 内部进行跟踪:每个分区的 region 在其 txnEnd 到达时独立切换, + // 仅当所有活跃分区均已切换后才触发提交。 + // 活跃分区均已完成切换。 + StarRocksSinkOptions options = StarRocksSinkOptions.builder() + .withProperty("jdbc-url", jdbcUrl) + .withProperty("load-url", loadUrl) + .withProperty("database-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("table-name", "*") // Wildcard for dynamic multi-table routing + .withProperty("username", userName) + .withProperty("password", password) + .withProperty("sink.version", "V2") // Required: V2 + .withProperty("sink.semantic", "at-least-once") + .withProperty("sink.transaction.multi-table.enabled", "true") // Enable multi-table txn + .withProperty("sink.buffer-flush.interval-ms", "1000") // ~1s data freshness + .withProperty("sink.properties.format", "json") + .withProperty("sink.properties.strip_outer_array", "true") + .build(); + + // 可选:每张表的 Stream Load 属性 + StreamLoadTableProperties orderItemsProps = StreamLoadTableProperties.builder() + .database("test") + .table("order_items") + .addProperty("format", "json") + .addProperty("strip_outer_array", "true") + .addProperty("ignore_json_size", "true") + .build(); + options.addTableProperties(orderItemsProps); + + // --- 步骤 4:创建并附加 Sink --- + // 标准连接器 API — 无需自定义 SinkFunction。 + // 在经过 keyBy 的流上调用 addSink 可确保分区亲和性。 + SinkFunction sink = SinkFunctionFactory.createSinkFunction(options); + partitionedRows.addSink(sink); + + env.execute("WriteMultipleTablesWithTransaction"); + } +} +``` + +### 数据流拓扑 + +``` +Kafka (60 partitions) + | + v +keyBy(partition) ———— Ensures same-partition events go to the same subtask + | + v +TransactionAssembler (KeyedProcessFunction) + | Buffers DATA events + | On TXN_END: emits all rows (last row has transactionEnd=true) + | Every row carries sourcePartition + | + v +keyBy(sourcePartition) ———— Ensures same-partition rows go to the same sink subtask + | + v +StarRocksDynamicSinkFunctionV2 (via SinkFunctionFactory.createSinkFunction) + | + | +——————————— StreamLoadManagerV2 (multi-table txn mode) —————————-——+ + | | | + | | Per-partition, per-table regions: | + | | Region(P0, orders), Region(P0, order_items) | + | | Region(P2, orders), Region(P2, order_items) | + | | | + | | Each region tracks: | + | | - activeChunk / inactiveChunks | + | | - lastSwitchTimeMs (for miniInterval batching) | + | | - activeChunkCleanBoundary (true iff last task event is txnEnd)| + | | | + | | write(partition, db, table, row) [task thread] | + | | -> routes to Region(partition, db, table) | + | | -> write0 sets activeChunkCleanBoundary = false | + | | -> In multi-table mode write0 does NOT switchChunk, so | + | | activeChunk only freezes at a txnEnd boundary | + | | | + | | setCommitAllowed(partition, txnEnd=true) [task thread] | + | | -> region.tryMiniIntervalSwitch(): | + | | sets activeChunkCleanBoundary = true | + | | if (now - lastSwitchTimeMs >= miniInterval | + | | && activeChunk has data): switchChunkForCommit | + | | else: data batches into activeChunk with subsequent | + | | completed source transactions (N:1 mapping) | + | | -> PartitionCommitTracker.onTxnEnd(partition) | + | | | + | | SharedTransactionCoordinator: | + | | -> eagerly opens shared txn before any flush | + | | -> all autonomous flushes use the shared label | + | | -> recycles idle txn at 80% of server timeout | + | | | + | | Manager thread (every scanningFrequency): | + | | -> tryForceCleanSwitch per region: | + | | if cleanBoundary && has data && miniInterval elapsed | + | | -> switchChunkForCommit (source-idle fallback) | + | | -> tryStartTimerDrivenCommit: | + | | if commitInterval elapsed && hasDataLoaded | + | | -> set commitInFlight = true | + | | -> autonomous flush: drain inactiveChunks via streamLoad | + | | (never touches activeChunk in multi-table mode) | + | | | + | | Manager thread (commitInFlight=true): | + | | -> triggerLoadIfNeeded per region (HTTP /api/transaction/load) | + | | -> wait all loads complete | + | | -> unified commit via SharedTransactionCoordinator | + | | -> reset tracker; open next shared txn | + | +———————————————————————————————————————————————————————————————————+ + | + v +StarRocks (test.orders + test.order_items) +``` + +## 工作原理 + +### 块生命周期与 `miniInterval` 批处理 + +每个 `(partition, table)` 区域有一个 `activeChunk`(当前接受写入)和一个 `inactiveChunks` 的 FIFO(冻结数据,等待 HTTP 加载)。在多表事务模式下,**仅** 数据从 `activeChunk` 移入 `inactiveChunks` 的方式是通过 `switchChunkForCommit`,它恰好从三个地方被调用: + +1. **任务线程,在 txnEnd 时** — 通过 `region.tryMiniIntervalSwitch()` 在 `setCommitAllowed(partition, true)` 内部。这是常见路径。 + +2. **管理线程,源空闲回退** — 在每个扫描周期中,通过 `region.tryForceCleanSwitch()` 对那些 `activeChunk` 一直处于干净事务边界空闲状态的区域执行此操作。 + +3. **管理线程,保存点/回收** — 在验证每个区域都处于干净边界后,强制切换所有区域。 + +为避免在高吞吐量 CDC 中每个源事务产生一次 HTTP 加载,任务线程仅在距同一区域上次切换已过去至少 `miniSwitchIntervalMs` 时才执行切换。`miniSwitchIntervalMs` 的计算方式为 `min(1000 ms, max(100 ms, commitInterval / 10))`,因此 1 秒的提交间隔在 100 ms 时进行批处理,而 30 秒的间隔则将批处理上限设为 1 秒。在一个 miniInterval 窗口内,多个已完成的源事务会累积到同一个 `activeChunk` 中(N:1 映射),并在下次切换时一起冻结。 + +每个区域包含两个驱动这些决策的字段: + +- `lastSwitchTimeMs` — 最近一次切换的纪元毫秒数。初始值为 0,因此区域创建后的第一个 txnEnd 始终会触发切换。 + +- `activeChunkCleanBoundary` — 如果此区域上最近的任务线程事件是 `onTxnEnd` 或 `switchChunk`,则为 `true`。在任何 `write()` 之后为 `false`。管理线程的 `tryForceCleanSwitch` 仅在此标志为 `true` 时运行,因此它永远不会冻结部分源事务数据。 + +### 任务线程 — 写入与 txnEnd + +``` +invoke(record) [Flink task thread] + | + | if record is a data row: + | write(partition, db, table, row) + | region.write(row) → addRow(); cleanBoundary = false + | + | if record carries transactionEnd=true: + | setCommitAllowed(partition, true) + | for each region owned by this partition: + | region.tryMiniIntervalSwitch(): + | cleanBoundary = true // always (txnEnd observed) + | if (now - lastSwitchTimeMs >= miniInterval + | && activeChunk has data): + | switchChunkForCommit() // freezes activeChunk + | partitionTracker.onTxnEnd(partition) // safety bookkeeping only +``` + +任务线程是 `write()` 和 `setCommitAllowed()` 事件的唯一序列化器,这也是为什么 `cleanBoundary` 标记和条件切换都必须在此处执行(而非延迟到管理线程):`cleanBoundary` 必须始终反映最新的任务线程事件,否则管理器的空闲回退可能与写入操作产生竞争,导致部分数据被冻结。 + +### 6.3 管理线程 — 时间驱动提交 + +管理线程以 `scanningFrequency` 的频率运行扫描循环。每次迭代: + +1. **Ensure shared transaction**: 打开一个新连接(即时),或在当前连接接近 StarRocks 服务端超时时主动回收该连接(`timeout` 标头的 80%,默认 480 秒)。 + +2. **源空闲回退**: call `region.tryForceCleanSwitch()` on every region to freeze any `activeChunk` that is clean and has been idle for at least `miniInterval`. This handles the "source paused after a few txnEnds" case where the task thread stopped before issuing a fresh switch. + +3. **时间驱动的提交触发器**: 调用 `tryStartTimerDrivenCommit()`,如果满足条件则设置 `commitInFlight=true`**两者** 条件成立: + + - `now - lastCommitTimeMs >= commitInterval`(已配置的 `sink.buffer-flush.interval-ms`)。 + + - 存在待提交的数据——`txnCoordinator.hasDataLoaded()` 为 true,或至少有一个 region 仍有待处理的 inactiveChunks。 + +4. **自主刷新**:通过 `FlushAndCommitStrategy` 排空任何 `inactiveChunks` 非空的 region。多表模式的 `flush()` 仅流出已冻结的非活跃 chunk——它**从不**不会触碰 `activeChunk`。这保证了在共享标签下到达 StarRocks 的每个 chunk 均来自已完成的源事务这一不变量。 + +当 `commitInFlight=true` 时,主循环进入 `processMultiTableCommit`,等待飞行中的加载完成,为所有剩余的非活跃 chunk 触发加载,通过 `SharedTransactionCoordinator` 执行统一提交,更新 `lastCommitTimeMs`,重置追踪器,并为下一个周期开启新的共享事务。 + +### 共享事务协调 + +同一提交周期内的所有表共享一个由 `SharedTransactionCoordinator` 管理的 StarRocks 事务: + +1. **提前开启事务**:在任何自主刷新之前,共享事务会被提前开启,因此所有 HTTP 加载均使用共享标签。这消除了独立标签刷新可能在共享事务随后覆盖标签时成为孤儿的数据丢失窗口。 + +2. **统一提交**:在所有 region 的数据加载完成后,针对共享标签执行单次 `commit`。多表事务跳过 `prepare` 步骤,因为 StarRocks 在多表模式下不支持 `TXN_PREPARE`。 + +3. **空闲事务回收**:如果共享事务保持开启的时间超过 StarRocks 服务端超时时间的 80%(默认:600s 超时对应 480s),则会主动回收(提交或回滚 + 重新开启),以防止服务端超时错误。若任何 region 存在进行中的事务数据(违反清洁边界),或任何分区已写入数据但从未收到 txnEnd,则回收操作将快速失败。 + +### 6.5 PartitionCommitTracker(安全记账) + +在当前设计中,提交时机完全由提交间隔和每个 region 的清洁边界标志驱动。`PartitionCommitTracker` 被简化为信息/安全辅助工具: + +- `onWrite(partition)` 在首次写入时将分区注册为 `ACTIVE`。 + +- `onTxnEnd(partition)` 将分区转换为 `TXN_END_SEEN`(粘性——后续写入**不会**不会将其降级回 `ACTIVE`)。 + +- `getPartitionsWithoutTxnEnd()` 列出已写入数据但从未收到 txnEnd 的分区。用于保存点和回收操作在上游合约违规(从未关闭的源事务)时快速失败。 + +- `reset()` 在提交周期结束时清除所有分区。 + +追踪器不再驱动切换/提交决策,不追踪 `SWITCHED` 状态,也不管理待处理的 txnEnd 信号。提交后停止产生数据的分区由 `reset()` 简单清除;若之后恢复,下一次 `onWrite` 会重新注册它们。 + +### 使用共享标签的自主刷新 + +当 region 的 `inactiveChunks` 变为非空时,管理线程的自主刷新循环通过 `/api/transaction/load` 在**当前共享标签**当前共享标签下将其流式传输到 StarRocks。由于共享事务始终在任何数据加载之前开启,因此不存在数据在独立(孤儿)标签下被加载的窗口。在多表模式下,`flush()` 从不触发 `switchChunk`——它仅排空已冻结的非活跃 chunk——因此「在共享标签下到达 StarRocks 的每个 chunk 均来自已完成的源事务」这一不变量无条件成立。 + +### 安全保证 + +| 保证 | 机制 | +| ------------------------------------------------- | ----------------------------------------------------------------------------- | +| switchChunk 不会拆分源事务 | `switchChunkForCommit` 仅在清洁事务边界处调用(在 txnEnd 时或当 `activeChunkCleanBoundary` 为 `true` 时)。 | +| 提交从不包含部分源事务 | 到达 StarRocks 的每个 chunk 均源自 `switchChunkForCommit`。自主刷新在多表模式下从不切换 `activeChunk`。 | +| 按分区隔离 | 每个 `(partition, table)` 拥有自己的 region。一个分区的切换不会影响另一个分区的数据。 | +| 分区内有序性 | `keyBy(sourcePartition)` 将相同分区的行路由到同一 sink 子任务。 | +| 任务线程非阻塞 | `tryMiniIntervalSwitch` 的时间复杂度为 O(regions-in-partition)。HTTP 工作在管理线程上异步执行。 | +| 源空闲时的数据可见性 | 管理线程的 `tryForceCleanSwitch` 在空闲 `miniInterval` 后冻结干净的 `activeChunk`,因此即使源暂停,数据在 `commitInterval + miniInterval` 内仍保持可见。 | +| 自主刷新是事务安全的 | 每次加载均使用共享标签。每个冻结的 chunk 均来自已完成的源事务。 | +| 空闲事务不会超时 | 共享事务在服务端超时时间的 80% 时被回收。回收在进行中的数据上快速失败。 | +| 按分区独立提交 | 具有已完成源事务的分区在下一个提交间隔提交,与其他分区的进行中事务无关。 | + +### N:1 事务映射 + +多个源事务可通过 miniInterval 批处理机制在单个 StarRocks 事务中累积: + +``` +Source txn K1 (3 rows) -> write -> activeChunk -> txnEnd (1st switch) +Source txn K2 (2 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) +Source txn K3 (4 rows) -> write -> activeChunk -> txnEnd (inside miniInterval: no switch) + (miniInterval elapsed → next switch batches K2+K3) + -> ... +commitInterval elapsed + -> commit(label=A) + -> K1 + K2 + K3 atomically committed to StarRocks +``` + +由于提交决策是时间驱动的(不与特定的 txnEnd 绑定),连接器通过摊销 HTTP 加载及 begin/commit 开销来处理大量小型源事务,无需任何配置更改。对于以每秒 100 个 txnEnd 速率发送的 CDC 源,配置 `commitInterval=1 s`、`miniInterval=100 ms` 时,连接器每秒最多发出约 10 次加载调用,而非约 100 次。 + +## 限制 + +- **需要 `sink.version=V2`**:V1 不支持事务流式加载。 + +- **仅至少一次**:失败的重试可能产生重复写入。多表模式保证同一批次内的所有表一起成功或失败,但不提供全局精确一次语义。对于 PRIMARY KEY 表,重复写入是幂等的(upsert)。 + +- **所有表必须在同一数据库中**:StarRocks 多表事务是数据库范围的;不支持跨数据库事务。 + +- **事务范围为每个 sink 子任务 + 每个分区**:每个 sink 子任务独立维护其自身的 StarRocks 事务。原子性在**在单个源事务内**(某一分区上单次 txnEnd 涉及的所有行,跨该分区写入的所有表)范围内得到保证。不同**不同**源分区的数据可见性可以交错:一旦分区 P0 的源事务已完全到达且提交间隔已过,P0 的数据即会被提交,即使分区 P1 的源事务仍在进行中。需要在 StarRocks 层面实现跨分区原子性的应用,必须使用单一源分区或在上游协调提交。 + +- **数据可见性延迟**:由 `sink.buffer-flush.interval-ms` 和内部 `miniInterval = min(1000, max(100, commitInterval/10))` 控制。在持续流动的 CDC 流中,单行数据在其源提交后约 `commitInterval + miniInterval` 内即可在 StarRocks 中可见。在源暂停期间,已提交的数据保持可见,而上次切换与暂停之间的任何行将在再经过一个 `miniInterval` 后可见(由管理线程的清洁边界回退机制处理)。 + +- **取决于 StarRocks 集群事务设置**:监控运行中的事务数量限制、prepared 超时(默认 600 秒)以及 label 保留情况。确保 `sink.buffer-flush.interval-ms` 显著短于 StarRocks 事务超时时间。 + +- **长源事务下 `activeChunk` 内存增长**:由于多表模式禁用了由 chunk 大小触发的内部切换(以保持清洁事务边界不变性),`activeChunk` 可能持续增长直到下一个 txnEnd 到达。内存受 `sink.transaction.multi-table.buffer-size`(软限制)和 `2 × buffer-size`(通过 `blockIfCacheFull` 实现的硬限制)约束。异常大的源事务将通过背压限制任务线程;若此情况频繁发生,请在上游拆分源事务或增大 `sink.transaction.multi-table.buffer-size`。 + +- **跨数据库写入将被拒绝**:多表事务会验证所有 region 是否属于同一数据库。在同一提交周期内向不同数据库的表写入数据将抛出错误。 + +- **与合并提交不兼容**:`sink.properties.enable_merge_commit=true` 不能与 `sink.transaction.multi-table.enabled=true` 组合使用。合并提交将写入路由通过 `MergeCommitManager`,而该路径缺少多表模式用于事务边界的分区感知 `write(int, ...)` / `setCommitAllowed(int, ...)` 钩子。若两者同时启用,连接器将在验证阶段快速失败。 + +## 监控与故障排查 + +推荐指标: + +- **Flink 侧** + - Checkpoint 成功率 + - Checkpoint 持续时间 + - Sink 刷新/提交延迟 +- **StarRocks 侧** + - 运行中/已 prepared 的事务数量 + - 事务超时发生次数 + - Label 冲突 + +### 常见问题 + +#### `transaction not existed` + +- 原因:StarRocks 事务超时 +- 解决方案:连接器会在服务器超时时间的 80% 时自动回收空闲事务。若问题仍然发生,请检查 prepared 超时是否过短或刷新间隔是否过大。 + +#### `too many running txns` + +- 原因:并发事务数量过多。 +- 解决方案:降低 sink 并行度,或增大 StarRocks FE 配置项 `max_running_txn_num_per_db` 的值。 + +#### `Transaction start failed` + +- 原因:`beginTransaction` HTTP 调用失败 +- 解决方案:验证 load-url 的连通性以及 StarRocks 版本(需要 v4.0 或更高版本)。 + +#### 数据可见性延迟过高 + +- 原因:提交条件未满足。 +- 解决方案:验证上游数据是否包含正确的 `transactionEnd=true` 标记;每行数据预期最多有 `commitInterval + miniInterval` 的延迟。若延迟超出此预算,请检查管理线程是否卡在回收或飞行中的加载操作(参见 `StarRocks-Sink-Manager` 日志)。 + +#### 跨数据库写入错误 + +- 原因:不同数据库中的表处于同一提交周期内。 +- 解决方案:确保同一作业中写入的所有表均属于同一 StarRocks 数据库。 + +## 最佳实践 + +1. **TransactionAssembler 合约**: + + - 仅在源事务完全关闭后才发出行。 + - 最后一行必须包含 `setTransactionEnd(true)`。 + - 每一行必须包含 `setSourcePartition(partition)`。 + - 所有行必须在单个 `processElement()` 调用中同步发出。 + +2. **sink 前的 keyBy 是强制要求**:`rows.keyBy(DefaultStarRocksRowData::getSourcePartition).addSink(sink)` — 省略此项会破坏分区内的事务顺序。 + +3. **Checkpoint 与提交解耦**:Checkpoint 间隔可以设置为较大的值(例如 60 秒)用于故障恢复。数据可见性由 `sink.buffer-flush.interval-ms` 控制(例如 1000ms)。 + +4. **保持路由策略稳定**:避免单个事务写入过多不同的表,这会增加事务持续时间和失败概率。 + +5. **上线前进行故障注入测试**:终止 TaskManager / 引入网络抖动,并在 checkpoint 恢复后验证数据正确性。 diff --git a/docs/zh/unloading/Flink_connector.md b/docs/zh/unloading/Flink_connector.md index 797f26928c315f..b5b7500e69300e 100644 --- a/docs/zh/unloading/Flink_connector.md +++ b/docs/zh/unloading/Flink_connector.md @@ -33,10 +33,10 @@ Flink Connector 支持两种数据读取方式:Flink SQL 和 Flink DataStream | Connector | Flink | StarRocks | Java | Scala | |-----------|-------------------------------|---------------| ---- |-----------| +| 1.2.15 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | | 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 | -| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 | ## 前提条件