diff --git a/docs/en/transforms/multi-table-transform-and-join-boundary.md b/docs/en/transforms/multi-table-transform-and-join-boundary.md new file mode 100644 index 000000000000..f10d52b8ebeb --- /dev/null +++ b/docs/en/transforms/multi-table-transform-and-join-boundary.md @@ -0,0 +1,266 @@ +--- +sidebar_position: 16 +--- + +# Multi-Table Transform Capability Boundary + +## Overview + +SeaTunnel's **multi-table transform** feature allows a single transform node to process multiple +tables flowing from an upstream source (typically a CDC connector) in one pipeline. This page +documents precisely what is supported, what is not, and what alternatives to use when you hit a +capability boundary. + +--- + +## 1. What Is a Multi-Table Transform? + +In a standard single-table pipeline, one Source feeds one Transform chain feeds one Sink. +In a multi-table pipeline, a single Source (e.g., MySQL-CDC) emits records from **many tables** +simultaneously, and each downstream Transform or Sink must declare which table(s) it applies to. + +``` +MySQL-CDC ──► FieldMapper (orders table) ──► Kafka Sink (orders topic) + │ + ├──► FieldMapper (users table) ──► Kafka Sink (users topic) + │ + └──► (unmatched tables pass through) ──► Elasticsearch Sink +``` + +--- + +## 2. Capability Boundary Table + +| Capability | Supported | Notes | +|---|---|---| +| Per-table field rename / map | ✅ Yes | Use `FieldMapper` with `plugin_input` and `table_match_regex` | +| Per-table column filtering | ✅ Yes | Use `Filter` with `plugin_input` and `table_match_regex` | +| Per-table type casting | ✅ Yes | Use `FieldMapper` with `define_sink_type` option | +| Per-table SQL transform (single table) | ✅ Yes | Use `SQL` transform with `plugin_input`; scope it with `table_match_regex` when needed | +| `TableMerge` — merge multiple tables into one | ✅ Yes | Tables must share compatible schema | +| `TableRename` — rename tables in the stream | ✅ Yes | Works well for routing to Sink by table name | +| Row-level filtering (filter by `rowkind`) | ✅ Yes | Use `FilterRowKind` transform | +| Cross-table SQL JOIN | ❌ Not supported | See Section 4 for alternatives | +| Aggregation across multiple CDC tables | ❌ Not supported | Aggregate downstream in OLAP engine | +| Generating new tables from a JOIN result | ❌ Not supported | Use a dedicated SQL engine | +| Applying one transform to ALL tables wildcard | ⚠️ Partial | `TableMerge` then single transform; schema must be compatible | +| Changing schema mid-stream (DDL events) | ⚠️ Limited | Depends on sink; some sinks handle schema evolution; transforms do not | +| Nested JSON field extraction per table | ✅ Yes | Use `JsonPath` transform with `plugin_input` and `table_match_regex` | + +--- + +## 3. TableMerge vs SQL Join + +These two are the most commonly confused features. + +### 3.1 `TableMerge` Transform + +`TableMerge` **merges the row streams of multiple tables into a single result table**. All +input tables must have the same (or compatible) schema. Use it to route all tables to one Sink. + +```json +{ + "plugin_name": "TableMerge", + "plugin_input": ["orders_2023", "orders_2024"], + "plugin_output": "all_orders", + "merge_by_field": true +} +``` + +**When to use**: Fan-in from multiple source tables that have the same structure (e.g., sharded +tables, multi-year partitions, or multi-database tables with identical schema). + +**When NOT to use**: When tables have different schemas that you need to correlate or enrich +from each other — that requires a JOIN. + +### 3.2 SQL JOIN (not natively supported in multi-table pipelines) + +A SQL JOIN correlates rows from two different tables based on a key. **SeaTunnel's `SQL` +transform does NOT support cross-table JOIN inside a multi-table streaming pipeline.** + +Attempting to JOIN records from two upstream tables inside a single SQL transform is not +supported and will result in a configuration error. + +**Recommended alternatives**: +- Write both tables to a shared data lake or warehouse (e.g., Hudi, Iceberg, ClickHouse), then + run the JOIN there +- Use Apache Flink with SeaTunnel's Flink connector for stateful JOIN operations +- Materialise the "dimension" table into a lookup cache (e.g., Redis, RocksDB) and use a custom + transform for enrichment + +--- + +## 4. Cross-Source JOIN Limitation + +SeaTunnel does **not** support streaming JOINs where the two input sides come from **different +sources** (e.g., joining MySQL-CDC with a PostgreSQL-CDC stream). + +| Scenario | Supported | +|---|---| +| Single-source multi-table pass-through | ✅ | +| Single-source TableMerge (same schema) | ✅ | +| Cross-source JOIN (MySQL-CDC + PG-CDC) | ❌ | +| Cross-source JOIN (CDC + JDBC batch) | ❌ | +| Same-source JOIN on two different tables | ❌ | + +**Workaround**: Write both sources to a common sink (Kafka, Iceberg, etc.) and perform the JOIN +downstream in a dedicated SQL engine (Flink, Spark, ClickHouse, etc.). + +--- + +## 5. Per-Table Transform Configuration Pattern + +When you need different transforms for different tables from the same source, declare separate +transform blocks that share the same `plugin_input` and use different `table_match_regex` +rules: + +```json +{ + "env": { + "job.name": "cdc-multi-table", + "job.mode": "STREAMING" + }, + "source": [ + { + "plugin_name": "MySQL-CDC", + "plugin_output": "cdc_stream", + "base-url": "jdbc:mysql://localhost:3306/mydb", + "username": "cdc_user", + "password": "password", + "database-names": ["mydb"], + "table-names": ["mydb.orders", "mydb.users", "mydb.products"] + } + ], + "transform": [ + { + "plugin_name": "FieldMapper", + "plugin_input": ["cdc_stream"], + "plugin_output": "orders_mapped", + "field_mapper": { "order_id": "id", "order_amount": "amount" }, + "table_match_regex": "mydb\\.orders" + }, + { + "plugin_name": "FieldMapper", + "plugin_input": ["cdc_stream"], + "plugin_output": "users_mapped", + "field_mapper": { "user_id": "id", "user_email": "email" }, + "table_match_regex": "mydb\\.users" + } + ], + "sink": [ + { + "plugin_name": "Kafka", + "plugin_input": ["orders_mapped"], + "topic": "orders" + }, + { + "plugin_name": "Kafka", + "plugin_input": ["users_mapped"], + "topic": "users" + }, + { + "plugin_name": "Kafka", + "plugin_input": ["cdc_stream"], + "topic": "products", + "table_match_regex": "mydb\\.products" + } + ] +} +``` + +--- + +## 6. Common Fields Example (Shared Schema) + +If multiple tables share a common set of fields, you can use `TableMerge` to combine them and +apply a single transform: + +```json +"transform": [ + { + "plugin_name": "TableMerge", + "plugin_input": ["cdc_stream"], + "plugin_output": "all_events", + "table_match_regex": "mydb\\.(orders|payments|refunds)" + }, + { + "plugin_name": "FieldMapper", + "plugin_input": ["all_events"], + "plugin_output": "all_events_mapped", + "field_mapper": { "created_at": "event_time", "event_type": "type" } + } +] +``` + +This works only when all three tables (`orders`, `payments`, `refunds`) share `created_at` and +`event_type` fields. If schemas differ, `TableMerge` will fail at runtime. + +--- + +## 7. EtLT Patterns with Multi-Table Transform + +**EtLT** (Extract, light-transform, Load, then Transform in the warehouse) is the recommended +pattern when SeaTunnel's transform layer cannot fulfil the full transformation requirement: + +``` +CDC Source + │ + ▼ +Light transforms (field rename, type cast, row filter) + │ + ▼ +Data Lake / Warehouse (Hudi / Iceberg / ClickHouse) + │ + ▼ +Heavy transforms (JOINs, aggregations, complex SQL) +in dbt / Flink SQL / Spark SQL +``` + +Use SeaTunnel's transform layer for: +- Field rename / filtering +- Type normalisation +- Row-level filtering +- Schema routing (different tables → different topics/tables) + +Offload to downstream: +- Cross-table JOINs +- Aggregations +- Pivot / unpivot +- Enrichment from dimension tables + +--- + +## 8. FAQ + +**Q: Can I apply one transform to ALL tables without specifying each one?** + +Not directly. Use `TableMerge` to combine tables with compatible schemas first, then apply a +single transform to the merged result. If schemas differ, you must use separate transform blocks, +typically with different `table_match_regex` rules. + +**Q: Does the `SQL` transform support `JOIN`?** + +No. The `SQL` transform only supports single-table queries (SELECT, WHERE, expressions). For +JOINs, use an external SQL engine after loading the data into a sink. + +**Q: What happens to tables that are not matched by any transform?** + +Unmatched tables continue flowing through the pipeline and can be captured by a Sink that +uses the right `plugin_input` and `table_match_regex`. + +**Q: Can I add a new table to an existing CDC pipeline without downtime?** + +This depends on the connector. MySQL-CDC supports dynamic table discovery in some configurations, +but adding a transform for a new table requires a pipeline restart. Use `stop-with-savepoint` +to minimise data loss (see [REST API v2 Reference](../engines/zeta/rest-api-v2.md)). + +--- + +## See Also + +- [TableMerge Transform Reference](table-merge.md) +- [TableRename Transform Reference](table-rename.md) +- [transform-multi-table Reference](transform-multi-table.md) +- [Multi-Table Architecture Overview](../architecture/features/multi-table.md) +- [CDC Pipeline Architecture](../architecture/cdc-pipeline-architecture.md) +- [REST API v2 Reference](../engines/zeta/rest-api-v2.md) diff --git a/docs/sidebars.js b/docs/sidebars.js index cbc39ae7bdef..a866715faf5d 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -262,7 +262,8 @@ const sidebars = { "transforms/table-filter", "transforms/table-merge", "transforms/table-rename", - "transforms/transform-multi-table" + "transforms/transform-multi-table", + "transforms/multi-table-transform-and-join-boundary" ] }, { diff --git a/docs/zh/transforms/multi-table-transform-and-join-boundary.md b/docs/zh/transforms/multi-table-transform-and-join-boundary.md new file mode 100644 index 000000000000..37fcb42c6347 --- /dev/null +++ b/docs/zh/transforms/multi-table-transform-and-join-boundary.md @@ -0,0 +1,269 @@ +--- +sidebar_position: 16 +--- + +# 多表 Transform 能力边界 + +## 概述 + +SeaTunnel 的**多表 Transform**功能允许单个 Transform 节点在一条流水线中同时处理来自上游 +数据源(通常是 CDC 连接器)的多张表。本文档精确描述了哪些功能受支持、哪些不受支持,以及 +遇到能力边界时的替代方案。 + +--- + +## 1. 什么是多表 Transform? + +在标准的单表流水线中,一个 Source 对应一条 Transform 链,再连接一个 Sink。在多表流水线中, +单个 Source(例如 MySQL-CDC)同时发送来自**多张表**的记录,下游的每个 Transform 或 Sink +需声明它适用于哪张(些)表。 + +``` +MySQL-CDC ──► FieldMapper (orders 表) ──► Kafka Sink (orders topic) + │ + ├──► FieldMapper (users 表) ──► Kafka Sink (users topic) + │ + └──► (未匹配的表直接透传) ──► Elasticsearch Sink +``` + +--- + +## 2. 能力边界一览 + +| 能力 | 是否支持 | 说明 | +|---|---|---| +| 按表字段重命名 / 映射 | ✅ 支持 | 使用带 `plugin_input` 和 `table_match_regex` 的 `FieldMapper` | +| 按表列过滤 | ✅ 支持 | 使用带 `plugin_input` 和 `table_match_regex` 的 `Filter` | +| 按表类型转换 | ✅ 支持 | 使用 `FieldMapper` 的 `define_sink_type` 选项 | +| 单表 SQL Transform | ✅ 支持 | 使用带 `plugin_input` 的 `SQL` Transform;必要时再用 `table_match_regex` 限定表范围 | +| `TableMerge`——将多表合并为一张 | ✅ 支持 | 各表需具有兼容的 Schema | +| `TableRename`——重命名流中的表 | ✅ 支持 | 适合按表名路由到 Sink | +| 行级过滤(按 `rowkind` 过滤) | ✅ 支持 | 使用 `FilterRowKind` Transform | +| 跨表 SQL JOIN | ❌ 不支持 | 参见第 4 节替代方案 | +| 多张 CDC 表的聚合 | ❌ 不支持 | 在下游 OLAP 引擎中执行聚合 | +| 通过 JOIN 生成新表 | ❌ 不支持 | 使用专用 SQL 引擎 | +| 通配符将同一 Transform 应用于所有表 | ⚠️ 部分支持 | 先用 `TableMerge` 合并,再做单 Transform;Schema 需兼容 | +| 流中 Schema 变更(DDL 事件) | ⚠️ 有限支持 | 取决于 Sink;部分 Sink 支持 Schema 演化,Transform 层不支持 | +| 按表 JSON 嵌套字段提取 | ✅ 支持 | 使用带 `plugin_input` 和 `table_match_regex` 的 `JsonPath` Transform | + +--- + +## 3. TableMerge 与 SQL Join 的区别 + +这是最容易混淆的两个特性。 + +### 3.1 `TableMerge` Transform + +`TableMerge` **将多张表的行流合并为一张结果表**。所有输入表必须具有相同(或兼容)的 +Schema。通常用于将多表路由至同一个 Sink。 + +```json +{ + "plugin_name": "TableMerge", + "plugin_input": ["orders_2023", "orders_2024"], + "plugin_output": "all_orders", + "merge_by_field": true +} +``` + +**适用场景**:多个来源表结构相同,需要合并(例如分片表、多年分区表,或多库同构表)。 + +**不适用场景**:需要关联或补全不同结构的表数据,这类场景需要 JOIN。 + +### 3.2 SQL JOIN(多表流水线中不支持) + +SQL JOIN 基于关联键将两张不同表的行进行关联。**SeaTunnel 的 `SQL` Transform 在多表流式 +流水线中不支持跨表 JOIN。** 在单个 SQL Transform 中尝试关联来自两张上游表的记录会导致 +配置报错。 + +**推荐替代方案**: + +- 将两张表写入共享数据湖或数仓(如 Hudi、Iceberg、ClickHouse),在目标端执行 JOIN +- 使用 Apache Flink 配合 SeaTunnel Flink 连接器进行有状态 JOIN +- 将“维度表”物化到查找缓存(如 Redis、RocksDB)中,通过自定义 Transform 进行数据补全 + +--- + +## 4. 跨 Source JOIN 的限制 + +SeaTunnel **不支持**两个输入侧来自**不同 Source** 的流式 JOIN(例如将 MySQL-CDC 与 +PostgreSQL-CDC 流进行 JOIN)。 + +| 场景 | 是否支持 | +|---|---| +| 单 Source 多表透传 | ✅ | +| 单 Source TableMerge(相同 Schema) | ✅ | +| 跨 Source JOIN(MySQL-CDC + PG-CDC) | ❌ | +| 跨 Source JOIN(CDC + JDBC 批量) | ❌ | +| 同 Source 两张不同表的 JOIN | ❌ | + +**解决方法**:将两个 Source 的数据写入公共 Sink(Kafka、Iceberg 等),再在专用 SQL 引擎 +(Flink、Spark、ClickHouse 等)中执行 JOIN。 + +--- + +## 5. 按表独立配置 Transform 的模式 + +当你需要对同一 Source 的不同表应用不同的 Transform 时,可声明多个共享相同 +`plugin_input`、但使用不同 `table_match_regex` 的 Transform 块: + +```json +{ + "env": { + "job.name": "cdc-multi-table", + "job.mode": "STREAMING" + }, + "source": [ + { + "plugin_name": "MySQL-CDC", + "plugin_output": "cdc_stream", + "base-url": "jdbc:mysql://localhost:3306/mydb", + "username": "cdc_user", + "password": "password", + "database-names": ["mydb"], + "table-names": ["mydb.orders", "mydb.users", "mydb.products"] + } + ], + "transform": [ + { + "plugin_name": "FieldMapper", + "plugin_input": ["cdc_stream"], + "plugin_output": "orders_mapped", + "field_mapper": { + "order_id": "id", + "order_amount": "amount" + }, + "table_match_regex": "mydb\\.orders" + }, + { + "plugin_name": "FieldMapper", + "plugin_input": ["cdc_stream"], + "plugin_output": "users_mapped", + "field_mapper": { + "user_id": "id", + "user_email": "email" + }, + "table_match_regex": "mydb\\.users" + } + ], + "sink": [ + { + "plugin_name": "Kafka", + "plugin_input": ["orders_mapped"], + "topic": "orders" + }, + { + "plugin_name": "Kafka", + "plugin_input": ["users_mapped"], + "topic": "users" + }, + { + "plugin_name": "Kafka", + "plugin_input": ["cdc_stream"], + "topic": "products", + "table_match_regex": "mydb\\.products" + } + ] +} +``` + +--- + +## 6. 公共字段示例(共享 Schema) + +如果多张表共享一组公共字段,可以先用 `TableMerge` 合并,再统一应用一个 Transform: + +```json +"transform": [ + { + "plugin_name": "TableMerge", + "plugin_input": ["cdc_stream"], + "plugin_output": "all_events", + "table_match_regex": "mydb\\.(orders|payments|refunds)" + }, + { + "plugin_name": "FieldMapper", + "plugin_input": ["all_events"], + "plugin_output": "all_events_mapped", + "field_mapper": { + "created_at": "event_time", + "event_type": "type" + } + } +] +``` + +此模式仅在三张表(`orders`、`payments`、`refunds`)都包含 `created_at` 和 +`event_type` 字段时有效。若各表 Schema 不同,`TableMerge` 会在运行时报错。 + +--- + +## 7. 多表 Transform 的 EtLT 模式 + +**EtLT**(Extract、轻量 transform、Load,再在数仓中 Transform)是当 SeaTunnel +Transform 层无法完成全量转换需求时的推荐架构模式: + +``` +CDC Source + │ + ▼ +轻量 Transform(字段重命名、类型转换、行过滤) + │ + ▼ +数据湖 / 数仓(Hudi / Iceberg / ClickHouse) + │ + ▼ +重型 Transform(JOIN、聚合、复杂 SQL) +在 dbt / Flink SQL / Spark SQL 中执行 +``` + +**适合在 SeaTunnel Transform 层完成的操作**: + +- 字段重命名 / 过滤 +- 类型标准化 +- 行级过滤 +- Schema 路由(不同表 -> 不同 Topic / 表) + +**建议下沉到下游处理的操作**: + +- 跨表 JOIN +- 聚合计算 +- Pivot / Unpivot +- 基于维度表的数据补全 + +--- + +## 8. 常见问题 + +**Q: 我能否不指定每张表,直接将一个 Transform 应用于所有表?** + +不能直接实现。可以先用 `TableMerge` 将 Schema 兼容的表合并,再对合并结果应用单一 +Transform。若各表 Schema 不同,则必须拆成多个 Transform 块,通常通过不同的 +`table_match_regex` 分别处理。 + +**Q: `SQL` Transform 支持 `JOIN` 吗?** + +不支持。`SQL` Transform 仅支持单表查询(SELECT、WHERE、表达式等)。如需执行 JOIN, +请先将数据加载至 Sink,再使用外部 SQL 引擎处理。 + +**Q: 未被任何 Transform 匹配的表会怎样?** + +未匹配的表会继续在流水线中流动,可被使用正确 `plugin_input` 和 `table_match_regex` +的 Sink 捕获。 + +**Q: 能否在不停机的情况下向现有 CDC 流水线新增表?** + +这取决于具体连接器。MySQL-CDC 在部分配置下支持动态发现新表,但为新表添加 Transform +仍需重启流水线。建议使用 `stop-with-savepoint` 将数据丢失降至最低(参见 +[REST API v2 参考文档](../engines/zeta/rest-api-v2.md))。 + +--- + +## 参考文档 + +- [TableMerge Transform 参考](table-merge.md) +- [TableRename Transform 参考](table-rename.md) +- [transform-multi-table 参考](transform-multi-table.md) +- [多表能力概览](../architecture/features/multi-table.md) +- [CDC 流水线架构](../architecture/cdc-pipeline-architecture.md) +- [REST API v2 参考文档](../engines/zeta/rest-api-v2.md)