diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java index 06e71dd4a6c3..30da365f2832 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffset.java @@ -111,6 +111,12 @@ public long getTimestamp() { return longOffsetValue(offset, TIMESTAMP_KEY); } + public boolean isTimestampOffset() { + return offset.containsKey(TIMESTAMP_KEY) + && !offset.containsKey(BINLOG_FILENAME_OFFSET_KEY) + && !offset.containsKey(BINLOG_POSITION_OFFSET_KEY); + } + public Long getServerId() { return longOffsetValue(offset, SERVER_ID_KEY); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index 9e6b03db58f4..e47a85aa3476 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -301,7 +301,8 @@ private MySqlOffsetContext loadStartingOffsetState( private Offset getInitOffset(SourceSplitBase mySqlSplit) { StartupMode startupMode = getSourceConfig().getStartupConfig().getStartupMode(); - if (startupMode.equals(StartupMode.TIMESTAMP)) { + Offset splitStartupOffset = mySqlSplit.asIncrementalSplit().getStartupOffset(); + if (shouldResolveTimestampStartupOffset(startupMode, splitStartupOffset)) { long timestamp = getSourceConfig().getStartupConfig().getTimestamp(); try (JdbcConnection jdbcConnection = getDataSourceDialect().openJdbcConnection(getSourceConfig())) { @@ -310,10 +311,17 @@ private Offset getInitOffset(SourceSplitBase mySqlSplit) { throw new SeaTunnelException(e); } } else { - return mySqlSplit.asIncrementalSplit().getStartupOffset(); + return splitStartupOffset; } } + static boolean shouldResolveTimestampStartupOffset( + StartupMode startupMode, Offset startupOffset) { + return startupMode.equals(StartupMode.TIMESTAMP) + && startupOffset instanceof BinlogOffset + && ((BinlogOffset) startupOffset).isTimestampOffset(); + } + private boolean isBinlogAvailable(MySqlOffsetContext offset) { String gtidStr = offset.gtidSet(); if (gtidStr != null) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java index e3b8ea7ac0eb..f0ade84e4bf9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; +import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask; import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; @@ -72,7 +73,7 @@ public void execute(FetchTask.Context context) throws Exception { StartupConfig startupConfig = sourceFetchContext.getSourceConfig().getStartupConfig(); StartupMode startupMode = startupConfig.getStartupMode(); - if (startupMode.equals(StartupMode.TIMESTAMP)) { + if (shouldFilterByTimestamp(startupMode, split.getStartupOffset())) { log.info( "Starting MySQL binlog reader,with timestamp filter {}", startupConfig.getTimestamp()); @@ -139,6 +140,12 @@ public SourceSplitBase getSplit() { return split; } + static boolean shouldFilterByTimestamp(StartupMode startupMode, Offset startupOffset) { + return startupMode.equals(StartupMode.TIMESTAMP) + && startupOffset instanceof BinlogOffset + && ((BinlogOffset) startupOffset).isTimestampOffset(); + } + /** * A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark * to highWatermark) binlog. diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlTimestampStartupOffsetTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlTimestampStartupOffsetTest.java new file mode 100644 index 000000000000..3e8bc851b151 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlTimestampStartupOffsetTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch; + +import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; +import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit; +import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.debezium.relational.TableId; + +import java.util.Collections; + +public class MySqlTimestampStartupOffsetTest { + + @Test + public void testTimestampStartupResolvesConfiguredTimestampOnlyForBootstrapOffset() { + BinlogOffset timestampOffset = new BinlogOffset(1716076800L); + BinlogOffset restoredOffset = new BinlogOffset("mysql-bin.000021", 4096L); + + Assertions.assertTrue( + MySqlSourceFetchTaskContext.shouldResolveTimestampStartupOffset( + StartupMode.TIMESTAMP, timestampOffset)); + Assertions.assertFalse( + MySqlSourceFetchTaskContext.shouldResolveTimestampStartupOffset( + StartupMode.TIMESTAMP, restoredOffset)); + } + + @Test + public void testCheckpointRestoreUsesPersistedBinlogOffsetAfterTimestampBootstrap() { + BinlogOffset timestampOffset = new BinlogOffset(1716076800L); + IncrementalSplit incrementalSplit = + new IncrementalSplit( + "incremental-split", + Collections.singletonList(TableId.parse("test.orders")), + timestampOffset, + BinlogOffset.NO_STOPPING_OFFSET, + Collections.emptyList()); + + IncrementalSplitState splitState = new IncrementalSplitState(incrementalSplit); + BinlogOffset checkpointOffset = new BinlogOffset("mysql-bin.000021", 4096L); + splitState.setStartupOffset(checkpointOffset); + + IncrementalSplit restoredSplit = splitState.toSourceSplit(); + + Assertions.assertEquals(checkpointOffset, restoredSplit.getStartupOffset()); + Assertions.assertFalse( + MySqlSourceFetchTaskContext.shouldResolveTimestampStartupOffset( + StartupMode.TIMESTAMP, restoredSplit.getStartupOffset())); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTaskTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTaskTest.java new file mode 100644 index 000000000000..7da89b3b6937 --- /dev/null +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTaskTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog; + +import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; +import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MySqlBinlogFetchTaskTest { + + @Test + public void testTimestampFilterIsSkippedForRestoredBinlogOffset() { + BinlogOffset timestampOffset = new BinlogOffset(1716076800L); + BinlogOffset restoredOffset = new BinlogOffset("mysql-bin.000021", 4096L); + + Assertions.assertTrue( + MySqlBinlogFetchTask.shouldFilterByTimestamp( + StartupMode.TIMESTAMP, timestampOffset)); + Assertions.assertFalse( + MySqlBinlogFetchTask.shouldFilterByTimestamp( + StartupMode.TIMESTAMP, restoredOffset)); + } +}