diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java new file mode 100644 index 000000000000..981227491773 --- /dev/null +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.flink.execution; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.source.SupportSchemaEvolution; +import org.apache.seatunnel.translation.flink.schema.SchemaOperator; +import org.apache.seatunnel.translation.flink.schema.SchemaOperator13; + +import java.net.URL; +import java.util.List; + +/** + * Flink 1.13-specific source execution processor. Shadows the common {@code SourceExecuteProcessor} + * at runtime (same package, same class name) to provide two Flink 1.13-specific behaviours without + * using reflection: + * + *
    + *
  1. {@link #createSchemaOperator} returns {@link SchemaOperator13}, which registers the + * checkpoint-stall fallback timer via the strongly-typed {@code + * ProcessingTimeService.registerTimer} API instead of a background {@code + * ScheduledExecutorService} + reflection. + *
  2. {@link #supportsSinkFunctionFinish} hard-codes {@code false}: Flink 1.13's {@code + * SinkFunction} does not expose a {@code finish()} method, so this fact is known at compile + * time and no reflection is needed. + *
+ */ +@SuppressWarnings("unchecked,rawtypes") +public class SourceExecuteProcessor extends AbstractSourceExecuteProcessor { + + public SourceExecuteProcessor( + List jarPaths, + Config envConfig, + List pluginConfigs, + JobContext jobContext) { + super(jarPaths, envConfig, pluginConfigs, jobContext); + } + + @Override + protected SchemaOperator createSchemaOperator( + String jobId, SupportSchemaEvolution source, Config pluginConfig) { + return new SchemaOperator13(jobId, source, pluginConfig); + } + + /** + * Flink 1.13's {@code SinkFunction} does not have a {@code finish()} method, so source + * keep-alive must always be enabled when schema evolution is active. Returns {@code false} + * directly rather than using reflection. + */ + @Override + protected boolean supportsSinkFunctionFinish() { + return false; + } +} diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractSourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractSourceExecuteProcessor.java new file mode 100644 index 000000000000..158570238a40 --- /dev/null +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractSourceExecuteProcessor.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.core.starter.flink.execution; + +import org.apache.seatunnel.shade.com.google.common.collect.Lists; +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.options.EnvCommonOptions; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SupportSchemaEvolution; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.FactoryUtil; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.constants.EngineType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.core.starter.execution.SourceTableInfo; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; +import org.apache.seatunnel.translation.flink.schema.SchemaOperator; +import org.apache.seatunnel.translation.flink.source.FlinkSource; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import scala.Tuple2; + +import java.io.Serializable; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME; +import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_OUTPUT; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch; +import static org.apache.seatunnel.common.constants.JobMode.STREAMING; + +@SuppressWarnings("unchecked,rawtypes") +public abstract class AbstractSourceExecuteProcessor + extends FlinkAbstractPluginExecuteProcessor { + + private static final String SOURCE_KEEP_ALIVE_CONFIG = "schema-changes.source-keep-alive"; + + protected AbstractSourceExecuteProcessor( + List jarPaths, + Config envConfig, + List pluginConfigs, + JobContext jobContext) { + super(jarPaths, envConfig, pluginConfigs, jobContext); + } + + @Override + public List execute(List upstreamDataStreams) { + StreamExecutionEnvironment executionEnvironment = + flinkRuntimeEnvironment.getStreamExecutionEnvironment(); + List sources = new ArrayList<>(); + for (int i = 0; i < plugins.size(); i++) { + SourceTableInfo sourceTableInfo = plugins.get(i); + SeaTunnelSource internalSource = sourceTableInfo.getSource(); + Config pluginConfig = pluginConfigs.get(i); + + DataStreamSource sourceStream = + executionEnvironment.fromSource( + new FlinkSource<>( + internalSource, + enableSourceKeepAliveIfNeeded( + internalSource, pluginConfig, envConfig)), + WatermarkStrategy.noWatermarks(), + String.format("%s-Source", internalSource.getPluginName())); + + if (pluginConfig.hasPath(EnvCommonOptions.PARALLELISM.key())) { + int parallelism = pluginConfig.getInt(EnvCommonOptions.PARALLELISM.key()); + sourceStream.setParallelism(parallelism); + } + + boolean isStreaming = + envConfig.hasPath("job.mode") + && STREAMING + .toString() + .equalsIgnoreCase(envConfig.getString("job.mode")); + + boolean enableSchemaChange = false; + for (Config cfg : pluginConfigs) { + if (cfg.hasPath("schema-changes.enabled") + && cfg.getBoolean("schema-changes.enabled")) { + enableSchemaChange = true; + break; + } + } + // add schema evolution functionality to cdc source + DataStream evolvedStream = null; + if (isStreaming + && enableSchemaChange + && sourceTableInfo.getSource() instanceof SupportSchemaEvolution) { + evolvedStream = + sourceStream.transform( + "schema-evolution", + TypeInformation.of(SeaTunnelRow.class), + createSchemaOperator( + jobContext.getJobId(), + (SupportSchemaEvolution) sourceTableInfo.getSource(), + pluginConfig)); + } + + if (evolvedStream != null) { + sources.add( + new DataStreamTableInfo( + evolvedStream, + sourceTableInfo.getCatalogTables(), + ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT))); + } else { + sources.add( + new DataStreamTableInfo( + sourceStream, + sourceTableInfo.getCatalogTables(), + ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT))); + } + } + return sources; + } + + private Config enableSourceKeepAliveIfNeeded( + SeaTunnelSource source, Config pluginConfig, Config currentEnvConfig) { + boolean isStreaming = + currentEnvConfig.hasPath("job.mode") + && STREAMING + .toString() + .equalsIgnoreCase(currentEnvConfig.getString("job.mode")); + boolean enableSchemaChange = + pluginConfig.hasPath("schema-changes.enabled") + && pluginConfig.getBoolean("schema-changes.enabled"); + boolean shouldEnableKeepAlive = + isStreaming + && enableSchemaChange + && source instanceof SupportSchemaEvolution + && !supportsSinkFunctionFinish(); + if (!shouldEnableKeepAlive) { + return currentEnvConfig; + } + return currentEnvConfig.withValue( + SOURCE_KEEP_ALIVE_CONFIG, ConfigValueFactory.fromAnyRef(true)); + } + + /** + * Returns the {@link SchemaOperator} instance to attach after schema-evolution-capable sources. + * Subclasses may override to return a version-specific operator (e.g. {@code SchemaOperator13} + * for Flink 1.13) that uses the public {@code ProcessingTimeService} API instead of reflection. + */ + protected SchemaOperator createSchemaOperator( + String jobId, SupportSchemaEvolution source, Config pluginConfig) { + return new SchemaOperator(jobId, source, pluginConfig); + } + + /** + * Returns {@code true} if the current Flink runtime's {@code SinkFunction} exposes a {@code + * finish()} method (introduced in Flink 1.14). When {@code false}, source keep-alive is enabled + * so pending schema changes can still be applied after all source subtasks finish. Subclasses + * may override with a hard-coded value to avoid reflection. + */ + protected boolean supportsSinkFunctionFinish() { + for (java.lang.reflect.Method method : + org.apache.flink.streaming.api.functions.sink.SinkFunction.class.getMethods()) { + if ("finish".equals(method.getName()) && method.getParameterCount() == 0) { + return true; + } + } + return false; + } + + @Override + protected List initializePlugins( + List jarPaths, List pluginConfigs) { + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableSourceFactory.class, ADD_URL_TO_CLASSLOADER); + SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = + new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER); + Function fallbackCreateSource = + sourcePluginDiscovery::createPluginInstance; + + List sources = new ArrayList<>(); + Set jars = new HashSet<>(); + for (Config sourceConfig : pluginConfigs) { + PluginIdentifier pluginIdentifier = + PluginIdentifier.of( + EngineType.SEATUNNEL.getEngine(), + PluginType.SOURCE.getType(), + sourceConfig.getString(PLUGIN_NAME.key())); + jars.addAll( + sourcePluginDiscovery.getPluginJarAndDependencyPaths( + Lists.newArrayList(pluginIdentifier))); + + Tuple2, List> source = + FactoryUtil.createAndPrepareSource( + ReadonlyConfig.fromConfig(sourceConfig), + classLoader, + pluginIdentifier.getPluginName(), + fallbackCreateSource, + (TableSourceFactory) + factoryDiscovery + .createOptionalPluginInstance(pluginIdentifier) + .orElse(null), + null); + + source._1().setJobContext(jobContext); + ensureJobModeMatch(jobContext, source._1()); + + sources.add(new SourceTableInfo(source._1(), source._2())); + } + jarPaths.addAll(jars); + return sources; + } +} diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index 94393b0ec45c..6cd1efcf2050 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -17,51 +17,22 @@ package org.apache.seatunnel.core.starter.flink.execution; -import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.PluginIdentifier; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.options.EnvCommonOptions; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.source.SupportSchemaEvolution; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.factory.FactoryUtil; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.common.constants.EngineType; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.core.starter.execution.SourceTableInfo; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; -import org.apache.seatunnel.translation.flink.schema.SchemaOperator; -import org.apache.seatunnel.translation.flink.source.FlinkSource; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import scala.Tuple2; - -import java.io.Serializable; import java.net.URL; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.function.Function; - -import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME; -import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_OUTPUT; -import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch; -import static org.apache.seatunnel.common.constants.JobMode.STREAMING; +/** + * Default (Flink 1.15+) source execution processor. Delegates entirely to {@link + * AbstractSourceExecuteProcessor}. For Flink 1.13, this class is shadowed at runtime by the version + * in {@code seatunnel-flink-13-starter}, which overrides {@link #createSchemaOperator} and {@link + * #supportsSinkFunctionFinish} with strongly-typed Flink 1.13 implementations that avoid + * reflection. + */ @SuppressWarnings("unchecked,rawtypes") -public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor { +public class SourceExecuteProcessor extends AbstractSourceExecuteProcessor { public SourceExecuteProcessor( List jarPaths, @@ -70,115 +41,4 @@ public SourceExecuteProcessor( JobContext jobContext) { super(jarPaths, envConfig, pluginConfigs, jobContext); } - - @Override - public List execute(List upstreamDataStreams) { - StreamExecutionEnvironment executionEnvironment = - flinkRuntimeEnvironment.getStreamExecutionEnvironment(); - List sources = new ArrayList<>(); - for (int i = 0; i < plugins.size(); i++) { - SourceTableInfo sourceTableInfo = plugins.get(i); - SeaTunnelSource internalSource = sourceTableInfo.getSource(); - Config pluginConfig = pluginConfigs.get(i); - FlinkSource flinkSource = new FlinkSource<>(internalSource, envConfig); - - DataStreamSource sourceStream = - executionEnvironment.fromSource( - flinkSource, - WatermarkStrategy.noWatermarks(), - String.format("%s-Source", internalSource.getPluginName())); - - if (pluginConfig.hasPath(EnvCommonOptions.PARALLELISM.key())) { - int parallelism = pluginConfig.getInt(EnvCommonOptions.PARALLELISM.key()); - sourceStream.setParallelism(parallelism); - } - - boolean isStreaming = - envConfig.hasPath("job.mode") - && STREAMING - .toString() - .equalsIgnoreCase(envConfig.getString("job.mode")); - - boolean enableSchemaChange = false; - for (Config cfg : pluginConfigs) { - if (cfg.hasPath("schema-changes.enabled") - && cfg.getBoolean("schema-changes.enabled")) { - enableSchemaChange = true; - break; - } - } - // add schema evolution functionality to cdc source - DataStream evolvedStream = null; - if (isStreaming - && enableSchemaChange - && sourceTableInfo.getSource() instanceof SupportSchemaEvolution) { - evolvedStream = - sourceStream.transform( - "schema-evolution", - TypeInformation.of(SeaTunnelRow.class), - new SchemaOperator( - jobContext.getJobId(), - (SupportSchemaEvolution) sourceTableInfo.getSource(), - pluginConfig)); - } - - if (evolvedStream != null) { - sources.add( - new DataStreamTableInfo( - evolvedStream, - sourceTableInfo.getCatalogTables(), - ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT))); - } else { - sources.add( - new DataStreamTableInfo( - sourceStream, - sourceTableInfo.getCatalogTables(), - ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT))); - } - } - return sources; - } - - @Override - protected List initializePlugins( - List jarPaths, List pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSourceFactory.class, ADD_URL_TO_CLASSLOADER); - SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = - new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER); - Function fallbackCreateSource = - sourcePluginDiscovery::createPluginInstance; - - List sources = new ArrayList<>(); - Set jars = new HashSet<>(); - for (Config sourceConfig : pluginConfigs) { - PluginIdentifier pluginIdentifier = - PluginIdentifier.of( - EngineType.SEATUNNEL.getEngine(), - PluginType.SOURCE.getType(), - sourceConfig.getString(PLUGIN_NAME.key())); - jars.addAll( - sourcePluginDiscovery.getPluginJarAndDependencyPaths( - Lists.newArrayList(pluginIdentifier))); - - Tuple2, List> source = - FactoryUtil.createAndPrepareSource( - ReadonlyConfig.fromConfig(sourceConfig), - classLoader, - pluginIdentifier.getPluginName(), - fallbackCreateSource, - (TableSourceFactory) - factoryDiscovery - .createOptionalPluginInstance(pluginIdentifier) - .orElse(null), - null); - - source._1().setJobContext(jobContext); - ensureJobModeMatch(jobContext, source._1()); - - sources.add(new SourceTableInfo(source._1(), source._2())); - } - jarPaths.addAll(jars); - return sources; - } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf index 9dd595f54785..7ffbf1c1c614 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change.conf @@ -20,7 +20,7 @@ env { # You can set engine configuration here - parallelism = 1 + parallelism = 5 job.mode = "STREAMING" checkpoint.interval = 5000 read_limit.bytes_per_second=7000000 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf index 09c88f312068..69816a41b9b6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_flink_schema_change_exactly_once.conf @@ -20,7 +20,7 @@ env { # You can set engine configuration here - parallelism = 1 + parallelism = 5 job.mode = "STREAMING" checkpoint.interval = 5000 read_limit.bytes_per_second=7000000 diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator13.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator13.java new file mode 100644 index 000000000000..3430c46b6188 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator13.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.flink.schema; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.source.SupportSchemaEvolution; + +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import lombok.extern.slf4j.Slf4j; + +/** + * Flink 1.13-specific extension of {@link SchemaOperator} that resolves two issues present when the + * fallback timer is placed in the common module: + * + *
    + *
  1. No reflection: {@link ProcessingTimeService} and {@link ProcessingTimeCallback} are + * imported directly as strongly-typed Flink 1.13 APIs. There is no risk of silent breakage + * from method renames in future Flink versions. + *
  2. No dead flag path: the timer callback fires on the Flink task thread via + * {@code ProcessingTimeService.registerTimer}, so {@link #handleFallbackTimerOnTaskThread()} + * is always reachable even when no more source data arrives and {@code processElement} is + * never called again. This is the exact scenario this workaround targets on Flink 1.13. + *
+ * + *

The base {@link SchemaOperator} carries none of this timer infrastructure; Flink 1.15 and + * later use that base class directly because checkpointing behaves correctly there. + */ +@Slf4j +public class SchemaOperator13 extends SchemaOperator { + + /** + * Guards against double-registration. All accesses happen on the Flink task thread + * (processElement, timer callbacks, notifyCheckpointComplete) + */ + private boolean fallbackTimerPending = false; + + public SchemaOperator13(String jobId, SupportSchemaEvolution source, Config pluginConfig) { + super(jobId, source, pluginConfig); + } + + /** + * Registers a processing-time timer that will call {@link #handleFallbackTimerOnTaskThread()} + * on the Flink task thread after {@link #CHECKPOINT_STALL_TIMEOUT_MS} milliseconds. + * + *

Using {@link ProcessingTimeService#registerTimer} instead of a background {@code + * ScheduledExecutorService} achieves two goals: + * + *

    + *
  • The callback is delivered on the task thread, so {@code output.collect} and operator + * state are accessed safely without additional synchronisation. + *
  • No daemon thread overhead is introduced for Flink 1.14+ users who use the common + * module's no-op default. + *
+ * + *

If a timer is already pending this call is a no-op to prevent duplicate firings. + */ + @Override + protected void scheduleFallbackTimer() { + if (fallbackTimerPending) { + return; + } + fallbackTimerPending = true; + + ProcessingTimeService pts = getProcessingTimeService(); + long fireAt = pts.getCurrentProcessingTime() + CHECKPOINT_STALL_TIMEOUT_MS; + + pts.registerTimer( + fireAt, + (ProcessingTimeCallback) + timestamp -> { + fallbackTimerPending = false; + try { + handleFallbackTimerOnTaskThread(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error( + "Fallback schema-change timer interrupted for job {}", + jobId, + e); + } + }); + + log.debug( + "Registered Flink processing-time fallback timer to fire in {}ms for job {}", + CHECKPOINT_STALL_TIMEOUT_MS, + jobId); + } +} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java index 0f993dcda0ff..0ce26072b3fa 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/SchemaOperator.java @@ -1,12 +1,12 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -54,7 +54,7 @@ /** * Operator placed after the source to handle schema evolution. * - *

schema change events are NOT processed synchronously in {@link #processElement}. Instead, they + *

Schema change events are NOT processed synchronously in {@link #processElement}. Instead, they * are buffered and deferred until an additional checkpoint cycle has completed after the first * checkpoint that observed the pending DDL. This wait ensures that when the sink executes ALTER * TABLE, all XA transactions from prior checkpoint cycles have been fully committed by the {@code @@ -64,6 +64,13 @@ * *

Per checkpoint cycle, at most ONE schema change is applied. If multiple DDLs arrive between * two checkpoints, they are processed across successive checkpoint cycles. + * + *

Flink 1.13 cannot continue checkpointing after some source subtasks have finished. When + * high-parallelism CDC jobs hit that condition, pending schema changes would otherwise stay blocked + * forever. Subclasses may override {@link #scheduleFallbackTimer()} to register a version-specific + * timer that detects the stall and re-enters the task thread via {@link + * #handleFallbackTimerOnTaskThread()} so the deferred DDL can still be applied safely. The base + * implementation is a no-op, keeping the common module free of version-specific overhead. */ @Slf4j public class SchemaOperator extends AbstractStreamOperator @@ -73,8 +80,14 @@ public class SchemaOperator extends AbstractStreamOperator private static final long SCHEMA_CHANGE_TIMEOUT_MS = 300_000L; private static final int CHECKPOINT_WAIT_ROUNDS = 1; + /** Exposed to subclasses so version-specific fallback timers can use the same threshold. */ + protected static final long CHECKPOINT_STALL_TIMEOUT_MS = 15_000L; + private final Map localSchemaState; - private String jobId; + + /** Exposed to subclasses for logging only. */ + protected String jobId; + private final SupportSchemaEvolution source; private final Config pluginConfig; private volatile Long lastProcessedEventTime; @@ -83,6 +96,13 @@ public class SchemaOperator extends AbstractStreamOperator private volatile boolean schemaChangePending = false; private long firstSeenCheckpointId = -1L; + /** + * Timestamp of the most recently completed checkpoint. Updated in {@link + * #notifyCheckpointComplete} and read by {@link #handleFallbackTimerOnTaskThread} to detect + * whether checkpoints have stalled. + */ + protected volatile long lastCheckpointCompletedMs = -1L; + private transient ListState localSchemaStateStore; private transient ListState lastProcessedEventTimeState; private transient ListState schemaChangePendingState; @@ -116,7 +136,8 @@ public void open() throws Exception { } @Override - public void processElement(StreamRecord streamRecord) { + public void processElement(StreamRecord streamRecord) + throws InterruptedException { SeaTunnelRow element = streamRecord.getValue(); if (!isSchemaEvolutionEnabled(pluginConfig)) { @@ -129,7 +150,7 @@ public void processElement(StreamRecord streamRecord) { && element.getOptions() != null) { Object object = element.getOptions().get("schema_change_event"); if (object instanceof SchemaChangeEvent) { - handleSchemaChangeDetected((SchemaChangeEvent) object, streamRecord.getTimestamp()); + handleSchemaChangeDetected((SchemaChangeEvent) object); return; } } @@ -143,7 +164,7 @@ public void processElement(StreamRecord streamRecord) { output.collect(streamRecord); } - private void handleSchemaChangeDetected(SchemaChangeEvent event, long timestamp) { + private void handleSchemaChangeDetected(SchemaChangeEvent event) { List supportedTypes = source.supports(); if (supportedTypes == null || supportedTypes.isEmpty()) { log.info("Source does not support any schema change types, skipping"); @@ -166,6 +187,7 @@ private void handleSchemaChangeDetected(SchemaChangeEvent event, long timestamp) pendingQueue.add(BufferedRecord.schemaChange(event)); schemaChangePending = true; + scheduleFallbackTimer(); } private void enqueueDataRecord(SeaTunnelRow row, long timestamp) { @@ -197,12 +219,12 @@ private TableIdentifier getPendingSchemaTableIdentifier() { * ensure safety: * *

    - *
  • first time seeing the DDL: record {@link #firstSeenCheckpointId} but do NOT - * broadcast the DDL yet. At this point the {@code FlinkGlobalCommitter} may still be - * running {@code XA COMMIT} for this checkpoint's prepared transactions, holding MDL - * locks on the sink table. - *
  • {@code checkpointId >= firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS} : the XA - * COMMIT from the earlier checkpoint cycle is guaranteed to have finished (at least one + *
  • First time seeing the DDL: record {@link #firstSeenCheckpointId} but do NOT broadcast + * the DDL yet. At this point the {@code FlinkGlobalCommitter} may still be running {@code + * XA COMMIT} for this checkpoint's prepared transactions, holding MDL locks on the sink + * table. + *
  • {@code checkpointId >= firstSeenCheckpointId + CHECKPOINT_WAIT_ROUNDS}: the XA COMMIT + * from the earlier checkpoint cycle is guaranteed to have finished (at least one * additional checkpoint cycle has completed, which implies the committer ran). The sink's * ALTER TABLE will not encounter MDL lock, it is now safe to broadcast the DDL. *
@@ -210,20 +232,14 @@ private TableIdentifier getPendingSchemaTableIdentifier() { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); + lastCheckpointCompletedMs = System.currentTimeMillis(); if (!schemaChangePending || pendingQueue.isEmpty()) { return; } - BufferedRecord head = pendingQueue.peek(); - while (head != null && !head.isSchemaChange) { - output.collect(new StreamRecord<>(head.row, head.timestamp)); - pendingQueue.poll(); - head = pendingQueue.peek(); - } + BufferedRecord head = advancePastDataRecords(); if (head == null) { - schemaChangePending = false; - firstSeenCheckpointId = -1L; return; } @@ -266,6 +282,96 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { tableId, eventTime); + applyNextPendingSchemaChange(); + } + + /** + * Handles a checkpoint-stall fallback on the task thread. Must be called from the Flink task + * thread (e.g. via {@code ProcessingTimeService.registerTimer} callback) to keep {@code + * output.collect} and operator state accesses thread-safe. + * + *

Safety fence: the DDL is applied only when at least one checkpoint has already completed + * after the schema event ({@code firstSeenCheckpointId >= 0}). This preserves the guarantee + * that XA transactions from the earlier checkpoint cycle have finished before ALTER TABLE runs. + * If that fence has not been crossed yet, the fallback reschedules itself by calling {@link + * #scheduleFallbackTimer()} and returns without applying anything. + */ + protected void handleFallbackTimerOnTaskThread() throws InterruptedException { + if (!schemaChangePending || pendingQueue.isEmpty()) { + return; + } + + if (lastCheckpointCompletedMs > 0 + && System.currentTimeMillis() - lastCheckpointCompletedMs + < CHECKPOINT_STALL_TIMEOUT_MS) { + scheduleFallbackTimer(); + return; + } + + BufferedRecord head = advancePastDataRecords(); + if (head == null) { + return; + } + + if (firstSeenCheckpointId < 0) { + log.info( + "Fallback timer fired but no checkpoint has completed after schema event " + + "for table {} (epoch {}). Rescheduling fallback to preserve " + + "checkpoint-completion safety fence.", + head.schemaEvent.tableIdentifier(), + head.schemaEvent.getCreatedTime()); + scheduleFallbackTimer(); + return; + } + + log.warn( + "Checkpoint stall detected after first post-DDL checkpoint {}. " + + "Applying deferred DDL for table {} (epoch {}) via fallback timer. " + + "Note: data committed via normal Flink checkpoint lifecycle may be " + + "delayed until checkpoints resume.", + firstSeenCheckpointId, + head.schemaEvent.tableIdentifier(), + head.schemaEvent.getCreatedTime()); + + applyNextPendingSchemaChange(); + } + + /** + * Schedules a fallback timer that will call {@link #handleFallbackTimerOnTaskThread()} if + * checkpoints stall before the pending schema change can be applied. + * + *

The base implementation is a no-op: version-specific subclasses (e.g. {@code + * SchemaOperator13}) override this to register a timer via {@code ProcessingTimeService}, + * keeping the common module free of version-specific timer infrastructure and reflection. + */ + protected void scheduleFallbackTimer() { + // no-op by default; overridden in version-specific subclasses + } + + private BufferedRecord advancePastDataRecords() { + BufferedRecord head = pendingQueue.peek(); + while (head != null && !head.isSchemaChange) { + output.collect(new StreamRecord<>(head.row, head.timestamp)); + pendingQueue.poll(); + head = pendingQueue.peek(); + } + if (head == null) { + schemaChangePending = false; + firstSeenCheckpointId = -1L; + } + return head; + } + + private void applyNextPendingSchemaChange() throws InterruptedException { + BufferedRecord head = pendingQueue.peek(); + if (head == null || !head.isSchemaChange) { + return; + } + + SchemaChangeEvent event = head.schemaEvent; + TableIdentifier tableId = event.tableIdentifier(); + long eventTime = event.getCreatedTime(); + if (lastProcessedEventTime != null && eventTime <= lastProcessedEventTime) { log.warn( "Skipping outdated schema change event (epoch {} <= last processed {})", @@ -294,7 +400,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { "Schema change for table {} (epoch {}) confirmed by all sink subtasks.", tableId, eventTime); - pendingQueue.poll(); firstSeenCheckpointId = -1L; @@ -323,6 +428,7 @@ private void drainDataUntilNextSchemaChange() { "Released {} buffered data records. Another schema change pending, " + "waiting for next checkpoint.", released); + scheduleFallbackTimer(); return; } pendingQueue.poll(); @@ -474,11 +580,6 @@ private void sendSchemaChangeEventToDownstream(SchemaChangeEvent schemaChangeEve output.collect(new StreamRecord<>(broadcastRow)); } - @Override - public void close() throws Exception { - super.close(); - } - static class BufferedRecord { final boolean isSchemaChange; final SeaTunnelRow row; diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java index 8d99f433a391..3cd96d43dc6f 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/schema/coordinator/LocalSchemaCoordinator.java @@ -41,6 +41,12 @@ * Local coordinator for schema change synchronization. This coordinator only manages temporary * communication between SchemaOperator and sink subtasks. All persistent state is managed by * BroadcastSchemaSinkOperator in Flink State. + * + *

Schema changes (DDL like ALTER TABLE) are database-level operations that only need to be + * executed once. In Flink's parallel execution model, SchemaOperator sends schema change events via + * output.collect() which routes to only ONE downstream subtask based on partitioning. Therefore, + * this coordinator completes schema change requests when ANY single subtask successfully applies + * the change, rather than waiting for all subtasks. */ @Slf4j public class LocalSchemaCoordinator { @@ -138,24 +144,27 @@ public void unregisterSinkSubtask(int subtaskId) { remaining, jobId); + // Check if any pending requests can now be completed + // (Since we only need 1 ACK for DDL, this typically won't change anything, + // but we keep it for edge cases where all subtasks close before any ACK) for (Map.Entry entry : pendingRequests.entrySet()) { String key = entry.getKey(); TimestampedPendingRequest request = entry.getValue(); Set applied = receivedAcks.get(key); - int expectedActive = Math.max(remaining, 1); - if (applied != null && applied.size() >= expectedActive) { + // If we already have at least 1 ACK, complete the request + if (applied != null && !applied.isEmpty()) { if (request.appliedPhaseCompleteAtomic.compareAndSet(false, true)) { - boolean allSuccess = request.allSuccess.get(); - request.future.complete(allSuccess); + boolean success = request.allSuccess.get(); + request.future.complete(success); log.info( - "After subtask {} unregistered, all {} active subtasks have applied " - + "schema change for table {} (epoch {}). Completing request with result: {}", + "After subtask {} unregistered, completing schema change request for " + + "table {} (epoch {}) with {} ACK(s). Result: {}", subtaskId, - expectedActive, request.tableId, request.epoch, - allSuccess); + applied.size(), + success); } } } @@ -218,8 +227,8 @@ public enum SchemaProcessingStatus { public boolean requestSchemaChange(TableIdentifier tableId, long epoch, long timeoutMs) throws InterruptedException, SchemaCoordinationException { String key = tableId.toString() + "#" + epoch; - int expectedAcks = activeSinkSubtasks.size(); - if (expectedAcks == 0) { + int totalSubtasks = activeSinkSubtasks.size(); + if (totalSubtasks == 0) { log.warn( "No active sink subtasks. Cannot coordinate schema change for table {} (epoch {}). " + "Assuming success to avoid deadlock.", @@ -227,11 +236,25 @@ public boolean requestSchemaChange(TableIdentifier tableId, long epoch, long tim epoch); return true; } + // Schema changes (DDL) are database-level operations that only need to execute once. + // Due to Flink's partitioning, only one subtask receives the schema change event, + // so we only need 1 ACK to confirm the DDL was applied successfully. + // + // Precondition: sink subtasks that do NOT receive the schema-change event directly + // (because Flink's partitioning routed it elsewhere) must have their local schema + // view refreshed through BroadcastSchemaSinkOperator's broadcast/state path. + // If that broadcast path is incomplete, those subtasks will silently apply the old + // schema to new-format rows — a data-corruption risk. Any change to the broadcast + // path must preserve this invariant, and a multi-table (≥2 tables, parallelism ≥2) + // E2E test should guard it so regressions are caught immediately. + int expectedAcks = 1; log.info( - "Requesting schema change for table {} (epoch {}). Waiting for all {} sink subtasks to apply after checkpoint completion.", + "Requesting schema change for table {} (epoch {}). Waiting for at least {} of {} " + + "sink subtasks to apply the DDL (database-level operation).", tableId, epoch, - expectedAcks); + expectedAcks, + totalSubtasks); long now = System.currentTimeMillis(); TimestampedPendingRequest request = @@ -312,31 +335,42 @@ public void notifySchemaChangeApplied( } appliedSubtasks.add(subtaskId); - int currentExpected = Math.min(request.expectedAcks, activeSinkSubtasks.size()); - currentExpected = Math.max(currentExpected, 1); + // Schema changes only need 1 successful application since they're database-level operations + int requiredAcks = request.expectedAcks; // This is now 1 log.info( - "Subtask {} applied schema change for table {} (epoch {}), success: {}. {}/{} subtasks applied.", + "Subtask {} applied schema change for table {} (epoch {}), success: {}. " + + "{} subtask(s) applied (need {} for completion).", subtaskId, tableId, epoch, success, appliedSubtasks.size(), - currentExpected); + requiredAcks); if (!success) { request.allSuccess.set(false); } - if (appliedSubtasks.size() >= currentExpected) { + // Complete when we have at least 1 successful ACK (DDL only needs to run once) + if (appliedSubtasks.size() >= requiredAcks && success) { if (request.appliedPhaseCompleteAtomic.compareAndSet(false, true)) { - boolean allSuccess = request.allSuccess.get(); - request.future.complete(allSuccess); + request.future.complete(true); log.info( - "All {} active subtasks have applied schema change for table {} (epoch {}). Completing request with result: {}", - currentExpected, + "Schema change for table {} (epoch {}) successfully applied by subtask {}. " + + "DDL execution complete (database-level operation).", + tableId, + epoch, + subtaskId); + } + } else if (appliedSubtasks.size() >= requiredAcks && !success) { + // If the only ACK we got was a failure, complete with failure + if (request.appliedPhaseCompleteAtomic.compareAndSet(false, true)) { + request.future.complete(false); + log.error( + "Schema change for table {} (epoch {}) failed on subtask {}.", tableId, epoch, - allSuccess); + subtaskId); } } } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java index 4c2a7b6d2e50..52a6d9c7a5de 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java @@ -49,6 +49,8 @@ public class FlinkSourceReader implements SourceReader> { + private static final String SOURCE_KEEP_ALIVE_CONFIG = "schema-changes.source-keep-alive"; + private final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReader.class); private final org.apache.seatunnel.api.source.SourceReader sourceReader; @@ -65,6 +67,8 @@ public class FlinkSourceReader private final ScheduledExecutorService scheduledExecutor; + private final boolean sourceKeepAliveEnabled; + public FlinkSourceReader( org.apache.seatunnel.api.source.SourceReader sourceReader, org.apache.seatunnel.api.source.SourceReader.Context context, @@ -81,6 +85,9 @@ public FlinkSourceReader( this.sourceReader = sourceReader; this.context = context; this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext()); + this.sourceKeepAliveEnabled = + envConfig.hasPath(SOURCE_KEEP_ALIVE_CONFIG) + && envConfig.getBoolean(SOURCE_KEEP_ALIVE_CONFIG); } @Override @@ -108,8 +115,11 @@ public InputStatus pollNext(ReaderOutput output) throws Exception return InputStatus.NOTHING_AVAILABLE; } } else { - // reduce CPU idle - Thread.sleep(DEFAULT_WAIT_TIME_MILLIS); + if (sourceKeepAliveEnabled) { + // Flink 1.13 requires idle source subtasks to stay alive so checkpoints continue. + Thread.sleep(DEFAULT_WAIT_TIME_MILLIS); + return InputStatus.NOTHING_AVAILABLE; + } } return inputStatus; } @@ -132,6 +142,10 @@ public CompletableFuture isAvailable() { @Override public void addSplits(List> splits) { + if (!splits.isEmpty() && context instanceof FlinkSourceReaderContext) { + ((FlinkSourceReaderContext) context).resetNoMoreElementEvent(); + inputStatus = InputStatus.MORE_AVAILABLE; + } sourceReader.addSplits( splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList())); } @@ -144,7 +158,8 @@ public void notifyNoMoreSplits() { @Override public void handleSourceEvents(SourceEvent sourceEvent) { if (sourceEvent instanceof NoMoreElementEvent) { - inputStatus = InputStatus.END_OF_INPUT; + inputStatus = + sourceKeepAliveEnabled ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT; } if (sourceEvent instanceof SourceEventWrapper) { sourceReader.handleSourceEvent((((SourceEventWrapper) sourceEvent).getSourceEvent())); diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java index 2b20e0d4047f..458b240d4d28 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java @@ -101,6 +101,10 @@ public boolean isSendNoMoreElementEvent() { return isSendNoMoreElementEvent.get(); } + public void resetNoMoreElementEvent() { + isSendNoMoreElementEvent.set(false); + } + @Override public EventListener getEventListener() { return eventListener; diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java index fd3b7d212616..f1873556963c 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/schema/SchemaOperatorTest.java @@ -176,6 +176,76 @@ void testCoordinationFailureKeepsBufferedRecordsBlocked() throws Exception { assertTrue(pendingQueue.peek().isSchemaChange); } + /** + * Verifies that {@link SchemaOperator#handleFallbackTimerOnTaskThread()} correctly respects the + * checkpoint-completion safety fence even when called from a stall-detection timer. + * + *

The test invokes the handler directly (as if a processing-time timer fired) to keep the + * unit test independent of Flink's timer infrastructure. In production, the handler is called + * by {@link SchemaOperator13#scheduleFallbackTimer()} via {@code + * ProcessingTimeService.registerTimer}. + * + *

The base {@link SchemaOperator#scheduleFallbackTimer()} is a no-op; this test verifies + * only the handler logic, not the scheduling mechanism. + */ + @Test + void testFallbackTimerRespectsCheckpointSafetyFence() throws Exception { + LocalSchemaCoordinator coordinator = Mockito.mock(LocalSchemaCoordinator.class); + Mockito.when( + coordinator.requestSchemaChange( + Mockito.any(), Mockito.anyLong(), Mockito.anyLong())) + .thenReturn(true); + + OperatorTestContext context = createOperator(false); + setField(context.operator, "coordinator", coordinator); + + AlterTableAddColumnEvent event = createSchemaChangeEvent(); + SeaTunnelRow row = createDataRow("row-released-after-fallback"); + + context.operator.processElement(new StreamRecord<>(createSchemaRow(event), 400L)); + context.operator.processElement(new StreamRecord<>(row, 401L)); + + // Simulate timer firing before any checkpoint has completed (firstSeenCheckpointId < 0). + // The handler must NOT apply the DDL — it must call scheduleFallbackTimer() to wait for + // the checkpoint-completion safety fence (guards XA/MDL conflicts). + invokeNoArgMethod(context.operator, "handleFallbackTimerOnTaskThread"); + + assertTrue(context.output.records.isEmpty()); + assertTrue(getBooleanField(context.operator, "schemaChangePending")); + assertEquals(2, getPendingQueue(context.operator).size()); + assertEquals(-1L, getLongField(context.operator, "firstSeenCheckpointId")); + Mockito.verifyNoInteractions(coordinator); + + // Complete the first post-DDL checkpoint — sets firstSeenCheckpointId, not yet safe to + // apply (need one additional round, so notifyCheckpointComplete stops here). + context.operator.notifyCheckpointComplete(40L); + + assertTrue(context.output.records.isEmpty()); + assertEquals(40L, getLongField(context.operator, "firstSeenCheckpointId")); + assertTrue(getBooleanField(context.operator, "schemaChangePending")); + Mockito.verifyNoInteractions(coordinator); + + // Simulate checkpoint stall: move lastCheckpointCompletedMs into the past beyond + // CHECKPOINT_STALL_TIMEOUT_MS (15 s). This mirrors the Flink 1.13 behaviour where + // high-parallelism CDC jobs stop checkpointing after some source subtasks finish. + setField( + context.operator, + "lastCheckpointCompletedMs", + System.currentTimeMillis() - 20_000L); + + // Simulate timer firing again. firstSeenCheckpointId >= 0 and checkpoint has stalled, + // so the safety fence is satisfied — the DDL can now be applied. + invokeNoArgMethod(context.operator, "handleFallbackTimerOnTaskThread"); + + assertEquals(2, context.output.records.size()); + assertSchemaBroadcast(context.output.records.get(0), event); + assertEquals(row, context.output.records.get(1).getValue()); + assertFalse(getBooleanField(context.operator, "schemaChangePending")); + assertTrue(getPendingQueue(context.operator).isEmpty()); + Mockito.verify(coordinator) + .requestSchemaChange(event.tableIdentifier(), event.getCreatedTime(), 300_000L); + } + private static OperatorTestContext createOperator(boolean restored) throws Exception { return createOperator(new OperatorStateStoreStub(), restored); } @@ -285,6 +355,12 @@ private static void setField(Object target, Class owner, String fieldName, Ob field.set(target, value); } + private static Object invokeNoArgMethod(Object target, String methodName) throws Exception { + java.lang.reflect.Method method = target.getClass().getDeclaredMethod(methodName); + method.setAccessible(true); + return method.invoke(target); + } + private static Field findField(Class type, String fieldName) throws NoSuchFieldException { Class current = type; while (current != null) { diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java index 50be1a39d279..943d48b43d8f 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/test/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterTest.java @@ -20,6 +20,10 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.junit.jupiter.api.Assertions; @@ -28,7 +32,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; class FlinkSinkWriterTest { @@ -83,13 +89,54 @@ void testSnapshotStateWithoutPrepareCommitFallsBack() throws Exception { Assertions.assertEquals("state-3", states.get(0).getState()); } + @Test + void testSchemaChangeEventDoesNotForceCommit() throws Exception { + SchemaAwareRecordingSinkWriter delegate = new SchemaAwareRecordingSinkWriter(); + RecordingContext context = new RecordingContext(); + + FlinkSinkWriter flinkSinkWriter = + new FlinkSinkWriter<>(delegate, 7L, context); + + AlterTableAddColumnEvent event = + AlterTableAddColumnEvent.add( + TableIdentifier.of("catalog", "database", "table"), + PhysicalColumn.of( + "added_col", + org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE, + 64L, + true, + null, + null)); + event.setJobId("job-under-test"); + SeaTunnelRow schemaEvent = new SeaTunnelRow(0); + Map options = new LinkedHashMap<>(); + options.put("schema_change_event", event); + options.put("schema_subtask_id", 0L); + schemaEvent.setOptions(options); + flinkSinkWriter.write(schemaEvent, null); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, "value"); + flinkSinkWriter.write(row, null); + + // Schema change should apply without forcing commit - commits happen via normal Flink + // lifecycle + Assertions.assertEquals(1, delegate.writtenRows.size()); + Assertions.assertEquals(Collections.emptyList(), delegate.prepareCommitCalls); + Assertions.assertEquals(1, delegate.appliedSchemaChanges.size()); + Assertions.assertEquals(event, delegate.appliedSchemaChanges.get(0)); + } + private static class RecordingSinkWriter implements SinkWriter { - private final List prepareCommitCalls = new ArrayList<>(); - private final List snapshotCalls = new ArrayList<>(); + protected final List prepareCommitCalls = new ArrayList<>(); + protected final List snapshotCalls = new ArrayList<>(); + protected final List writtenRows = new ArrayList<>(); @Override - public void write(SeaTunnelRow element) throws IOException {} + public void write(SeaTunnelRow element) throws IOException { + writtenRows.add(element); + } @Override public Optional prepareCommit() { @@ -116,6 +163,19 @@ public void abortPrepare() {} public void close() throws IOException {} } + private static class SchemaAwareRecordingSinkWriter extends RecordingSinkWriter + implements SupportSchemaEvolutionSinkWriter { + + private final List + appliedSchemaChanges = new ArrayList<>(); + + @Override + public void applySchemaChange( + org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent event) { + appliedSchemaChanges.add(event); + } + } + private static class RecordingContext implements SinkWriter.Context { @Override