Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static MutationTrackingMetrics instance()
public final Histogram readSummarySize; // Read summary sizes
public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
public final Gauge<Integer> pendingClearReplaySize; // Static segments awaiting clearNeedsReplay

@SuppressWarnings("Convert2MethodRef")
private MutationTrackingMetrics()
Expand All @@ -63,5 +64,9 @@ private MutationTrackingMetrics()
factory.createMetricName("JournalDiskSpaceUsed"),
() -> MutationJournal.instance().getDiskSpaceUsed()
);
pendingClearReplaySize = Metrics.register(
factory.createMetricName("PendingClearReplaySize"),
() -> MutationJournal.pendingClearReplaySize()
);
}
}
83 changes: 78 additions & 5 deletions src/java/org/apache/cassandra/replication/MutationJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand All @@ -30,10 +33,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.jctools.maps.NonBlockingHashMapLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.utils.Invariants;

Expand Down Expand Up @@ -79,11 +85,27 @@
// TODO (required): handle table truncations
public class MutationJournal
{
private static final Logger logger = LoggerFactory.getLogger(MutationJournal.class);

// opaque / immutable list of segments that we should clear the needs-replay flag on
public static class PendingClearReplay
{
private final ImmutableSet<Long> segments;

public PendingClearReplay(ImmutableSet<Long> segments)
{
this.segments = segments;
}
}

private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null;

private final Journal<ShortMutationId, Mutation> journal;
private final Map<Long, SegmentStateTracker> segmentStateTrackers;

// Static segments awaiting durable cleanup of their needsReplay=false metadata.
private final Set<Long> pendingClearReplay = ConcurrentHashMap.newKeySet();

// Most of the time during write, we will notify last known segment, so we optimistically cache last segment tracker,
// without imposing any visibility guarantees. If we do not see the right segment in this field, we will look it up
// in NBHM.
Expand Down Expand Up @@ -176,20 +198,71 @@ public CommitLogPosition getCurrentPosition()
}

// If all Memtables associated with given segment were flushed by the time we have closed active segment
// and opened it as static, mark its metadata to indicate it does not need replay. It may happen that we
// crash before persisting this metadata, in which case we will unnecessarily replay the segment, which
// has no correctness implications.
// and opened it as static, the segment is eligible to be marked as not needing replay. The actual durable
// recording of needsReplay=false is deferred — we record the segment in pendingClearReplay and let the
// LogStatePersister drain the queue after it has written witnessed offsets to system.coordinator_logs.
//
// See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
private void maybeCleanupStaticSegment(Segment<ShortMutationId, Mutation> segment)
{
Invariants.require(segment.isStatic());
SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
if (tracker != null && tracker.removeCleanFromDirty())
pendingClearReplay.add(segment.id());
}

/**
* Snapshot the current set of segments awaiting clearing of their needs replay flag.
*/
public PendingClearReplay snapshotPendingClearReplay()
{
return new PendingClearReplay(ImmutableSet.copyOf(pendingClearReplay));
}

/**
* Mark the given PendingClearReplay as not needing replay
*
* See the comment in LogStatePersister or CASSANDRA-21443 for an explanation of why we do this
*/
public void drainCleanup(PendingClearReplay toDrain)
{
for (long segId : toDrain.segments)
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
List<Segment<ShortMutationId, Mutation>> found = journal.getSegments(segId, segId);
if (found.isEmpty())
{
// segment was dropped between enqueue and drain — nothing to persist.
pendingClearReplay.remove(segId);
continue;
}
Segment<ShortMutationId, Mutation> segment = found.get(0);
try
{
segment.metadata().clearNeedsReplay();
segment.persistMetadata();
pendingClearReplay.remove(segId);
}
catch (Throwable t)
{
logger.warn("Deferred cleanup failed for segment {}; will retry next persister tick", segId, t);
// leave in live queue
}
}
}

@VisibleForTesting
public Set<Long> pendingCleanupForTesting()
{
return pendingClearReplay;
}

public static int pendingClearReplaySize()
{
if (instance == null)
return 0;
return instance.pendingClearReplay.size();
}

