Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public Future<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean
if (mutation.id().isNone())
return applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>());
else
return applyInternalTracked(mutation, new AsyncPromise<>());
return applyInternalTracked(mutation, false, new AsyncPromise<>());
}

public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
Expand Down Expand Up @@ -441,7 +441,10 @@ public void apply(final Mutation mutation,
boolean isDroppable)
{
if (MigrationRouter.isFullyTracked(mutation))
applyInternalTracked(mutation, null);
{
// makeDurable is ignored for tracked mutations, the mutation journal is required for replication
applyInternalTracked(mutation, false, null);
}
else
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
}
Expand Down Expand Up @@ -612,10 +615,23 @@ else if (isDeferrable)
}
}

/**
* Apply a tracked mutation read from the {@link org.apache.cassandra.replication.MutationJournal}
* during static-segment replay on startup.
*
* Compared to the normal write apply path, this skips the journal append (since we're replaying from it)
* and always writes to the memtable, even when {@link MutationTrackingService#startWriting} reports the offset
* as already witnessed.
*/
public void applyForReplay(Mutation mutation)
{
applyInternalTracked(mutation, true, null);
}

/**
* Append the mutation to the mutation journal, then update memtables and indexes.
*/
private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
private Future<?> applyInternalTracked(Mutation mutation, boolean isReplay, Promise<?> future)
{
MutationTrackingService.ensureEnabled();
if (!MigrationRouter.isFullyTracked(mutation) || mutation.id().isNone())
Expand All @@ -628,11 +644,11 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
throw new RuntimeException("Testing write failures");

boolean started;
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true))
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, !isReplay))
{
started = MutationTrackingService.instance().startWriting(mutation);

if (started)
if (started || isReplay)
{
for (PartitionUpdate upd : mutation.getPartitionUpdates())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re

MigrationRouter.validateTrackedMutation(mutation);

Tracing.trace("Appending to mutation journal");
CommitLogPosition pointer = MutationJournal.instance().write(mutation.id(), mutation);
CommitLogPosition pointer = null;
if (makeDurable)
{
Tracing.trace("Appending to mutation journal");
pointer = MutationJournal.instance().write(mutation.id(), mutation);
}

return new CassandraWriteContext(group, pointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static MutationTrackingMetrics instance()
public final Histogram readSummarySize; // Read summary sizes
public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
public final Gauge<Integer> pendingClearReplaySize; // Static segments awaiting clearNeedsReplay

@SuppressWarnings("Convert2MethodRef")
private MutationTrackingMetrics()
Expand All @@ -63,5 +64,9 @@ private MutationTrackingMetrics()
factory.createMetricName("JournalDiskSpaceUsed"),
() -> MutationJournal.instance().getDiskSpaceUsed()
);
pendingClearReplaySize = Metrics.register(
factory.createMetricName("PendingClearReplaySize"),
() -> MutationJournal.pendingClearReplaySize()
);
}
}
85 changes: 79 additions & 6 deletions src/java/org/apache/cassandra/replication/MutationJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand All @@ -30,10 +33,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.utils.Invariants;

Expand Down Expand Up @@ -79,11 +85,27 @@
// TODO (required): handle table truncations
public class MutationJournal
{
private static final Logger logger = LoggerFactory.getLogger(MutationJournal.class);

// opaque / immutable list of segments that we should clear the needs-replay flag on
public static class PendingClearReplay
{
private final ImmutableSet<Long> segments;

public PendingClearReplay(ImmutableSet<Long> segments)
{
this.segments = segments;
}
}

private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null;

private final Journal<ShortMutationId, Mutation> journal;
private final Map<Long, SegmentStateTracker> segmentStateTrackers;

// Static segments awaiting durable cleanup of their needsReplay=false metadata.
private final Set<Long> pendingClearReplay = ConcurrentHashMap.newKeySet();

// Most of the time during write, we will notify last known segment, so we optimistically cache last segment tracker,
// without imposing any visibility guarantees. If we do not see the right segment in this field, we will look it up
// in NBHM.
Expand Down Expand Up @@ -176,20 +198,71 @@ public CommitLogPosition getCurrentPosition()
}

// If all Memtables associated with given segment were flushed by the time we have closed active segment
// and opened it as static, mark its metadata to indicate it does not need replay. It may happen that we
// crash before persisting this metadata, in which case we will unnecessarily replay the segment, which
// has no correctness implications.
// and opened it as static, the segment is eligible to be marked as not needing replay. The actual durable
// recording of needsReplay=false is deferred — we record the segment in pendingClearReplay and let the
// LogStatePersister drain the queue after it has written witnessed offsets to system.coordinator_logs.
//
// See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
private void maybeCleanupStaticSegment(Segment<ShortMutationId, Mutation> segment)
{
Invariants.require(segment.isStatic());
SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
if (tracker != null && tracker.removeCleanFromDirty())
pendingClearReplay.add(segment.id());
}

/**
* Snapshot the current set of segments awaiting clearing of their needs replay flag.
*/
public PendingClearReplay snapshotPendingClearReplay()
{
return new PendingClearReplay(ImmutableSet.copyOf(pendingClearReplay));
}

/**
* Mark the given PendingClearReplay as not needing replay
*
* See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
*/
public void drainCleanup(PendingClearReplay toDrain)
{
for (long segId : toDrain.segments)
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
List<Segment<ShortMutationId, Mutation>> found = journal.getSegments(segId, segId);
if (found.isEmpty())
{
// segment was dropped between enqueue and drain — nothing to persist.
pendingClearReplay.remove(segId);
continue;
}
Segment<ShortMutationId, Mutation> segment = found.get(0);
try
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
pendingClearReplay.remove(segId);
}
catch (Throwable t)
{
logger.warn("Deferred cleanup failed for segment {}; will retry next persister tick", segId, t);
// leave in live queue
}
}
}

@VisibleForTesting
public Set<Long> pendingCleanupForTesting()
{
return pendingClearReplay;
}

public static int pendingClearReplaySize()
{
if (instance == null)
return 0;
return instance.pendingClearReplay.size();
}

void startInternal()
{
journal.start();
Expand Down Expand Up @@ -328,7 +401,7 @@ protected void accept(long segmentId, int position, ShortMutationId key, Mutatio
if (newPUCollector != null)
{
assert !newPUCollector.isEmpty();
keyspace.apply(newPUCollector.build(), false, true, false);
keyspace.applyForReplay(newPUCollector.build());
}
}
}, getAvailableProcessors());
Expand Down
Loading