diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java index 262d846ac5..e9ad80c5ad 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java @@ -124,6 +124,7 @@ public void before(TestInfo testInfo) { .addProperty( FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true") .addProperty(FeatureConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "true") + .addProperty(FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP.catalogConfig(), "false") .build(); Catalog catalog = PolarisCatalog.builder() diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/LocalIcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/LocalIcebergCatalog.java index 446ce2e553..4424e4e67e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/LocalIcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/LocalIcebergCatalog.java @@ -1035,8 +1035,39 @@ public boolean dropView(TableIdentifier identifier) { boolean purge = realmConfig.getConfig(FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP, catalogEntity); + Map storageProperties = Map.of(); + ViewMetadata lastMetadata = null; + + if (purge) { + ViewOperations ops = newViewOps(identifier); + ViewMetadata currentMetadata = ops.current(); + if (currentMetadata != null && currentMetadata.location() != null) { + lastMetadata = currentMetadata; + + Map clone = new HashMap<>(); + clone.putAll(lastMetadata.properties()); + clone.put(CatalogProperties.FILE_IO_IMPL, ioImplClassName); + + PolarisResolvedPathWrapper resolvedViewEntities = + resolvedEntityView.getResolvedPath( + ResolvedPathKey.ofTableLike(identifier), PolarisEntitySubType.ICEBERG_VIEW); + PolarisResolvedPathWrapper storageHierarchy = + resolvedViewEntities != null + ? resolvedViewEntities + : resolvedEntityView.getResolvedPath( + ResolvedPathKey.ofNamespace(identifier.namespace())); + Optional storageInfoEntity = + FileIOUtil.findStorageInfoFromHierarchy(storageHierarchy); + + storageInfoEntity.map(PolarisEntity::getInternalPropertiesAsMap).ifPresent(clone::putAll); + clone.put(PolarisTaskConstants.STORAGE_LOCATION, lastMetadata.location()); + + storageProperties = clone; + } + } + DropEntityResult dropEntityResult = - dropTableLike(PolarisEntitySubType.ICEBERG_VIEW, identifier, Map.of(), purge); + dropTableLike(PolarisEntitySubType.ICEBERG_VIEW, identifier, storageProperties, purge); if (!dropEntityResult.isSuccess()) { switch (dropEntityResult.getReturnStatus()) { case BaseResult.ReturnStatus.ENTITY_NOT_FOUND: @@ -1061,6 +1092,13 @@ public boolean dropView(TableIdentifier identifier) { dropEntityResult.getExtraInformation()); } } + + if (purge && lastMetadata != null && dropEntityResult.getCleanupTaskId() != null) { + LOGGER.info( + "Scheduled cleanup task {} for view {}", dropEntityResult.getCleanupTaskId(), identifier); + taskExecutor.addTaskHandlerContext(dropEntityResult.getCleanupTaskId(), callContext); + } + return true; } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index cd8e5b5a3b..208254810b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -39,6 +39,7 @@ import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; @@ -85,6 +86,11 @@ private Optional tryGetTableEntity(TaskEntity task) { @Override public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow(); + + if (tableEntity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) { + return handleViewCleanup(cleanupTask, tableEntity); + } + LOGGER .atInfo() .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier()) @@ -151,6 +157,34 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { return false; } + private boolean handleViewCleanup(TaskEntity cleanupTask, IcebergTableLikeEntity viewEntity) { + LOGGER + .atInfo() + .addKeyValue("viewIdentifier", viewEntity.getTableIdentifier()) + .addKeyValue("metadataLocation", viewEntity.getMetadataLocation()) + .log("Handling view metadata cleanup task"); + + try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, viewEntity.getTableIdentifier())) { + if (!TaskUtils.exists(viewEntity.getMetadataLocation(), fileIO)) { + LOGGER + .atWarn() + .addKeyValue("viewIdentifier", viewEntity.getTableIdentifier()) + .addKeyValue("metadataLocation", viewEntity.getMetadataLocation()) + .log("View metadata cleanup scheduled, but metadata file does not exist"); + return true; + } + + fileIO.deleteFile(viewEntity.getMetadataLocation()); + LOGGER + .atInfo() + .addKeyValue("viewIdentifier", viewEntity.getTableIdentifier()) + .addKeyValue("metadataLocation", viewEntity.getMetadataLocation()) + .log("Successfully deleted view metadata file"); + + return true; + } + } + private Stream getManifestTaskStream( TaskEntity cleanupTask, TableMetadata tableMetadata, diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractLocalIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractLocalIcebergCatalogTest.java index 70f59eec0c..0443b9724f 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractLocalIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractLocalIcebergCatalogTest.java @@ -313,6 +313,8 @@ public void before(TestInfo testInfo) { "true") .addProperty( FeatureConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "true") + .addProperty( + FeatureConfiguration.PURGE_VIEW_METADATA_ON_DROP.catalogConfig(), "true") .setStorageConfigurationInfo(realmConfig, storageConfigModel, STORAGE_LOCATION) .build() .asCatalog(serviceIdentityProvider))); @@ -1902,6 +1904,52 @@ public void testDropTableWithPurge() { .isInstanceOf(InMemoryFileIO.class); } + @Test + public void testDropViewWithPurge() { + MeasuredFileIOFactory measured = new MeasuredFileIOFactory(); + LocalIcebergCatalog viewCatalog = newIcebergCatalog(CATALOG_NAME, metaStoreManager, measured); + viewCatalog.initialize( + CATALOG_NAME, + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + viewCatalog.createNamespace(NS); + viewCatalog + .buildView(TABLE) + .withSchema(SCHEMA) + .withDefaultNamespace(NS) + .withQuery("spark", "SELECT * FROM ns.tbl") + .create(); + + Assertions.assertThat(viewCatalog.dropView(TABLE)).as("Drop should succeed").isTrue(); + + TaskEntity taskEntity = + TaskEntity.of( + metaStoreManager + .loadTasks(polarisContext, "testExecutor", PageToken.fromLimit(1)) + .getEntities() + .getFirst()); + Map properties = taskEntity.getInternalPropertiesAsMap(); + properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"); + taskEntity = + TaskEntity.of( + new PolarisBaseEntity.Builder(taskEntity).internalPropertiesAsMap(properties).build()); + TaskFileIOSupplier taskFileIOSupplier = + new TaskFileIOSupplier( + (accessConfig, ioImplClassName, properties1) -> + measured.loadFileIO( + accessConfig, "org.apache.iceberg.inmemory.InMemoryFileIO", Map.of()), + storageAccessConfigProvider); + + TableCleanupTaskHandler handler = + new TableCleanupTaskHandler( + Mockito.mock(), clock, metaStoreManagerFactory, taskFileIOSupplier); + handler.handleTask(taskEntity, polarisContext); + Assertions.assertThat(measured.getNumDeletedFiles()) + .as("View metadata file should be deleted") + .isGreaterThan(0); + } + @Test public void testDropTableWithPurgeDisabled() { // Create a catalog with purge disabled: