CEP-45: Lost witness marker race #4892
Don't truncate journal segments until the witnessed offsets they contain are flushed. Also moves MutationTrackingService startup to after the commit log is replayed.
frankgh
left a comment
I've added a couple of comments, but the patch looks good in general.
| ); | ||
| pendingClearReplaySize = Metrics.register( | ||
| factory.createMetricName("PendingClearReplaySize"), | ||
| () -> MutationJournal.instance().pendingClearReplaySize() |
Do we want to worry about the case where MT is disabled and maybe handle the IllegalStateException thrown when the instance is null?
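One way to handle the disabled case is to have the gauge report a fallback value instead of propagating the exception. The sketch below is self-contained and uses a hypothetical helper (`SafeGauge.guarded`), not the real `Metrics` or `MutationJournal` classes. It assumes, as the comment above describes, that the singleton accessor throws `IllegalStateException` when mutation tracking is disabled.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Cassandra: wraps a gauge supplier so that
// an unavailable service reports a fallback value instead of throwing.
final class SafeGauge
{
    private SafeGauge() {}

    static LongSupplier guarded(LongSupplier delegate, long fallback)
    {
        return () -> {
            try
            {
                return delegate.getAsLong();
            }
            catch (IllegalStateException e)
            {
                // Assumed failure mode: the accessor throws when the
                // service was never initialised (e.g. MT is disabled).
                return fallback;
            }
        };
    }
}
```

With a helper like this, the registered gauge could wrap the existing lambda, for example `SafeGauge.guarded(() -> MutationJournal.instance().pendingClearReplaySize(), 0L)`, assuming the metric type accepts a compatible supplier.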
| // opaque / immutable list of segments that we should clear the needs-replay flag on | ||
| public static class PendingClearReplay | ||
| { | ||
| private ImmutableSet<Long> segments; |
NIT: can we make this final?
| private ImmutableSet<Long> segments; | |
| private final ImmutableSet<Long> segments; |
| executor.awaitTermination(1, TimeUnit.MINUTES); | ||
| // attempt to persist offsets and mark segments as | ||
| // not needing replay one last time before shutdown | ||
| if (started) |
| if (started) | |
| if (wasStarted) |
| @@ -323,6 +337,10 @@ private void shutdownBlocking() throws InterruptedException | |||
| activeReconciler.shutdownBlocking(); | |||
| executor.shutdown(); | |||
| executor.awaitTermination(1, TimeUnit.MINUTES); | |||
Should we log if we fail to shut down here?
| executor.awaitTermination(1, TimeUnit.MINUTES); | |
| if (!executor.awaitTermination(1, TimeUnit.MINUTES)) | |
| { | |
| logger.warn("Mutation tracking executor did not terminate within 1 minute; forcing shutdown"); | |
| } |
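Note that `awaitTermination` only waits; it does not stop anything, so a message that says "forcing shutdown" would need a `shutdownNow()` call to be accurate. A minimal, self-contained sketch of that pattern follows. The class and method names (`ExecutorShutdown.shutdownAndAwait`) are hypothetical and not taken from the patch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the shutdown / await / force sequence.
final class ExecutorShutdown
{
    private ExecutorShutdown() {}

    // Returns true if the executor drained within the timeout. Otherwise
    // interrupts the remaining tasks via shutdownNow() and returns false.
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit)
    {
        executor.shutdown(); // stop accepting new tasks
        try
        {
            if (executor.awaitTermination(timeout, unit))
                return true;
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for callers and fall through to force.
            Thread.currentThread().interrupt();
        }
        // A real implementation would log a warning here before forcing.
        executor.shutdownNow();
        return false;
    }
}
```

The boolean result lets the caller decide whether to log, and whether a final persistence step is still safe to attempt.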
| * To improve startup, we periodically save our view of mutation ids that we've witnessed to disk as part of this | ||
| * class. Any ids witnessed since the last time this class was run are reconstructed by replaying the journal. | ||
| * | ||
| * However, if an sstable is flushed is after the most recent LogStatePersister run, AND it marks a segment as no |
NIT:
| * However, if an sstable is flushed is after the most recent LogStatePersister run, AND it marks a segment as no | |
| * However, if an sstable is flushed after the most recent LogStatePersister run, AND it marks a segment as no |
| TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); | ||
| DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(key)); | ||
| MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); | ||
| if (summary.size() == 0) |
NIT:
| if (summary.size() == 0) | |
| if (summary.isEmpty()) |
Just pushed up a small test fix. Unlike normal writes, mutation tracking no-ops a write if we've already seen it, which is a reasonable optimization when we're tracking each write. Unfortunately this can bite us on startup. If witnessed offsets are flushed to disk before the memtable containing those offsets is flushed to sstables, then on startup mutation tracking will think it has already seen all of those mutations and will not write them to the memtable, losing a bunch of data in the process. The fix is pretty simple and just does what commit log replay does. Commit log replay applies its mutations with
| started = MutationTrackingService.instance().startWriting(mutation); | ||
|
|
||
| if (started) | ||
| if (started || !makeDurable) |
Do we need durable MT journal when makeDurable=false? I'm thinking mostly of the case where the schema is created with durable_writes=false
So basically, the question is whether we need to write the journal on line 633 when we don't need durable writes.
That's a good point, I'd forgotten you can turn off log durability for keyspaces. I think the answer is that MT doesn't work without the mutation journal, so we need to make this path reachable only on replay and add a check to schema changes that fails validation if durable writes are turned off.
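The startup race described earlier in this thread, and why the "apply even if already witnessed" path belongs to replay only, can be illustrated with a toy model. This is a sketch under simplifying assumptions, not the actual `MutationTrackingService` logic: all names (`ToyTrackedNode`, `Mutation`, `restart`) are hypothetical, the journal and the witnessed set are treated as durable, and the memtable is treated as lost on restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only; names and structure are hypothetical.
final class ToyTrackedNode
{
    static final class Mutation
    {
        final long id;
        final String key;
        final String value;

        Mutation(long id, String key, String value)
        {
            this.id = id;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Mutation> journal = new ArrayList<>(); // durable
    private final Set<Long> witnessed = new HashSet<>();      // assumed persisted
    private Map<String, String> memtable = new HashMap<>();   // lost on restart

    // Normal write path: a mutation that was already witnessed is a no-op.
    boolean write(Mutation m)
    {
        if (!witnessed.add(m.id))
            return false;
        journal.add(m);
        memtable.put(m.key, m.value);
        return true;
    }

    // Simulates a restart where witnessed ids reached disk but the memtable
    // holding the corresponding data was never flushed.
    void restart(boolean skipWitnessedOnReplay)
    {
        memtable = new HashMap<>();
        for (Mutation m : journal)
        {
            if (skipWitnessedOnReplay && witnessed.contains(m.id))
                continue; // the bug: replay treats the mutation as already applied
            memtable.put(m.key, m.value);
        }
    }

    String read(String key)
    {
        return memtable.get(key);
    }
}
```

Calling `restart(true)` after a write leaves `read` returning `null` for that key, which is the data loss described above. Calling `restart(false)` reapplies the journal unconditionally and restores the value.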
| int writesPerKey = 2; | ||
| int pks = 100; | ||
| withRandom(rng -> { | ||
| withRandom(1509900183613458L, rng -> { |
do we want to reset this seed?
I'll remove it on commit. Thanks!
| // CREATE: tracked + durable_writes=false should be rejected | ||
| String createKs = nextKsName(); | ||
| Throwable createFailure = expectFailure(() -> | ||
| schemaChange("CREATE KEYSPACE " + createKs + |
| assertEquals("Pre-bounce witness count must equal write count", writes, preBounceOffsetCount); | ||
|
|
||
| // Flush so notifyFlushed marks the active segment's interval clean. | ||
| cluster.get(1).nodetoolResult("flush", KEYSPACE).asserts().success(); |
should we flush system.coordinator_logs here as well?
cluster.get(1).nodetoolResult("flush", "system", "coordinator_logs").asserts().success();
| Throwable createFailure = expectFailure(() -> | ||
| schemaChange("CREATE KEYSPACE " + createKs + | ||
| " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + | ||
| " AND replication_type = 'tracked'" + | ||
| " AND durable_writes = false") | ||
| ); | ||
| assertTrue("Expected ConfigurationException root cause, got: " + createFailure, | ||
| rootCause(createFailure) instanceof ConfigurationException); |
| Throwable createFailure = expectFailure(() -> | |
| schemaChange("CREATE KEYSPACE " + createKs + | |
| " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + | |
| " AND replication_type = 'tracked'" + | |
| " AND durable_writes = false") | |
| ); | |
| assertTrue("Expected ConfigurationException root cause, got: " + createFailure, | |
| rootCause(createFailure) instanceof ConfigurationException); | |
| assertThatThrownBy(() -> | |
| schemaChange("CREATE KEYSPACE " + createKs + | |
| " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}" + | |
| " AND replication_type = 'tracked'" + | |
| " AND durable_writes = false") | |
| ).hasRootCauseInstanceOf(ConfigurationException.class); |
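The rule these tests exercise (a tracked keyspace must not disable durable writes, whether through CREATE or ALTER) can be sketched as a single validation check. The types below are simplified, hypothetical stand-ins, including the nested `ConfigurationException`; they are not the real Cassandra keyspace parameter classes, and the error message is illustrative.

```java
// Hypothetical, simplified stand-ins; not the real Cassandra schema classes.
final class ToyKeyspaceParams
{
    enum ReplicationType { UNTRACKED, TRACKED }

    // Stand-in for the exception type the tests expect as the root cause.
    static final class ConfigurationException extends RuntimeException
    {
        ConfigurationException(String message)
        {
            super(message);
        }
    }

    final ReplicationType replicationType;
    final boolean durableWrites;

    ToyKeyspaceParams(ReplicationType replicationType, boolean durableWrites)
    {
        this.replicationType = replicationType;
        this.durableWrites = durableWrites;
    }

    // Validates the resulting parameters, so the same check covers CREATE,
    // ALTER ... durable_writes = false, and ALTER ... replication_type = 'tracked'.
    void validate()
    {
        if (replicationType == ReplicationType.TRACKED && !durableWrites)
            throw new ConfigurationException("tracked replication requires durable_writes = true");
    }
}
```

Validating the final parameter set rather than the individual statement means the three cases tested here reduce to one condition.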
| Throwable alterTrackedFailure = expectFailure(() -> | ||
| schemaChange("ALTER KEYSPACE " + alterKs + " WITH durable_writes = false") | ||
| ); | ||
| assertTrue("Expected ConfigurationException root cause, got: " + alterTrackedFailure, | ||
| rootCause(alterTrackedFailure) instanceof ConfigurationException); |
| Throwable alterTrackedFailure = expectFailure(() -> | |
| schemaChange("ALTER KEYSPACE " + alterKs + " WITH durable_writes = false") | |
| ); | |
| assertTrue("Expected ConfigurationException root cause, got: " + alterTrackedFailure, | |
| rootCause(alterTrackedFailure) instanceof ConfigurationException); | |
| assertThatThrownBy(() -> schemaChange("ALTER KEYSPACE " + alterKs + " WITH durable_writes = false")).hasRootCauseInstanceOf(ConfigurationException.class); |
| Throwable alterToTrackedFailure = expectFailure(() -> | ||
| schemaChange("ALTER KEYSPACE " + migratedKs + " WITH replication_type = 'tracked'") | ||
| ); | ||
| assertTrue("Expected ConfigurationException root cause, got: " + alterToTrackedFailure, | ||
| rootCause(alterToTrackedFailure) instanceof ConfigurationException); |
| Throwable alterToTrackedFailure = expectFailure(() -> | |
| schemaChange("ALTER KEYSPACE " + migratedKs + " WITH replication_type = 'tracked'") | |
| ); | |
| assertTrue("Expected ConfigurationException root cause, got: " + alterToTrackedFailure, | |
| rootCause(alterToTrackedFailure) instanceof ConfigurationException); | |
| assertThatThrownBy(() -> schemaChange("ALTER KEYSPACE " + migratedKs + " WITH replication_type = 'tracked'")).hasRootCauseInstanceOf(ConfigurationException.class); |
| private static Throwable expectFailure(Runnable r) | ||
| { | ||
| try | ||
| { | ||
| r.run(); | ||
| } | ||
| catch (Throwable t) | ||
| { | ||
| return t; | ||
| } | ||
| fail("Expected exception but none was thrown"); | ||
| return null; | ||
| } | ||
|
|
||
| private static Throwable rootCause(Throwable t) | ||
| { | ||
| Throwable cause = t; | ||
| while (cause.getCause() != null && cause.getCause() != cause) | ||
| cause = cause.getCause(); | ||
| return cause; | ||
| } |
| private static Throwable expectFailure(Runnable r) | |
| { | |
| try | |
| { | |
| r.run(); | |
| } | |
| catch (Throwable t) | |
| { | |
| return t; | |
| } | |
| fail("Expected exception but none was thrown"); | |
| return null; | |
| } | |
| private static Throwable rootCause(Throwable t) | |
| { | |
| Throwable cause = t; | |
| while (cause.getCause() != null && cause.getCause() != cause) | |
| cause = cause.getCause(); | |
| return cause; | |
| } |
Can we instead use import static org.apache.cassandra.utils.AssertionUtils.assertThatThrownBy; here?