void startInternal()
{
journal.start();
Expand Down
117 changes: 106 additions & 11 deletions src/java/org/apache/cassandra/replication/MutationTrackingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -147,11 +148,22 @@ public static void ensureEnabled()
throw new IllegalStateException(DISABLED_MESSAGE);
}

public static void start(ClusterMetadata metadata)
public static ClusterMetadata register(ChangeListener listener)
{
ClusterMetadataService.instance().log().addListener(listener);
return ClusterMetadata.current();
}

public static void start(Function<ChangeListener, ClusterMetadata> register)
{
if (!isEnabled())
return;
instance().startInternal(metadata);
instance().startInternal(register);
}

public static void start()
{
start(MutationTrackingService::register);
}

public static void shutdown() throws InterruptedException
Expand All @@ -178,6 +190,11 @@ public static void shutdown() throws InterruptedException
private ConcurrentHashMap<CoordinatorLogId, Shard> log2ShardMap = new ConcurrentHashMap<>();
private final ChangeListener tcmListener;

// The highest TCM epoch we have applied to keyspaceShards via onNewClusterMetadata.
// Updates with next.epoch <= this value are skipped. Protects against state going
// backwards in time when events are delivered out of order
private volatile Epoch lastAppliedEpoch = Epoch.EMPTY;

// prevents a race between topology changes (shard recreation) and coordinator log creation.
//
// coordinator log creation can race with topology updates and be lost if shard recreation discards the old
Expand Down Expand Up @@ -218,7 +235,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
};
}

private synchronized void startInternal(ClusterMetadata metadata)
private synchronized void startInternal(Function<ChangeListener, ClusterMetadata> register)
{
if (started)
return;
Expand All @@ -227,6 +244,8 @@ private synchronized void startInternal(ClusterMetadata metadata)

logger.info("Starting mutation tracking service. Previous host log id: {}", prevHostLogId);

ClusterMetadata metadata = register.apply(tcmListener);

if (metadata.myNodeId() != null)
for (KeyspaceShards ks : KeyspaceShards.loadFromSystemTables(metadata, this::nextLogId, this::onNewLog))
keyspaceShards.put(ks.keyspace, ks);
Expand Down Expand Up @@ -307,11 +326,6 @@ public ReconciledLogSnapshot snapshotReconciledLogs()
return builder.build();
}

public void registerMetadataListener()
{
ClusterMetadataService.instance().log().addListener(tcmListener);
}

public synchronized boolean isStarted()
{
return started;
Expand All @@ -322,7 +336,13 @@ private void shutdownBlocking() throws InterruptedException
ClusterMetadataService.instance().log().removeListener(tcmListener);
Comment thread
frankgh marked this conversation as resolved.
activeReconciler.shutdownBlocking();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
if (!executor.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Mutation tracking executor did not terminate within 1 minute; forcing shutdown");

// attempt to persist offsets and mark segments as
// not needing replay one last time before shutdown
if (isStarted())
offsetsPersister.run(true);
ExpiredStatePurger.instance.shutdownBlocking();
}

Expand Down Expand Up @@ -883,12 +903,15 @@ public boolean isDurablyReconciled(ImmutableCoordinatorLogOffsets logOffsets)
}
}

private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadata next)
private synchronized void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadata next)
{
if (logger.isTraceEnabled())
logger.trace("Processing cluster metadata change - epoch {} -> {}",
prev != null ? prev.epoch : "none", next.epoch);

if (!next.epoch.isAfter(lastAppliedEpoch))
return;

shardLock.readLock().lock();
try
{
Expand All @@ -905,6 +928,9 @@ private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadat
ConcurrentHashMap<String, KeyspaceShards> originalKeyspaceShards = keyspaceShards;
try
{
if (!next.epoch.isAfter(lastAppliedEpoch))
return;

if (!shardUpdateNeeded(keyspaceShards, prev, next))
return;

Expand All @@ -919,6 +945,8 @@ private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadat
if (!newKeyspaces.isEmpty())
logBackgroundReconciliationDisabledWarning(newKeyspaces);
}

lastAppliedEpoch = next.epoch;
}
catch (Throwable t)
{
Expand Down Expand Up @@ -1576,27 +1604,82 @@ private void run(Shard shard, boolean durable)
}
}

