[#11736] improvement(core): Close EntityChangeLogPoller commit-ordering gap and listener-failure cursor advance#11739
[#11736] improvement(core): Close EntityChangeLogPoller commit-ordering gap and listener-failure cursor advance#11739yuqi1129 wants to merge 6 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR strengthens HA cache invalidation reliability by hardening the shared EntityChangeLogPoller against (a) commit-ordering gaps when using an auto-increment id cursor and (b) cursor advancement when one listener fails, plus it introduces and documents a new polling lag configuration.
Changes:
- Add a DB-clock-based lag window (
pollLagMs/gravitino.entityChangeLog.pollLagSecs) so poll consumption avoids commit-ordering gaps from out-of-order transaction commits. - Change cursor advancement semantics so the high-water mark only advances when all listeners successfully apply a batch; failures are retried next cycle.
- Update mapper/provider signatures and tests to thread the lag through
selectEntityChanges, and update config/docs versions to1.4.0.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/gravitino-server-config.md | Documents the new gravitino.entityChangeLog.pollLagSecs and updates “since version” for change-log configs. |
| core/src/main/java/org/apache/gravitino/storage/relational/EntityChangeLogPoller.java | Adds poll lag support and changes cursor advancement to be conditional on listener success. |
| core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java | Wires pollLagSecs from config into the poller. |
| core/src/main/java/org/apache/gravitino/storage/relational/mapper/EntityChangeLogMapper.java | Extends selectEntityChanges to accept lagMs. |
| core/src/main/java/org/apache/gravitino/storage/relational/mapper/EntityChangeLogSQLProviderFactory.java | Threads lagMs through to DB-specific SQL providers. |
| core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/EntityChangeLogBaseSQLProvider.java | Adds lagging cutoff predicate to change selection SQL. |
| core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/EntityChangeLogPostgreSQLProvider.java | Overrides selection SQL to use PostgreSQL epoch-millis expression for the lag cutoff. |
| core/src/main/java/org/apache/gravitino/Configs.java | Adds ENTITY_CHANGE_LOG_POLL_LAG_SECS config entry and updates versions to 1.4.0. |
| common/src/main/java/org/apache/gravitino/config/ConfigConstants.java | Adds VERSION_1_4_0. |
| core/src/test/java/org/apache/gravitino/storage/relational/TestEntityChangeLogPoller.java | Updates tests for new cursor semantics and mapper signature; adds negative lag validation and recovery tests. |
| core/src/test/java/org/apache/gravitino/storage/relational/mapper/provider/base/TestEntityChangeLogMapper.java | Adds coverage for lag excluding “fresh” rows until they age past the lag window. |
| core/src/test/java/org/apache/gravitino/storage/relational/service/TestEntityChangeLogService.java | Updates mapper call signature in service tests. |
| core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableMetaService.java | Updates mapper call signature in service tests. |
d2681f6 to
2bee41e
Compare
…ng gap and listener-failure cursor advance The shared EntityChangeLogPoller could permanently lose a change-log row's invalidation, leaving stale data on remote nodes: 1. Commit-ordering gap: auto-increment ids are assigned at INSERT but visible at COMMIT, so a lower id can commit after a higher id and be skipped by the `id > highWater` cursor. Fixed with a lagging high-water mark: only consume rows older than `gravitino.entityChangeLog.pollLagSecs` (default 1s), measured by the DB clock so there is no app/DB skew. 2. Cursor advanced even when a listener threw, dropping that batch's invalidations. Now the cursor only advances when every listener succeeds; a failing listener no longer blocks others and the batch is retried. Also adds the missing VERSION_1_4_0 constant and relabels the entity change log configs (1.3.0 was never released; dev is 1.4.0-SNAPSHOT).
2bee41e to
a1ffcff
Compare
… lag - Reword the pollLagSecs docs/Javadoc: the lag works because a smaller-id row is inserted no later than a larger-id row, so waiting pollLagMs gives that row's inserting transaction time to commit. Drop the inaccurate claim about "all transactions that started before a consumed row". - Validate retentionMs > pollLagMs when cleanup is enabled, so a row can never be pruned before it becomes eligible for polling. Add a covering test.
|
Thanks for the review — addressed in e850679:
|
RelationalEntityStore.initialize now reads the new pollLagSecs config, so the mocked Config in tests/benchmarks returned null for it and NPE'd. Stub it (1s, below the 24h retention stub) everywhere the sibling change-log configs are stubbed.
Code Coverage Report
Files
|
jerryshao
left a comment
There was a problem hiding this comment.
Overall the fix is sound and well-tested. Two bugs (commit-ordering gap and listener-failure cursor advance) are addressed correctly. Comments below are one functional concern, two style/usability nits, and one minor test coverage gap.
| TimeUnit.DAYS.toMillis(1), | ||
| TimeUnit.HOURS.toMillis(1), | ||
| TimeUnit.SECONDS.toMillis(1), | ||
| System::currentTimeMillis); |
There was a problem hiding this comment.
Style: hardcoded 1 should reference DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS.
This repeats the magic value that is also defined in Configs.DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS. If the default is ever changed in Configs, this single-arg constructor is silently left inconsistent.
// Prefer
TimeUnit.SECONDS.toMillis(DEFAULT_ENTITY_CHANGE_LOG_POLL_LAG_SECS),There was a problem hiding this comment.
OK, it makes sense.
| listener.onEntityChange(dispatchedChanges); | ||
| } catch (Exception e) { | ||
| allListenersSucceeded = false; | ||
| LOG.warn("Entity change listener {} failed", listener.getClass().getName(), e); |
There was a problem hiding this comment.
Concern: LOG.warn is likely too quiet for a persistent listener failure.
With the new cursor semantics, a listener failure blocks forward progress for ALL new change rows. If a listener fails on every cycle, the cache silently serves stale data until it recovers — or until retention cleanup prunes the stuck rows, at which point the invalidations are permanently lost. A single WARN per cycle is easy to miss in production.
Consider escalating to LOG.error, or tracking a consecutive-failure counter per listener and switching to LOG.error after N failures. At that point the operator has a closing window (bounded by retentionMs / pollIntervalMs cycles) before invalidations are permanently dropped.
There was a problem hiding this comment.
I'm hesitant about these two levels and adopt the suggestion.
| long pollIntervalSecs, long retentionMs, long cleanupIntervalMs, long pollLagMs) { | ||
| this(pollIntervalSecs, retentionMs, cleanupIntervalMs, pollLagMs, System::currentTimeMillis); | ||
| } | ||
|
|
There was a problem hiding this comment.
Minor: Preconditions error message omits the actual values.
"retentionMs must be greater than pollLagMs when cleanup is enabled"When this fires at startup the operator sees an IllegalArgumentException without knowing which values triggered it. Prefer:
String.format(
"retentionMs (%d) must be greater than pollLagMs (%d) when cleanup is enabled",
retentionMs, pollLagMs)| // A freshly written row is younger than the lag window, so it must not be consumed yet. | ||
| entityChangeLogMapper.insertEntityChange( | ||
| "metalake1", "TABLE", "metalake1.cat.schema.fresh", OperateType.ALTER); | ||
| List<EntityChangeRecord> withLag = entityChangeLogMapper.selectEntityChanges(0L, 60_000L, 100); |
There was a problem hiding this comment.
Minor: lagMs = 0 ("disable lag") path not explicitly asserted in this new test.
Since lagMs = 0 is the documented contract for users who set pollLagSecs = 0, a brief assertion that a fresh row IS immediately returned with lagMs = 0 would pin the zero-lag SQL behaviour at the mapper level:
// lagMs = 0 disables the filter — fresh rows must be consumable immediately
List<EntityChangeRecord> noLag = entityChangeLogMapper.selectEntityChanges(0L, 0L, 100);
Assertions.assertFalse(noLag.isEmpty(), "lagMs=0 should return the fresh row");
diqiu50
left a comment
There was a problem hiding this comment.
The lag is only applied on the regular polling path. start() still initializes entityPollHighWaterId from unlagged selectMaxChangeId(), so a restart can still skip a lower id that was inserted but uncommitted while a higher id was already committed. Once the cursor is initialized past that lower id, the lagged polling query cannot recover it. The startup cursor should be initialized using the same lagged/eligible boundary.
…gPoller - Reference Configs default constants instead of hardcoded retention/cleanup/lag values in the single-arg constructor - Include actual retentionMs/pollLagMs values in the cross-field precondition message - Log at ERROR when listener failure pauses the cursor, warning about invalidation loss past retention - Add mapper test asserting lagMs=0 returns fresh rows immediately
What changes were proposed in this pull request?
Two fixes to the shared
EntityChangeLogPoller(consumed today by the jcasbin caches), which could permanently lose a change-log row's invalidation:selectEntityChangesnow only consumes rows older than a configurable lag (created_at <= DB_now - lagMs), computed with the DB clock so there is no app/DB skew. The cursor stays the simple monotonicid. New configgravitino.entityChangeLog.pollLagSecs(default1). The PostgreSQL provider gets its ownselectEntityChangesoverride for the EPOCH expression.Also adds the missing
VERSION_1_4_0constant for the newly introducedpollLagSecsconfig.Why are the changes needed?
In multi-node deployments these two paths silently drop invalidations, leaving stale entities/relations on remote nodes after an ALTER/DROP/grant/revoke/setOwner on another node. Problems (1) and (2) already affect the existing jcasbin cache path.
Fix: #11736
Does this PR introduce any user-facing change?
Adds one property key:
gravitino.entityChangeLog.pollLagSecs(default1s, documented ingravitino-server-config.md).How was this patch tested?
TestEntityChangeLogPoller: rewrote the listener-failure test (cursor no longer advances; batch retried), added cursor-recovery and negative-lag-rejection tests.TestEntityChangeLogMapper: addedtestEntityChangeLogLagExcludesFreshRows(fresh rows excluded, consumable once aged).TestEntityChangeLogService/TestTableMetaServicefor the new mapper signature. All pass; spotless clean.