Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_RETENTION_SECS;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
Expand Down Expand Up @@ -246,6 +247,7 @@ public static void setUp() throws IllegalAccessException {
when(config.get(ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS)).thenReturn(3L);
when(config.get(ENTITY_CHANGE_LOG_RETENTION_SECS)).thenReturn(24 * 60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)).thenReturn(60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_POLL_LAG_SECS)).thenReturn(1L);
// Fix cache config for test
Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_RETENTION_SECS;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
Expand Down Expand Up @@ -165,6 +166,7 @@ public static void setUp() throws IllegalAccessException {
when(config.get(ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS)).thenReturn(3L);
when(config.get(ENTITY_CHANGE_LOG_RETENTION_SECS)).thenReturn(24 * 60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)).thenReturn(60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_POLL_LAG_SECS)).thenReturn(1L);
// Fix cache config for test
Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_RETENTION_SECS;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
Expand Down Expand Up @@ -110,6 +111,7 @@ public static void setUp() throws IOException, IllegalAccessException {
when(config.get(ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS)).thenReturn(3L);
when(config.get(ENTITY_CHANGE_LOG_RETENTION_SECS)).thenReturn(24 * 60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)).thenReturn(60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_POLL_LAG_SECS)).thenReturn(1L);
Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(false);

store = EntityStoreFactory.createEntityStore(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_RETENTION_SECS;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
Expand Down Expand Up @@ -129,6 +130,7 @@ public static void setUp() throws IOException, IllegalAccessException {
when(config.get(ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS)).thenReturn(3L);
when(config.get(ENTITY_CHANGE_LOG_RETENTION_SECS)).thenReturn(24 * 60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)).thenReturn(60 * 60L);
when(config.get(ENTITY_CHANGE_LOG_POLL_LAG_SECS)).thenReturn(1L);
// Fix cache config for test
Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ private ConfigConstants() {}
/** The version number for the 1.3.0 release. */
public static final String VERSION_1_3_0 = "1.3.0";

/** The version number for the 1.4.0 release. */
public static final String VERSION_1_4_0 = "1.4.0";

/** The current version of backend storage initialization script. */
public static final String CURRENT_SCRIPT_VERSION = VERSION_1_3_0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.ENTITY_CHANGE_LOG_RETENTION_SECS;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
Expand Down Expand Up @@ -296,6 +297,7 @@ protected void initStore() throws IOException {
Mockito.when(config.get(ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS)).thenReturn(3L);
Mockito.when(config.get(ENTITY_CHANGE_LOG_RETENTION_SECS)).thenReturn(24 * 60 * 60L);
Mockito.when(config.get(ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)).thenReturn(60 * 60L);
Mockito.when(config.get(ENTITY_CHANGE_LOG_POLL_LAG_SECS)).thenReturn(1L);
Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
// Fix cache config for test
Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(CACHE_ENABLED);
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private Configs() {}
public static final long DEFAULT_ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS = 3L;
public static final long DEFAULT_ENTITY_CHANGE_LOG_RETENTION_SECS = 24 * 60 * 60L;
public static final long DEFAULT_ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS = 60 * 60L;
public static final long DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS = 1L;

public static final ConfigEntry<Long> ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS =
new ConfigBuilder("gravitino.entityChangeLog.pollIntervalSecs")
Expand All @@ -213,6 +214,22 @@ private Configs() {}
.checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
.createWithDefault(DEFAULT_ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS);

/**
 * Lag, in seconds, that the entity change log poller applies before consuming a change row.
 *
 * <p>Registered under the key {@code gravitino.entityChangeLog.pollLagSecs}. The value must be
 * non-negative ({@code 0} disables the lag) and defaults to {@link
 * #DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS}.
 */
public static final ConfigEntry<Long> ENTITY_CHANGE_LOG_POLL_LAG_SECS =
new ConfigBuilder("gravitino.entityChangeLog.pollLagSecs")
.doc(
"The lag in seconds applied to the entity change log poller. The poller only consumes"
+ " change rows whose created_at is at least this many seconds in the past"
+ " (measured by the database clock). Auto-increment ids are assigned at INSERT but"
+ " become visible at COMMIT, so a lower id may commit after a higher id and be"
+ " skipped by an id-only cursor. Waiting until a row is this old gives the"
+ " inserting transaction of any earlier (smaller-id) row time to commit before the"
+ " cursor advances past it, reducing missed invalidations. The value should exceed"
+ " the longest expected write transaction duration. Set 0 to disable the lag.")
.version(ConfigConstants.VERSION_1_4_0)
.longConf()
.checkValue(value -> value >= 0, ConfigConstants.NON_NEGATIVE_NUMBER_ERROR_MSG)
.createWithDefault(DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS);

public static final ConfigEntry<Boolean> CATALOG_LOAD_ISOLATED =
new ConfigBuilder("gravitino.catalog.classloader.isolated")
.doc("Whether to load the catalog in an isolated classloader")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.gravitino.storage.relational;

import static org.apache.gravitino.Configs.DEFAULT_ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS;
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS;
import static org.apache.gravitino.Configs.DEFAULT_ENTITY_CHANGE_LOG_RETENTION_SECS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
Expand All @@ -38,8 +42,22 @@
*
* <p>The poller owns the single high-water mark for a Gravitino server process and dispatches each
* consumed batch to registered listeners. Listeners should only perform idempotent local cache
* invalidation. The cursor always advances after dispatch regardless of individual listener
* failures, so a faulty listener cannot block other listeners or prevent pruning.
* invalidation. A listener failure affects only polling progress: a failing listener is logged
* and does not stop the Gravitino server or prevent the poller from invoking the other listeners in
* the same cycle. However, because the process owns one shared cursor, any listener failure pauses
* forward progress: the cursor is <b>not</b> advanced past the batch, so newer change rows are not
* consumed until the failed batch is retried and every listener succeeds within the retention
* window. If a listener keeps failing, the poller keeps retrying the blocked batch until the
* listener recovers or the records age out by retention cleanup. Because invalidation is
* idempotent, re-dispatching an already-applied batch to a healthy listener is harmless.
*
* <p>The poller also applies a lagging high-water mark: it only consumes change rows whose {@code
* created_at} is at least {@code pollLagMs} in the past (by the database clock). Auto-increment ids
* are assigned at INSERT but become visible at COMMIT, so a lower id can commit after a higher id
* and be skipped by an id-only cursor. Because a smaller-id row was necessarily inserted no later,
* waiting until a row is {@code pollLagMs} old gives that row's inserting transaction time to
* commit before the cursor moves past it. As long as {@code pollLagMs} exceeds the longest write
* transaction, this prevents the commit-ordering gap from dropping an invalidation permanently.
*/
public class EntityChangeLogPoller implements AutoCloseable {

Expand All @@ -52,6 +70,7 @@ public class EntityChangeLogPoller implements AutoCloseable {
private final long pollIntervalSecs;
private final long retentionMs;
private final long cleanupIntervalMs;
private final long pollLagMs;
private final LongSupplier clockMs;

private ScheduledExecutorService scheduler;
Expand All @@ -66,8 +85,9 @@ public class EntityChangeLogPoller implements AutoCloseable {
public EntityChangeLogPoller(long pollIntervalSecs) {
this(
pollIntervalSecs,
TimeUnit.DAYS.toMillis(1),
TimeUnit.HOURS.toMillis(1),
TimeUnit.SECONDS.toMillis(DEFAULT_ENTITY_CHANGE_LOG_RETENTION_SECS),
TimeUnit.SECONDS.toMillis(DEFAULT_ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS),
TimeUnit.SECONDS.toMillis(DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS),
System::currentTimeMillis);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: hardcoded 1 should reference DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS.

This repeats the magic value that is also defined in Configs.DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS. If the default is ever changed in Configs, this single-arg constructor is silently left inconsistent.

// Prefer
TimeUnit.SECONDS.toMillis(DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it makes sense.

}

Expand All @@ -77,20 +97,38 @@ public EntityChangeLogPoller(long pollIntervalSecs) {
* @param pollIntervalSecs interval between successive polling cycles
* @param retentionMs entity change retention in milliseconds, or 0 to disable cleanup
* @param cleanupIntervalMs interval between successive cleanup attempts in milliseconds
* @param pollLagMs lag in milliseconds; only rows older than this (by the DB clock) are consumed,
* or 0 to disable the lag
*/
public EntityChangeLogPoller(long pollIntervalSecs, long retentionMs, long cleanupIntervalMs) {
this(pollIntervalSecs, retentionMs, cleanupIntervalMs, System::currentTimeMillis);
public EntityChangeLogPoller(
long pollIntervalSecs, long retentionMs, long cleanupIntervalMs, long pollLagMs) {
this(pollIntervalSecs, retentionMs, cleanupIntervalMs, pollLagMs, System::currentTimeMillis);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Preconditions error message omits the actual values.

"retentionMs must be greater than pollLagMs when cleanup is enabled"

When this fires at startup the operator sees an IllegalArgumentException without knowing which values triggered it. Prefer:

String.format(
    "retentionMs (%d) must be greater than pollLagMs (%d) when cleanup is enabled",
    retentionMs, pollLagMs)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@VisibleForTesting
EntityChangeLogPoller(
long pollIntervalSecs, long retentionMs, long cleanupIntervalMs, LongSupplier clockMs) {
long pollIntervalSecs,
long retentionMs,
long cleanupIntervalMs,
long pollLagMs,
LongSupplier clockMs) {
Preconditions.checkArgument(pollIntervalSecs > 0, "pollIntervalSecs must be positive");
Preconditions.checkArgument(retentionMs >= 0, "retentionMs must be non-negative");
Preconditions.checkArgument(cleanupIntervalMs > 0, "cleanupIntervalMs must be positive");
Preconditions.checkArgument(pollLagMs >= 0, "pollLagMs must be non-negative");
// A row becomes eligible for polling once it is pollLagMs old and eligible for pruning once it
// is retentionMs old. If cleanup is enabled (retentionMs > 0) but retentionMs is not strictly
// greater than pollLagMs, a row could be pruned before it is ever polled, silently dropping its
// invalidation.
Preconditions.checkArgument(
retentionMs == 0 || retentionMs > pollLagMs,
"retentionMs (%s) must be greater than pollLagMs (%s) when cleanup is enabled",
retentionMs,
pollLagMs);
this.pollIntervalSecs = pollIntervalSecs;
this.retentionMs = retentionMs;
this.cleanupIntervalMs = cleanupIntervalMs;
this.pollLagMs = pollLagMs;
this.clockMs = clockMs;
}

Expand Down Expand Up @@ -183,22 +221,41 @@ private synchronized void doPollChanges() {
}

List<EntityChangeRecord> dispatchedChanges = Collections.unmodifiableList(changes);
boolean allListenersSucceeded = true;
for (EntityChangeLogListener listener : listeners) {
try {
listener.onEntityChange(dispatchedChanges);
} catch (Exception e) {
allListenersSucceeded = false;
LOG.warn("Entity change listener {} failed", listener.getClass().getName(), e);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concern: LOG.warn is likely too quiet for a persistent listener failure.

With the new cursor semantics, a listener failure blocks forward progress for ALL new change rows. If a listener fails on every cycle, the cache silently serves stale data until it recovers — or until retention cleanup prunes the stuck rows, at which point the invalidations are permanently lost. A single WARN per cycle is easy to miss in production.

Consider escalating to LOG.error, or tracking a consecutive-failure counter per listener and switching to LOG.error after N failures. At that point the operator has a closing window (bounded by retentionMs / pollIntervalMs cycles) before invalidations are permanently dropped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was torn between these two log levels; I'll adopt the suggestion.

}
}

entityPollHighWaterId = maxSeenId;
// Only advance the cursor when every listener applied the batch. A listener failure must not
// drop the batch's invalidations: keeping the cursor in place re-dispatches the same batch on
// the next cycle until all listeners succeed. Listeners are idempotent, so re-dispatching to an
// already-applied listener is harmless.
if (allListenersSucceeded) {
entityPollHighWaterId = maxSeenId;
} else {
// The cursor cannot advance until every listener applies the batch, so forward progress is
// paused and the same batch is re-dispatched every cycle. If this persists, the stuck rows
// will eventually be pruned by retention cleanup and their invalidations lost permanently,
// leaving caches to serve stale data. Surface this at ERROR so operators can act.
LOG.error(
"Entity change cursor is paused at id {} because at least one listener failed to apply "
+ "the current batch; invalidations will be lost if this is not resolved before the "
+ "stuck rows age past the retention window",
entityPollHighWaterId);
}
pruneExpiredChangesIfNeeded();
}

private List<EntityChangeRecord> fetchEntityChanges() {
return SessionUtils.getWithoutCommit(
EntityChangeLogMapper.class,
m -> m.selectEntityChanges(entityPollHighWaterId, ENTITY_CHANGE_POLLER_MAX_ROWS));
m ->
m.selectEntityChanges(entityPollHighWaterId, pollLagMs, ENTITY_CHANGE_POLLER_MAX_ROWS));
}

private static boolean handleInterruptIfAny(Throwable e, String context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public void initialize(Config config) throws RuntimeException {
new EntityChangeLogPoller(
config.get(Configs.ENTITY_CHANGE_LOG_POLL_INTERVAL_SECS),
TimeUnit.SECONDS.toMillis(config.get(Configs.ENTITY_CHANGE_LOG_RETENTION_SECS)),
TimeUnit.SECONDS.toMillis(config.get(Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)));
TimeUnit.SECONDS.toMillis(config.get(Configs.ENTITY_CHANGE_LOG_CLEANUP_INTERVAL_SECS)),
TimeUnit.SECONDS.toMillis(config.get(Configs.ENTITY_CHANGE_LOG_POLL_LAG_SECS)));
this.entityChangeLogPoller.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public interface EntityChangeLogMapper {

@SelectProvider(type = EntityChangeLogSQLProviderFactory.class, method = "selectEntityChanges")
List<EntityChangeRecord> selectEntityChanges(
@Param("lastConsumedId") long lastConsumedId, @Param("maxRows") int maxRows);
@Param("lastConsumedId") long lastConsumedId,
@Param("lagMs") long lagMs,
@Param("maxRows") int maxRows);

@SelectProvider(type = EntityChangeLogSQLProviderFactory.class, method = "selectMaxChangeId")
Long selectMaxChangeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ static class EntityChangeLogMySQLProvider extends EntityChangeLogBaseSQLProvider
static class EntityChangeLogH2Provider extends EntityChangeLogBaseSQLProvider {}

public static String selectEntityChanges(
@Param("lastConsumedId") long lastConsumedId, @Param("maxRows") int maxRows) {
return getProvider().selectEntityChanges(lastConsumedId, maxRows);
@Param("lastConsumedId") long lastConsumedId,
@Param("lagMs") long lagMs,
@Param("maxRows") int maxRows) {
return getProvider().selectEntityChanges(lastConsumedId, lagMs, maxRows);
}

public static String selectMaxChangeId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,31 @@ public class EntityChangeLogBaseSQLProvider {
* #selectMaxChangeId()} because its cache starts empty and it does not need historical
* invalidations. Re-consuming a row on an existing instance is acceptable: entity DROP/ALTER
* handling only invalidates cache keys, and invalidation is idempotent.
*
* <p>The {@code created_at <= dbNowMs - lagMs} predicate applies a lagging high-water mark.
* Auto-increment ids are assigned at INSERT time but become visible at COMMIT time, so a lower id
* can commit after a higher id; advancing the cursor purely by the largest committed id can step
* over a not-yet-committed lower id and lose its invalidation forever. A smaller-id row was
* necessarily inserted no later than a larger-id row, so by only consuming rows whose {@code
* created_at} is at least {@code lagMs} in the past (measured by the database clock, so there is
* no app/DB clock skew), that smaller-id row's inserting transaction has had at least {@code
* lagMs} to commit. As long as {@code lagMs} exceeds the longest write transaction, no smaller id
* can still commit after the cursor has advanced past a consumed row. The cutoff is computed with
* the same DB-millis expression used by {@link #insertEntityChange} so {@code created_at} values
* are comparable.
*/
public String selectEntityChanges(
@Param("lastConsumedId") long lastConsumedId, @Param("maxRows") int maxRows) {
@Param("lastConsumedId") long lastConsumedId,
@Param("lagMs") long lagMs,
@Param("maxRows") int maxRows) {
return "SELECT id, metalake_name as metalakeName, entity_type as entityType,"
+ " entity_full_name as fullName, operate_type as operateType, created_at as createdAt"
+ " FROM "
+ ENTITY_CHANGE_LOG_TABLE_NAME
+ " WHERE id > #{lastConsumedId} ORDER BY id LIMIT #{maxRows}";
+ " WHERE id > #{lastConsumedId}"
+ " AND created_at <= ((UNIX_TIMESTAMP() * 1000.0)"
+ " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000) - #{lagMs}"
+ " ORDER BY id LIMIT #{maxRows}";
}

public String selectMaxChangeId() {
Expand Down
Loading
Loading