/**
* Persists per-log witnessed offsets, and durably marks needsReplay=false on any segments that have become eligible
* for it since the most recent run of this class. These 2 operations need to performed in a specific sequence to avoid
* correctness problems.
*
* For background, mutation tracking needs to keep a record of every mutation id it's written locally. For correctness
* purposes, a nodes view of mutation ids it's written locally needs to exactly match the data it has on disk.
* Having data on disk you dont have an id for, or thinking you have ids on disk that you don't breaks the mutation
* tracking consistency mechanism.
*
* To improve startup, we periodically save our view of mutation ids that we've witnessed to disk as part of this
* class. Any ids witnessed since the last time this class was run are reconstructed by replaying the journal.
*
* However, if an sstable is flushed after the most recent LogStatePersister run, AND it marks a segment as no
* longer needing replay, AND the node is stopped before the next LogStatePersister, then the offsets witnessed
* between the LogStatePersister and sstable flush will be forgotten on startup.
*
* This is a correctness problem for mutation tracking because it means that we will be returning data in reads that
* are not included in our mutation summaries, which breaks reconciliation and read monotonicity.
*
* To prevent this, witnessed offsets are flushed and segments are marked as not needing replay together in 3 steps.
*
* 1. Snapshot the set of journal segments that have been marked as needing their need replay flag set to false (but not yet updated on disk)
* 2. Flush per-log witnessed offsets to the system table
* 3. Durably mark the snapshotted segments as not needing replay
*
* This guarantees that, on startup, we will always replay all segments that may contain offsets not persisted to
* system.coordinator_logs
*/
private static class LogStatePersister implements Runnable
{
// TODO (expected): consider a different interval
// TODO: Revert before merge, just increased frequency for test
// private static final long PERSIST_INTERVAL_MILLIS = 60_000;
private static final long PERSIST_INTERVAL_MILLIS = 1_000;

private volatile boolean isPaused = false;

void start()
{
executor.scheduleWithFixedDelay(this, PERSIST_INTERVAL_MILLIS, PERSIST_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}

void pauseForTesting(boolean pause)
{
isPaused = pause;
}

@Override
public void run()
{
if (isPaused)
return;
run(true);
}

private void run(boolean dropSegments)
{
MutationTrackingService.instance().forEachKeyspace(this::run);

MutationJournal.PendingClearReplay toDrain = MutationJournal.instance().snapshotPendingClearReplay();

boolean writesOk;
try
{
MutationTrackingService.instance().forEachKeyspace(this::run);
writesOk = true;
}
catch (Throwable t)
{
writesOk = false;
logger.error("LogStatePersister write to system.coordinator_logs failed; deferring segment cleanup drain to next tick", t);
}

if (writesOk)
MutationJournal.instance().drainCleanup(toDrain);

if (dropSegments)
MutationTrackingService.instance().truncateMutationJournal();
}
Expand Down Expand Up @@ -1643,6 +1726,18 @@ public void resumeActiveReconciler()
activeReconciler.resumeForTesting();
}

@VisibleForTesting
public void pauseOffsetsPersisterForTesting()
{
offsetsPersister.pauseForTesting(true);
}

@VisibleForTesting
public void resumeOffsetsPersisterForTesting()
{
offsetsPersister.pauseForTesting(false);
}

/**
* Pause only regular-priority (background write retry) delivery in the active reconciler.
* High-priority tasks (needed by tracked read reconciliation) continue to be processed.
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.net.StartupClusterConnectivityChecker;
import org.apache.cassandra.replication.MutationJournal;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.security.ThreadAwareSecurityManager;
Expand Down Expand Up @@ -363,7 +364,10 @@ protected void setup()
AccordService.localStartup(self);

if (DatabaseDescriptor.getMutationTrackingEnabled())
{
MutationTrackingService.start();
MutationJournal.instance().replayStaticSegments();
}
}
catch (IOException e)
{
Expand Down
Loading