Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void before(TestInfo testInfo) {
.addProperty(
FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true")
.addProperty(FeatureConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "true")
.addProperty(FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP.catalogConfig(), "false")
.build();
Catalog catalog =
PolarisCatalog.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,8 +1035,39 @@ public boolean dropView(TableIdentifier identifier) {
boolean purge =
realmConfig.getConfig(FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP, catalogEntity);

Map<String, String> storageProperties = Map.of();
ViewMetadata lastMetadata = null;

if (purge) {
ViewOperations ops = newViewOps(identifier);
ViewMetadata currentMetadata = ops.current();
if (currentMetadata != null && currentMetadata.location() != null) {
lastMetadata = currentMetadata;

Map<String, String> clone = new HashMap<>();
clone.putAll(lastMetadata.properties());
clone.put(CatalogProperties.FILE_IO_IMPL, ioImplClassName);

PolarisResolvedPathWrapper resolvedViewEntities =
resolvedEntityView.getResolvedPath(
ResolvedPathKey.ofTableLike(identifier), PolarisEntitySubType.ICEBERG_VIEW);
PolarisResolvedPathWrapper storageHierarchy =
resolvedViewEntities != null
? resolvedViewEntities
: resolvedEntityView.getResolvedPath(
ResolvedPathKey.ofNamespace(identifier.namespace()));
Optional<PolarisEntity> storageInfoEntity =
FileIOUtil.findStorageInfoFromHierarchy(storageHierarchy);

storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).ifPresent(clone::putAll);
clone.put(PolarisTaskConstants.STORAGE_LOCATION, lastMetadata.location());

storageProperties = clone;
}
}

DropEntityResult dropEntityResult =
dropTableLike(PolarisEntitySubType.ICEBERG_VIEW, identifier, Map.of(), purge);
dropTableLike(PolarisEntitySubType.ICEBERG_VIEW, identifier, storageProperties, purge);
Comment thread
ayushtkn marked this conversation as resolved.
if (!dropEntityResult.isSuccess()) {
switch (dropEntityResult.getReturnStatus()) {
case BaseResult.ReturnStatus.ENTITY_NOT_FOUND:
Expand All @@ -1061,6 +1092,13 @@ public boolean dropView(TableIdentifier identifier) {
dropEntityResult.getExtraInformation());
}
}

if (purge && lastMetadata != null && dropEntityResult.getCleanupTaskId() != null) {
LOGGER.info(
"Scheduled cleanup task {} for view {}", dropEntityResult.getCleanupTaskId(), identifier);
taskExecutor.addTaskHandlerContext(dropEntityResult.getCleanupTaskId(), callContext);
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
Expand Down Expand Up @@ -85,6 +86,11 @@ private Optional<IcebergTableLikeEntity> tryGetTableEntity(TaskEntity task) {
@Override
public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow();

if (tableEntity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
return handleViewCleanup(cleanupTask, tableEntity);
}

LOGGER
.atInfo()
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
Expand Down Expand Up @@ -151,6 +157,34 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
return false;
}

private boolean handleViewCleanup(TaskEntity cleanupTask, IcebergTableLikeEntity viewEntity) {
LOGGER
.atInfo()
.addKeyValue("viewIdentifier", viewEntity.getTableIdentifier())
.addKeyValue("metadataLocation", viewEntity.getMetadataLocation())
.log("Handling view metadata cleanup task");

try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, viewEntity.getTableIdentifier())) {
if (!TaskUtils.exists(viewEntity.getMetadataLocation(), fileIO)) {
LOGGER
.atWarn()
.addKeyValue("viewIdentifier", viewEntity.getTableIdentifier())
.addKeyValue("metadataLocation", viewEntity.getMetadataLocation())
.log("View metadata cleanup scheduled, but metadata file does not exist");
return true;
}

fileIO.deleteFile(viewEntity.getMetadataLocation());
LOGGER
.atInfo()
.addKeyValue("viewIdentifier", viewEntity.getTableIdentifier())
.addKeyValue("metadataLocation", viewEntity.getMetadataLocation())
.log("Successfully deleted view metadata file");

return true;
}
}

private Stream<TaskEntity> getManifestTaskStream(
TaskEntity cleanupTask,
TableMetadata tableMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ public void before(TestInfo testInfo) {
"true")
.addProperty(
FeatureConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "true")
.addProperty(
FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP.catalogConfig(), "true")
.setStorageConfigurationInfo(realmConfig, storageConfigModel, STORAGE_LOCATION)
.build()
.asCatalog(serviceIdentityProvider)));
Expand Down Expand Up @@ -1902,6 +1904,52 @@ public void testDropTableWithPurge() {
.isInstanceOf(InMemoryFileIO.class);
}

@Test
public void testDropViewWithPurge() {
MeasuredFileIOFactory measured = new MeasuredFileIOFactory();
LocalIcebergCatalog viewCatalog = newIcebergCatalog(CATALOG_NAME, metaStoreManager, measured);
viewCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));

viewCatalog.createNamespace(NS);
viewCatalog
.buildView(TABLE)
.withSchema(SCHEMA)
.withDefaultNamespace(NS)
.withQuery("spark", "SELECT * FROM ns.tbl")
.create();

Assertions.assertThat(viewCatalog.dropView(TABLE)).as("Drop should succeed").isTrue();

TaskEntity taskEntity =
TaskEntity.of(
metaStoreManager
.loadTasks(polarisContext, "testExecutor", PageToken.fromLimit(1))
.getEntities()
.getFirst());
Map<String, String> properties = taskEntity.getInternalPropertiesAsMap();
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO");
taskEntity =
TaskEntity.of(
new PolarisBaseEntity.Builder(taskEntity).internalPropertiesAsMap(properties).build());
TaskFileIOSupplier taskFileIOSupplier =
new TaskFileIOSupplier(
(accessConfig, ioImplClassName, properties1) ->
measured.loadFileIO(
accessConfig, "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()),
storageAccessConfigProvider);

TableCleanupTaskHandler handler =
new TableCleanupTaskHandler(
Mockito.mock(), clock, metaStoreManagerFactory, taskFileIOSupplier);
handler.handleTask(taskEntity, polarisContext);
Assertions.assertThat(measured.getNumDeletedFiles())
.as("View metadata file should be deleted")
.isGreaterThan(0);
}

@Test
public void testDropTableWithPurgeDisabled() {
// Create a catalog with purge disabled:
Expand Down