Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public Future<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean
if (mutation.id().isNone())
return applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>());
else
return applyInternalTracked(mutation, new AsyncPromise<>());
return applyInternalTracked(mutation, writeCommitLog, new AsyncPromise<>());
}

public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
Expand Down Expand Up @@ -441,7 +441,7 @@ public void apply(final Mutation mutation,
boolean isDroppable)
{
if (MigrationRouter.isFullyTracked(mutation))
applyInternalTracked(mutation, null);
applyInternalTracked(mutation, makeDurable, null);
else
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
}
Expand Down Expand Up @@ -615,7 +615,7 @@ else if (isDeferrable)
/**
* Append the mutation to the mutation journal, then update memtables and indexes.
*/
private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
private Future<?> applyInternalTracked(Mutation mutation, boolean makeDurable, Promise<?> future)
{
MutationTrackingService.ensureEnabled();
if (!MigrationRouter.isFullyTracked(mutation) || mutation.id().isNone())
Expand All @@ -628,11 +628,11 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
throw new RuntimeException("Testing write failures");

boolean started;
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true))
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, makeDurable))
{
started = MutationTrackingService.instance().startWriting(mutation);

if (started)
if (started || !makeDurable)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need durable MT journal when makeDurable=false? I'm thinking mostly of the case where the schema is created with durable_writes=false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so basically, the question is whether on line 633 do we need to write the journal when we don't need durable writes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point, I'd forgotten you can turn off log durability for keyspaces. I think the answer is that MT doesn't work without the mutation journal, so we need to make this path only reachable on replay and that we add a check to schema changes to fail validation if durable writes are turned off

{
for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re

MigrationRouter.validateTrackedMutation(mutation);

Tracing.trace("Appending to mutation journal");
CommitLogPosition pointer = MutationJournal.instance().write(mutation.id(), mutation);
CommitLogPosition pointer = null;
if (makeDurable)
{
Tracing.trace("Appending to mutation journal");
pointer = MutationJournal.instance().write(mutation.id(), mutation);
}

return new CassandraWriteContext(group, pointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static MutationTrackingMetrics instance()
public final Histogram readSummarySize; // Read summary sizes
public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
public final Gauge<Integer> pendingClearReplaySize; // Static segments awaiting clearNeedsReplay

@SuppressWarnings("Convert2MethodRef")
private MutationTrackingMetrics()
Expand All @@ -63,5 +64,9 @@ private MutationTrackingMetrics()
factory.createMetricName("JournalDiskSpaceUsed"),
() -> MutationJournal.instance().getDiskSpaceUsed()
);
pendingClearReplaySize = Metrics.register(
factory.createMetricName("PendingClearReplaySize"),
() -> MutationJournal.pendingClearReplaySize()
);
}
}
83 changes: 78 additions & 5 deletions src/java/org/apache/cassandra/replication/MutationJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand All @@ -30,10 +33,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.utils.Invariants;

Expand Down Expand Up @@ -79,11 +85,27 @@
// TODO (required): handle table truncations
public class MutationJournal
{
private static final Logger logger = LoggerFactory.getLogger(MutationJournal.class);

// opaque / immutable list of segments that we should clear the needs-replay flag on
public static class PendingClearReplay
{
private final ImmutableSet<Long> segments;

public PendingClearReplay(ImmutableSet<Long> segments)
{
this.segments = segments;
}
}

private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null;

private final Journal<ShortMutationId, Mutation> journal;
private final Map<Long, SegmentStateTracker> segmentStateTrackers;

// Static segments awaiting durable cleanup of their needsReplay=false metadata.
private final Set<Long> pendingClearReplay = ConcurrentHashMap.newKeySet();

// Most of the time during write, we will notify last known segment, so we optimistically cache last segment tracker,
// without imposing any visibility guarantees. If we do not see the right segment in this field, we will look it up
// in NBHM.
Expand Down Expand Up @@ -176,20 +198,71 @@ public CommitLogPosition getCurrentPosition()
}

// If all Memtables associated with given segment were flushed by the time we have closed active segment
// and opened it as static, mark its metadata to indicate it does not need replay. It may happen that we
// crash before persisting this metadata, in which case we will unnecessarily replay the segment, which
// has no correctness implications.
// and opened it as static, the segment is eligible to be marked as not needing replay. The actual durable
// recording of needsReplay=false is deferred — we record the segment in pendingClearReplay and let the
// LogStatePersister drain the queue after it has written witnessed offsets to system.coordinator_logs.
//
// See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
private void maybeCleanupStaticSegment(Segment<ShortMutationId, Mutation> segment)
{
Invariants.require(segment.isStatic());
SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
if (tracker != null && tracker.removeCleanFromDirty())
pendingClearReplay.add(segment.id());
}

/**
* Snapshot the current set of segments awaiting clearing of their needs replay flag.
*/
public PendingClearReplay snapshotPendingClearReplay()
{
return new PendingClearReplay(ImmutableSet.copyOf(pendingClearReplay));
}

/**
* Mark the given PendingClearReplay as not needing replay
*
* See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
*/
public void drainCleanup(PendingClearReplay toDrain)
{
for (long segId : toDrain.segments)
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
List<Segment<ShortMutationId, Mutation>> found = journal.getSegments(segId, segId);
if (found.isEmpty())
{
// segment was dropped between enqueue and drain — nothing to persist.
pendingClearReplay.remove(segId);
continue;
}
Segment<ShortMutationId, Mutation> segment = found.get(0);
try
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
pendingClearReplay.remove(segId);
}
catch (Throwable t)
{
logger.warn("Deferred cleanup failed for segment {}; will retry next persister tick", segId, t);
// leave in live queue
}
}
}

@VisibleForTesting
public Set<Long> pendingCleanupForTesting()
{
return pendingClearReplay;
}

public static int pendingClearReplaySize()
{
if (instance == null)
return 0;
return instance.pendingClearReplay.size();
}

void startInternal()
{
journal.start();
Expand Down
Loading