Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/core-hadoop2-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [ '11' ]
jdk: [ '11' , '17' ]
Comment thread
xxubai marked this conversation as resolved.
Outdated
name: Build Amoro with JDK ${{ matrix.jdk }}
steps:
- uses: actions/checkout@v3
Expand Down
7 changes: 6 additions & 1 deletion .github/workflows/core-hadoop3-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jdk: [ '11' ]
jdk: [ '11', '17' ]
spark: [ '3.3','3.4', '3.5' ]
exclude:
- jdk: '17'
spark: '3.3'
- jdk: '17'
spark: '3.4'
name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }}
steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Amoro contains modules as below:

## Building

Amoro is built using Maven with JDK 8, 11 and 17(required for `amoro-format-mixed/amoro-mixed-trino` module).
Amoro is built using Maven with JDK 11 and 17 (JDK 17 is required for the `amoro-format-mixed/amoro-mixed-trino` module and is experimental for other modules).

* Build all modules without `amoro-mixed-trino`: `./mvnw clean package`
* Build and skip tests: `./mvnw clean package -DskipTests`
Expand Down
2 changes: 0 additions & 2 deletions amoro-format-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
<name>Amoro Project Hudi Format</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-hive</artifactId>
Expand Down Expand Up @@ -422,7 +430,8 @@
<value>org.apache.amoro.listener.AmoroRunListener</value>
</property>
</properties>
<argLine>-verbose:class</argLine>
<excludedGroups>${surefire.excludedGroups.jdk}</excludedGroups>
<argLine>${amoro.surefire.baseArgLine}</argLine>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
authenticatedFileIO.doAs(
() -> {
try {
method.setAccessible(true);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KerberosInvocationHandler currently proxies methods exposed through the proxied object's interfaces.

In practice, the invocation path goes through public interfaces such as OneInputStreamOperator and SourceFunction. As a result, these method invocations do not require method.setAccessible(true).

Keeping method.setAccessible(true) would introduce unnecessary deep-reflection behavior, which is exactly what this JDK17 compatibility change is trying to avoid.

return method.invoke(obj, args);
} catch (Throwable e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.amoro.table.MixedTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.FlinkRuntimeException;
Expand All @@ -45,12 +44,10 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -244,20 +241,16 @@ private void checkErrorAndRethrow() {
}

private String generateRocksDBPath(FunctionContext context, String tableName) {
String tmpPath = getTmpDirectoryFromTMContainer(context);
String tmpPath = getTmpDirectory(context);
Comment thread
xxubai marked this conversation as resolved.
Outdated
File db = new File(tmpPath, tableName + "-lookup-" + UUID.randomUUID());
return db.toString();
}

private static String getTmpDirectoryFromTMContainer(FunctionContext context) {
try {
Field field = context.getClass().getDeclaredField("context");
field.setAccessible(true);
StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context);
String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
private static String getTmpDirectory(FunctionContext context) {
String configuredTmpDir = context.getJobParameter("java.io.tmpdir", null);
if (configuredTmpDir != null && !configuredTmpDir.isEmpty()) {
return configuredTmpDir;
}
return System.getProperty("java.io.tmpdir");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,23 @@
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplitState;
import org.apache.amoro.flink.read.hybrid.split.SplitRequestEvent;
import org.apache.amoro.flink.util.FlinkClassReflectionUtil;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
Expand Down Expand Up @@ -132,25 +130,21 @@ public ReaderOutput<T> wrapOutput(ReaderOutput<T> output) {
return new MixedFormatReaderOutput<>(output);
}

/**
* There is a case that the watermark in {@link WatermarkOutputMultiplexer.OutputState} has been
* updated, but watermark has not been emitted for that when {@link
* WatermarkOutputMultiplexer#onPeriodicEmit} called, the outputState has been removed by {@link
* WatermarkOutputMultiplexer#unregisterOutput(String)} after split finished. Wrap {@link
* ReaderOutput} to call {@link
* ProgressiveTimestampsAndWatermarks.SplitLocalOutputs#emitPeriodicWatermark()} when split
* finishes.
*/
/** Wrap split outputs so we can flush any pending periodic watermark before release. */
static class MixedFormatReaderOutput<T> implements ReaderOutput<T> {

private final ReaderOutput<T> internal;
private final SourceOutputWithWatermarks<T> watermarkOutput;
private final Map<String, SourceOutput<T>> splitOutputs = new HashMap<>();

@SuppressWarnings("unchecked")
public MixedFormatReaderOutput(ReaderOutput<T> readerOutput) {
Preconditions.checkArgument(
readerOutput instanceof SourceOutputWithWatermarks,
"readerOutput should be SourceOutputWithWatermarks, but was %s",
readerOutput.getClass());
this.internal = readerOutput;
this.watermarkOutput = (SourceOutputWithWatermarks<T>) readerOutput;
}

@Override
Expand Down Expand Up @@ -180,14 +174,28 @@ public void markActive() {

@Override
public SourceOutput<T> createOutputForSplit(String splitId) {
return internal.createOutputForSplit(splitId);
SourceOutput<T> splitOutput = internal.createOutputForSplit(splitId);
splitOutputs.put(splitId, splitOutput);
return splitOutput;
}

@Override
public void releaseOutputForSplit(String splitId) {
Object splitLocalOutput = FlinkClassReflectionUtil.getSplitLocalOutput(internal);
FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput);
emitPeriodicWatermark(splitOutputs.remove(splitId));
internal.releaseOutputForSplit(splitId);
}

private void emitPeriodicWatermark(SourceOutput<T> splitOutput) {
if (splitOutput == null) {
return;
}

if (splitOutput instanceof SourceOutputWithWatermarks) {
((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
return;
}

watermarkOutput.emitPeriodicWatermark();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
Expand All @@ -49,7 +48,9 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.FlinkInputFormat;
Expand Down Expand Up @@ -243,37 +244,55 @@ public DataStream<RowData> buildUnkeyedTableSource(String scanStartupMode) {
.properties(properties)
.flinkConf(flinkConf)
.limit(limit);
Long startSnapshotId = null;
if (MixedFormatValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(scanStartupMode)) {
Optional<Snapshot> startSnapshotOptional =
Optional.ofNullable(tableLoader.loadTable().currentSnapshot());
if (startSnapshotOptional.isPresent()) {
Snapshot snapshot = startSnapshotOptional.get();
startSnapshotId = snapshot.snapshotId();
LOG.info(
"Get starting snapshot id {} based on scan startup mode {}",
snapshot.snapshotId(),
scanStartupMode);
builder.startSnapshotId(snapshot.snapshotId());
builder.startSnapshotId(startSnapshotId);
}
}
DataStream<RowData> origin = builder.build();
return wrapKrb(origin).assignTimestampsAndWatermarks(watermarkStrategy);
return wrapKrb(origin, startSnapshotId).assignTimestampsAndWatermarks(watermarkStrategy);
}

/** extract op from dataStream, and wrap krb support */
private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
private DataStream<RowData> wrapKrb(DataStream<RowData> ds, Long startSnapshotId) {
Comment thread
xxubai marked this conversation as resolved.
Outdated
IcebergClassUtil.clean(env);
Transformation origin = ds.getTransformation();
int scanParallelism =
flinkConf
.getOptional(MixedFormatValidator.SCAN_PARALLELISM)
.orElse(origin.getParallelism());
Table table = mixedTable.asUnkeyedTable();
Schema projectedIcebergSchema =
projectedSchema == null
? mixedTable.schema()
: FlinkSchemaUtil.convert(
mixedTable.schema(),
org.apache.amoro.flink.FlinkSchemaUtil.filterWatermark(projectedSchema));

if (origin instanceof OneInputTransformation) {
OneInputTransformation<RowData, RowData> tf =
(OneInputTransformation<RowData, RowData>) ds.getTransformation();
OneInputStreamOperatorFactory op = (OneInputStreamOperatorFactory) tf.getOperatorFactory();
ProxyFactory<FlinkInputFormat> inputFormatProxyFactory =
IcebergClassUtil.getInputFormatProxyFactory(op, mixedTable.io(), mixedTable.schema());
IcebergClassUtil.getInputFormatProxyFactory(
tableLoader,
table,
mixedTable.io(),
mixedTable.schema(),
projectedIcebergSchema,
flinkConf,
properties,
filters,
limit,
startSnapshotId);

if (tf.getInputs().isEmpty()) {
return env.addSource(
Expand Down Expand Up @@ -305,7 +324,7 @@ private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
(InputFormatSourceFunction) IcebergClassUtil.getSourceFunction(source);

InputFormat inputFormatProxy =
(InputFormat) ProxyUtil.getProxy(function.getFormat(), mixedTable.io());
new KerberosAwareInputFormat<>(function.getFormat(), mixedTable.io());
DataStreamSource sourceStream =
env.createInput(inputFormatProxy, tfSource.getOutputType())
.setParallelism(scanParallelism);
Expand Down
Loading
Loading