Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -564,40 +564,52 @@ public void run() {
"Table or partition for {} has already been dropped, the residual data will be removed.",
tabletDir,
e);
FileUtils.deleteDirectoryQuietly(tabletDir);

final Tuple2<PhysicalTablePath, TableBucket> pathAndBucket;
try {
Tuple2<PhysicalTablePath, TableBucket> pathAndBucket =
FlussPaths.parseTabletDir(tabletDir);

// Also delete corresponding KV tablet directory if it exists
File kvTabletDir =
FlussPaths.kvTabletDir(
dataDir, pathAndBucket.f0, pathAndBucket.f1);
if (kvTabletDir.exists()) {
LOG.info(
"Also removing corresponding KV tablet directory: {}",
kvTabletDir);
FileUtils.deleteDirectoryQuietly(kvTabletDir);
pathAndBucket = FlussPaths.parseTabletDir(tabletDir);
} catch (Exception parseException) {
LOG.warn(
"Failed to parse tablet directory {} while removing residual data, "
+ "skip the cleanup so it can be retried later.",
tabletDir,
parseException);
return;
}

// Delete the corresponding KV tablet directory first and only drop the log
// (the anchor of an unfinished delete) once the KV directory is confirmed
// gone, so that a KV deletion failure leaves the log in place for retry.
File kvTabletDir =
FlussPaths.kvTabletDir(dataDir, pathAndBucket.f0, pathAndBucket.f1);
if (kvTabletDir.exists()) {
try {
FileUtils.deleteDirectory(kvTabletDir);
LOG.info("Removed residual KV tablet directory: {}", kvTabletDir);
} catch (IOException kvDeleteException) {
LOG.warn(
"Failed to delete residual KV tablet directory {}, keeping the "
+ "log {} so the cleanup can be retried later.",
kvTabletDir,
tabletDir,
kvDeleteException);
return;
}
}

FileUtils.deleteDirectoryQuietly(tabletDir);

boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null;
File partitionDir = tabletDir.getParentFile();
if (partitionDir != null) {
deleteEmptyDirQuietly(partitionDir);
boolean isPartitioned = pathAndBucket.f0.getPartitionName() != null;
File partitionDir = tabletDir.getParentFile();
if (partitionDir != null) {
deleteEmptyDirQuietly(partitionDir);

if (isPartitioned) {
File tableDir = partitionDir.getParentFile();
if (tableDir != null) {
deleteEmptyDirQuietly(tableDir);
}
if (isPartitioned) {
File tableDir = partitionDir.getParentFile();
if (tableDir != null) {
deleteEmptyDirQuietly(tableDir);
}
}
} catch (Exception cleanupException) {
LOG.warn(
"Failed to clean up residual KV/parent directories for {}: {}",
tabletDir,
cleanupException.getMessage());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.fluss.server.zk.data.ZkData;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.CloseableRegistry;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.clock.Clock;
Expand Down Expand Up @@ -729,6 +730,36 @@ private void dropKv() {
checkNotNull(kvManager);
kvManager.dropKv(tableBucket);
kvTablet = null;
} else if (isKvTable()) {
// The in-memory KvTablet is not open (e.g. a follower replica), but a residual kv
// tablet directory may still exist on disk. Remove it so that we do not leave an
// orphan kv directory behind after the log is dropped.
File kvTabletDir =
FlussPaths.kvTabletDir(logTablet.getDataDir(), physicalPath, tableBucket);
if (kvTabletDir.exists()) {
try {
FileUtils.deleteDirectory(kvTabletDir);
LOG.info(
"Deleted residual kv tablet directory {} for bucket {} whose kv tablet "
+ "was not open.",
kvTabletDir,
tableBucket);
} catch (IOException e) {
// Surface the failure so the caller does not drop the log; the log is kept as
// an anchor so the cleanup can be retried later.
LOG.warn(
"Failed to delete residual kv tablet directory {} for bucket {}, "
+ "keeping the log so the cleanup can be retried later.",
kvTabletDir,
tableBucket,
e);
throw new KvStorageException(
String.format(
"Failed to delete residual kv tablet directory %s for bucket %s",
kvTabletDir, tableBucket),
e);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,14 @@ private void sweepOrphanTabletDirs(
try {
FileUtils.deleteDirectory(kvTabletDir);
} catch (IOException e) {
// Keep the log as an anchor so a coordinator retry can re-enter this method
// and finish the cleanup; throwing here prevents the log from being dropped.
LOG.warn(
"Failed to delete orphan KV tablet directory {} for bucket {}, "
+ "keeping the log so the cleanup can be retried later.",
kvTabletDir,
tb,
e);
throw new KvStorageException(
String.format(
"Failed to delete orphan KV tablet directory %s", kvTabletDir),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.fluss.server.zk.ZooKeeperExtension;
import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.FlussScheduler;

Expand Down Expand Up @@ -244,6 +245,75 @@ void testRecoveryAfterLogManagerShutdown() throws Exception {
newLogManager.shutdown();
}

/**
 * Restarts the log manager after the table has been deleted from ZooKeeper and expects both
 * the log tablet directory and the matching kv tablet directory to be gone afterwards.
 */
@Test
void testResidualKvAndLogRemovedWhenTableDropped() throws Exception {
    initTableBuckets(null);

    // Materialize a log directory on disk plus the kv tablet directory that belongs to it.
    File residualLogDir = getOrCreateLog(tablePath1, null, tableBucket1).getLogDir();
    File residualKvDir =
            FlussPaths.kvTabletDir(tempDir, PhysicalTablePath.of(tablePath1), tableBucket1);
    assertThat(residualKvDir.mkdirs()).isTrue();
    assertThat(residualLogDir).exists();
    assertThat(residualKvDir).exists();

    // Stop the running manager and delete the table in ZK, so the directories left on disk
    // become residual data for the next manager that starts up.
    logManager.shutdown();
    logManager = null;
    zkClient.deleteTable(tablePath1);

    LogManager restartedLogManager =
            LogManager.create(
                    conf,
                    zkClient,
                    new FlussScheduler(1),
                    SystemClock.getInstance(),
                    TestingMetricGroups.TABLET_SERVER_METRICS,
                    localDiskManager);
    restartedLogManager.startup();
    // Publish the new manager to the fixture field only after a successful startup.
    logManager = restartedLogManager;

    // Neither the kv tablet directory nor the log tablet directory may survive the restart.
    assertThat(residualKvDir).doesNotExist();
    assertThat(residualLogDir).doesNotExist();
}

/**
 * Restarts the log manager after the table has been deleted from ZooKeeper while the kv tablet
 * path is occupied by a regular file, and expects both the log directory and the kv path to
 * still be present afterwards.
 */
@Test
void testLogKeptWhenResidualKvDeletionFails() throws Exception {
    initTableBuckets(null);

    File anchorLogDir = getOrCreateLog(tablePath1, null, tableBucket1).getLogDir();
    File blockedKvPath =
            FlussPaths.kvTabletDir(tempDir, PhysicalTablePath.of(tablePath1), tableBucket1);
    // The parent table dir already exists (created alongside the log dir). Put a regular
    // FILE where the kv tablet dir would be, so that FileUtils.deleteDirectory is expected
    // to fail on it (it is not a directory).
    assertThat(blockedKvPath.createNewFile()).isTrue();
    assertThat(blockedKvPath).isFile();
    assertThat(anchorLogDir).exists();

    // Stop the running manager and delete the table in ZK, so the data left on disk becomes
    // residual data for the next manager that starts up.
    logManager.shutdown();
    logManager = null;
    zkClient.deleteTable(tablePath1);

    LogManager restartedLogManager =
            LogManager.create(
                    conf,
                    zkClient,
                    new FlussScheduler(1),
                    SystemClock.getInstance(),
                    TestingMetricGroups.TABLET_SERVER_METRICS,
                    localDiskManager);
    restartedLogManager.startup();
    // Publish the new manager to the fixture field only after a successful startup.
    logManager = restartedLogManager;

    // The kv cleanup could not complete, so the log has to stay in place as the anchor for a
    // later retry, and the blocking kv path is still there as well.
    assertThat(anchorLogDir).exists();
    assertThat(blockedKvPath).exists();
}

@ParameterizedTest
@MethodSource("partitionProvider")
void testHasCleanShutdownMarkerAfterLogManagerShutdown(String partitionName) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,35 @@ void testMakeLeader() throws Exception {
assertThat(kvReplica.getKvTablet()).isNotNull();
}

/**
 * Deletes a kv replica whose in-memory KvTablet is null while a kv tablet directory exists on
 * disk, and expects both that directory and the log directory to be removed.
 */
@Test
void testDeleteRemovesResidualKvDirWhenKvTabletNotOpen() throws Exception {
    TableBucket bucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
    Replica replica = makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, bucket);
    makeKvReplicaAsLeader(replica);

    File replicaLogDir = replica.getLogTablet().getLogDir();
    String kvDirName = "kv-" + bucket.getBucket();
    File residualKvDir = replica.getTabletParentDir().resolve(kvDirName).toFile();
    assertThat(residualKvDir).exists();

    // After the switch to follower the replica no longer exposes an in-memory KvTablet.
    makeKvReplicaAsFollower(replica, 1);
    assertThat(replica.getKvTablet()).isNull();

    // Recreate the kv directory on disk to mimic a leftover (e.g. from a dropKv that never
    // finished) while the in-memory KvTablet is null - the orphan-kv condition. mkdirs()
    // returning true also shows the directory was absent right after the follower switch.
    assertThat(residualKvDir.mkdirs()).isTrue();
    assertThat(residualKvDir).exists();
    assertThat(replicaLogDir).exists();

    replica.delete();

    // The leftover kv directory must be removed despite the null KvTablet, and the log
    // directory must be removed along with it.
    assertThat(residualKvDir).doesNotExist();
    assertThat(replicaLogDir).doesNotExist();
}

@Test
void testAppendRecordsToLeader() throws Exception {
Replica logReplica =
Expand Down