Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SupportSchemaEvolution;
import org.apache.seatunnel.translation.flink.schema.SchemaOperator;
import org.apache.seatunnel.translation.flink.schema.SchemaOperator13;

import java.net.URL;
import java.util.List;

/**
* Flink 1.13-specific source execution processor. Shadows the common {@code SourceExecuteProcessor}
* at runtime (same package, same class name) to provide two Flink 1.13-specific behaviours without
* using reflection:
*
* <ol>
* <li>{@link #createSchemaOperator} returns {@link SchemaOperator13}, which registers the
* checkpoint-stall fallback timer via the strongly-typed {@code
* ProcessingTimeService.registerTimer} API instead of a background {@code
* ScheduledExecutorService} + reflection.
* <li>{@link #supportsSinkFunctionFinish} hard-codes {@code false}: Flink 1.13's {@code
* SinkFunction} does not expose a {@code finish()} method, so this fact is known at compile
* time and no reflection is needed.
* </ol>
*/
@SuppressWarnings("unchecked,rawtypes")
public class SourceExecuteProcessor extends AbstractSourceExecuteProcessor {

public SourceExecuteProcessor(
List<URL> jarPaths,
Config envConfig,
List<? extends Config> pluginConfigs,
JobContext jobContext) {
super(jarPaths, envConfig, pluginConfigs, jobContext);
}

@Override
protected SchemaOperator createSchemaOperator(
String jobId, SupportSchemaEvolution source, Config pluginConfig) {
return new SchemaOperator13(jobId, source, pluginConfig);
}

/**
* Flink 1.13's {@code SinkFunction} does not have a {@code finish()} method, so source
* keep-alive must always be enabled when schema evolution is active. Returns {@code false}
* directly rather than using reflection.
*/
@Override
protected boolean supportsSinkFunctionFinish() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportSchemaEvolution;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.translation.flink.schema.SchemaOperator;
import org.apache.seatunnel.translation.flink.source.FlinkSource;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import scala.Tuple2;

import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_OUTPUT;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;
import static org.apache.seatunnel.common.constants.JobMode.STREAMING;

@SuppressWarnings("unchecked,rawtypes")
public abstract class AbstractSourceExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {

private static final String SOURCE_KEEP_ALIVE_CONFIG = "schema-changes.source-keep-alive";

protected AbstractSourceExecuteProcessor(
List<URL> jarPaths,
Config envConfig,
List<? extends Config> pluginConfigs,
JobContext jobContext) {
super(jarPaths, envConfig, pluginConfigs, jobContext);
}

@Override
public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataStreams) {
StreamExecutionEnvironment executionEnvironment =
flinkRuntimeEnvironment.getStreamExecutionEnvironment();
List<DataStreamTableInfo> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
SourceTableInfo sourceTableInfo = plugins.get(i);
SeaTunnelSource internalSource = sourceTableInfo.getSource();
Config pluginConfig = pluginConfigs.get(i);

DataStreamSource<SeaTunnelRow> sourceStream =
executionEnvironment.fromSource(
new FlinkSource<>(
internalSource,
enableSourceKeepAliveIfNeeded(
internalSource, pluginConfig, envConfig)),
WatermarkStrategy.noWatermarks(),
String.format("%s-Source", internalSource.getPluginName()));

if (pluginConfig.hasPath(EnvCommonOptions.PARALLELISM.key())) {
int parallelism = pluginConfig.getInt(EnvCommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}

boolean isStreaming =
envConfig.hasPath("job.mode")
&& STREAMING
.toString()
.equalsIgnoreCase(envConfig.getString("job.mode"));

boolean enableSchemaChange = false;
for (Config cfg : pluginConfigs) {
if (cfg.hasPath("schema-changes.enabled")
&& cfg.getBoolean("schema-changes.enabled")) {
enableSchemaChange = true;
break;
}
}
// add schema evolution functionality to cdc source
DataStream<SeaTunnelRow> evolvedStream = null;
if (isStreaming
&& enableSchemaChange
&& sourceTableInfo.getSource() instanceof SupportSchemaEvolution) {
evolvedStream =
sourceStream.transform(
"schema-evolution",
TypeInformation.of(SeaTunnelRow.class),
createSchemaOperator(
jobContext.getJobId(),
(SupportSchemaEvolution) sourceTableInfo.getSource(),
pluginConfig));
}

if (evolvedStream != null) {
sources.add(
new DataStreamTableInfo(
evolvedStream,
sourceTableInfo.getCatalogTables(),
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
} else {
sources.add(
new DataStreamTableInfo(
sourceStream,
sourceTableInfo.getCatalogTables(),
ReadonlyConfig.fromConfig(pluginConfig).get(PLUGIN_OUTPUT)));
}
}
return sources;
}

private Config enableSourceKeepAliveIfNeeded(
SeaTunnelSource source, Config pluginConfig, Config currentEnvConfig) {
boolean isStreaming =
currentEnvConfig.hasPath("job.mode")
&& STREAMING
.toString()
.equalsIgnoreCase(currentEnvConfig.getString("job.mode"));
boolean enableSchemaChange =
pluginConfig.hasPath("schema-changes.enabled")
&& pluginConfig.getBoolean("schema-changes.enabled");
boolean shouldEnableKeepAlive =
isStreaming
&& enableSchemaChange
&& source instanceof SupportSchemaEvolution
&& !supportsSinkFunctionFinish();
if (!shouldEnableKeepAlive) {
return currentEnvConfig;
}
return currentEnvConfig.withValue(
SOURCE_KEEP_ALIVE_CONFIG, ConfigValueFactory.fromAnyRef(true));
}

/**
* Returns the {@link SchemaOperator} instance to attach after schema-evolution-capable sources.
* Subclasses may override to return a version-specific operator (e.g. {@code SchemaOperator13}
* for Flink 1.13) that uses the public {@code ProcessingTimeService} API instead of reflection.
*/
protected SchemaOperator createSchemaOperator(
String jobId, SupportSchemaEvolution source, Config pluginConfig) {
return new SchemaOperator(jobId, source, pluginConfig);
}

/**
* Returns {@code true} if the current Flink runtime's {@code SinkFunction} exposes a {@code
* finish()} method (introduced in Flink 1.14). When {@code false}, source keep-alive is enabled
* so pending schema changes can still be applied after all source subtasks finish. Subclasses
* may override with a hard-coded value to avoid reflection.
*/
protected boolean supportsSinkFunctionFinish() {
for (java.lang.reflect.Method method :
org.apache.flink.streaming.api.functions.sink.SinkFunction.class.getMethods()) {
if ("finish".equals(method.getName()) && method.getParameterCount() == 0) {
return true;
}
}
return false;
}

@Override
protected List<SourceTableInfo> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSourceFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource =
sourcePluginDiscovery::createPluginInstance;

List<SourceTableInfo> sources = new ArrayList<>();
Set<URL> jars = new HashSet<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
EngineType.SEATUNNEL.getEngine(),
PluginType.SOURCE.getType(),
sourceConfig.getString(PLUGIN_NAME.key()));
jars.addAll(
sourcePluginDiscovery.getPluginJarAndDependencyPaths(
Lists.newArrayList(pluginIdentifier)));

Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
classLoader,
pluginIdentifier.getPluginName(),
fallbackCreateSource,
(TableSourceFactory)
factoryDiscovery
.createOptionalPluginInstance(pluginIdentifier)
.orElse(null),
null);

source._1().setJobContext(jobContext);
ensureJobModeMatch(jobContext, source._1());

sources.add(new SourceTableInfo(source._1(), source._2()));
}
jarPaths.addAll(jars);
return sources;
}
}
Loading
Loading