From b9265f00e6e3e00ebbe50f6e16b6a797cf043989 Mon Sep 17 00:00:00 2001 From: iting0321 Date: Mon, 18 May 2026 13:18:55 +0800 Subject: [PATCH 1/8] add Core SPI and domain model --- .../core/config/FeatureConfiguration.java | 7 + .../core/lineage/LineageColumnEdge.java | 29 ++++ .../polaris/core/lineage/LineageDataset.java | 37 ++++++ .../core/lineage/LineageDirection.java | 26 ++++ .../polaris/core/lineage/LineageEdge.java | 29 ++++ .../core/lineage/LineageFieldReference.java | 29 ++++ .../core/lineage/LineageGranularity.java | 25 ++++ .../polaris/core/lineage/LineageGraph.java | 32 +++++ .../core/lineage/LineageIngestRequest.java | 38 ++++++ .../polaris/core/lineage/LineageNode.java | 31 +++++ .../polaris/core/lineage/LineageNodeType.java | 25 ++++ .../core/lineage/LineagePersistence.java | 28 ++++ .../core/lineage/LineageQueryRequest.java | 31 +++++ .../polaris/core/lineage/LineageService.java | 26 ++++ .../lineage/DefaultLineageService.java | 73 +++++++++++ .../lineage/DisabledLineagePersistence.java | 43 ++++++ .../service/lineage/LineageConfiguration.java | 49 +++++++ .../lineage/DefaultLineageServiceTest.java | 124 ++++++++++++++++++ 18 files changed, 682 insertions(+) create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDirection.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGranularity.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageQueryRequest.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java create mode 100644 runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 48eed5b2309..3a0ed1796bc 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -357,6 +357,13 @@ public static void enforceFeatureEnabledOrThrow( .defaultValue(true) .buildFeatureConfiguration(); + public static final FeatureConfiguration ENABLE_LINEAGE = + PolarisConfiguration.builder() + .key("ENABLE_LINEAGE") + .description("If true, lineage services are enabled") + .defaultValue(false) + .buildFeatureConfiguration(); + public static final FeatureConfiguration> SUPPORTED_CATALOG_CONNECTION_TYPES = PolarisConfiguration.>builder() .key("SUPPORTED_CATALOG_CONNECTION_TYPES") diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java new file mode 100644 index 00000000000..b2f91173b29 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; + +/** A field-level lineage relationship between two dataset columns. */ +public record LineageColumnEdge(LineageFieldReference source, LineageFieldReference target) { + public LineageColumnEdge { + Objects.requireNonNull(source, "source must be non-null"); + Objects.requireNonNull(target, "target must be non-null"); + } +} \ No newline at end of file diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java new file mode 100644 index 00000000000..3c5b7326b52 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; +import java.util.OptionalLong; + +/** A dataset participating in lineage. */ +public record LineageDataset( + String catalog, String namespace, String name, OptionalLong polarisEntityId) { + public LineageDataset { + Objects.requireNonNull(catalog, "catalog must be non-null"); + Objects.requireNonNull(namespace, "namespace must be non-null"); + Objects.requireNonNull(name, "name must be non-null"); + Objects.requireNonNull(polarisEntityId, "polarisEntityId must be non-null"); + } + + public LineageDataset(String catalog, String namespace, String name) { + this(catalog, namespace, name, OptionalLong.empty()); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDirection.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDirection.java new file mode 100644 index 00000000000..c3cdd8a5bad --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDirection.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +/** Supported directions for lineage queries. */ +public enum LineageDirection { + UPSTREAM, + DOWNSTREAM, + BOTH +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java new file mode 100644 index 00000000000..d2c2fbb86a6 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; + +/** A dataset-level lineage relationship. */ +public record LineageEdge(LineageDataset source, LineageDataset target) { + public LineageEdge { + Objects.requireNonNull(source, "source must be non-null"); + Objects.requireNonNull(target, "target must be non-null"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java new file mode 100644 index 00000000000..325fc7fdd9e --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; + +/** A reference to a specific field on a lineage dataset. */ +public record LineageFieldReference(LineageDataset dataset, String field) { + public LineageFieldReference { + Objects.requireNonNull(dataset, "dataset must be non-null"); + Objects.requireNonNull(field, "field must be non-null"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGranularity.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGranularity.java new file mode 100644 index 00000000000..8b202422d1b --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGranularity.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +/** Supported query granularities for lineage lookups. */ +public enum LineageGranularity { + DATASET, + COLUMN +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java new file mode 100644 index 00000000000..6863e30ba9a --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.List; +import java.util.Objects; + +/** Normalized response model for lineage queries. */ +public record LineageGraph( + LineageNode node, List upstream, List downstream) { + public LineageGraph { + Objects.requireNonNull(node, "node must be non-null"); + upstream = List.copyOf(upstream); + downstream = List.copyOf(downstream); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java new file mode 100644 index 00000000000..1f9fd4726ca --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** Extracted lineage payload that can be persisted independent of the transport event shape. */ +public record LineageIngestRequest( + List datasets, + List edges, + List columnEdges, + Optional eventTime) { + public LineageIngestRequest { + datasets = List.copyOf(datasets); + edges = List.copyOf(edges); + columnEdges = List.copyOf(columnEdges); + Objects.requireNonNull(eventTime, "eventTime must be non-null"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java new file mode 100644 index 00000000000..314a85962ba --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import jakarta.annotation.Nullable; +import java.util.Objects; + +/** A node returned in a lineage query response. */ +public record LineageNode( + String id, LineageNodeType type, @Nullable LineageDataset dataset, boolean opaque) { + public LineageNode { + Objects.requireNonNull(id, "id must be non-null"); + Objects.requireNonNull(type, "type must be non-null"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java new file mode 100644 index 00000000000..9d11988aa18 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +/** Node kinds surfaced by normalized lineage queries. */ +public enum LineageNodeType { + DATASET, + COLUMN +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java new file mode 100644 index 00000000000..78769fb17e0 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import org.apache.polaris.core.context.RealmContext; + +/** Persistence SPI for lineage storage backends. */ +public interface LineagePersistence { + void ingest(RealmContext realmContext, LineageIngestRequest request); + + LineageGraph query(RealmContext realmContext, LineageQueryRequest request); +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageQueryRequest.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageQueryRequest.java new file mode 100644 index 00000000000..1ecf5d5beeb --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageQueryRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; + +/** Request model for normalized lineage lookups. */ +public record LineageQueryRequest( + String nodeId, LineageDirection direction, LineageGranularity granularity) { + public LineageQueryRequest { + Objects.requireNonNull(nodeId, "nodeId must be non-null"); + Objects.requireNonNull(direction, "direction must be non-null"); + Objects.requireNonNull(granularity, "granularity must be non-null"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java new file mode 100644 index 00000000000..5b51ff1e1dc --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +/** Service boundary for lineage operations used by transport-layer adapters. */ +public interface LineageService { + void ingest(LineageIngestRequest request); + + LineageGraph query(LineageQueryRequest request); +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java new file mode 100644 index 00000000000..e4515848536 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage; + +import jakarta.enterprise.context.RequestScoped; +import jakarta.inject.Inject; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.lineage.LineageGraph; +import org.apache.polaris.core.lineage.LineageIngestRequest; +import org.apache.polaris.core.lineage.LineagePersistence; +import org.apache.polaris.core.lineage.LineageQueryRequest; +import org.apache.polaris.core.lineage.LineageService; + +@RequestScoped +public class DefaultLineageService implements LineageService { + private final CallContext callContext; + private final LineageConfiguration configuration; + private final LineagePersistence persistence; + + @Inject + public DefaultLineageService( + CallContext callContext, LineageConfiguration configuration, LineagePersistence persistence) { + this.callContext = callContext; + this.configuration = configuration; + this.persistence = persistence; + } + + @Override + public void ingest(LineageIngestRequest request) { + ensureEnabled(); + persistence.ingest(callContext.getRealmContext(), request); + } + + @Override + public LineageGraph query(LineageQueryRequest request) { + ensureEnabled(); + return persistence.query(callContext.getRealmContext(), request); + } + + private void ensureEnabled() { + if (!configuration.enabled()) { + throw new UnsupportedOperationException( + "Lineage is disabled: set polaris.lineage.enabled=true to enable it."); + } + + if (!callContext.getRealmConfig().getConfig(FeatureConfiguration.ENABLE_LINEAGE)) { + throw new UnsupportedOperationException( + "Feature not enabled: " + FeatureConfiguration.ENABLE_LINEAGE.key()); + } + + if (!configuration.persistence().enabled()) { + throw new UnsupportedOperationException( + "Lineage persistence is disabled: set polaris.lineage.persistence.enabled=true."); + } + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java new file mode 100644 index 00000000000..eec1d82483e --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage; + +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.lineage.LineageGraph; +import org.apache.polaris.core.lineage.LineageIngestRequest; +import org.apache.polaris.core.lineage.LineagePersistence; +import org.apache.polaris.core.lineage.LineageQueryRequest; + +/** Placeholder persistence until a concrete lineage backend is added. */ +@ApplicationScoped +public class DisabledLineagePersistence implements LineagePersistence { + private static final String MESSAGE = + "No lineage persistence implementation is configured for this deployment."; + + @Override + public void ingest(RealmContext realmContext, LineageIngestRequest request) { + throw new UnsupportedOperationException(MESSAGE); + } + + @Override + public LineageGraph query(RealmContext realmContext, LineageQueryRequest request) { + throw new UnsupportedOperationException(MESSAGE); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java new file mode 100644 index 00000000000..beee52651f5 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage; + +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "polaris.lineage") +public interface LineageConfiguration { + + @WithDefault("false") + boolean enabled(); + + @WithName("persistence") + PersistenceConfiguration persistence(); + + @WithName("dataset-resolution") + DatasetResolutionConfiguration datasetResolution(); + + interface PersistenceConfiguration { + @WithDefault("true") + boolean enabled(); + + @WithDefault("relational-jdbc") + String type(); + } + + interface DatasetResolutionConfiguration { + @WithDefault("true") + boolean enabled(); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java new file mode 100644 index 00000000000..71903ed6456 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.lineage.LineageDataset; +import org.apache.polaris.core.lineage.LineageGraph; +import org.apache.polaris.core.lineage.LineageIngestRequest; +import org.apache.polaris.core.lineage.LineageNode; +import org.apache.polaris.core.lineage.LineageNodeType; +import org.apache.polaris.core.lineage.LineagePersistence; +import org.apache.polaris.core.lineage.LineageQueryRequest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class DefaultLineageServiceTest { + @Mock private CallContext callContext; + @Mock private RealmContext realmContext; + @Mock private RealmConfig realmConfig; + @Mock private LineageConfiguration configuration; + @Mock private LineageConfiguration.PersistenceConfiguration persistenceConfiguration; + @Mock private LineagePersistence persistence; + + private DefaultLineageService service; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + when(callContext.getRealmContext()).thenReturn(realmContext); + when(callContext.getRealmConfig()).thenReturn(realmConfig); + when(configuration.persistence()).thenReturn(persistenceConfiguration); + service = new DefaultLineageService(callContext, configuration, persistence); + } + + @Test + void ingestThrowsWhenStaticConfigDisabled() { + when(configuration.enabled()).thenReturn(false); + + assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("polaris.lineage.enabled"); + + verify(persistence, never()).ingest( + org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + } + + @Test + void queryThrowsWhenRealmFeatureDisabled() { + when(configuration.enabled()).thenReturn(true); + when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(false); + + assertThatThrownBy(() -> service.query(queryRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); + + verify(persistence, never()).query( + org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + } + + @Test + void delegatesWhenLineageEnabled() { + LineageGraph graph = + new LineageGraph( + new LineageNode( + "dataset:test:orders", LineageNodeType.DATASET, dataset("test", "orders"), false), + List.of(), + List.of()); + + when(configuration.enabled()).thenReturn(true); + when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(true); + when(persistenceConfiguration.enabled()).thenReturn(true); + when(persistence.query(realmContext, queryRequest())).thenReturn(graph); + + service.ingest(emptyIngestRequest()); + service.query(queryRequest()); + + verify(persistence).ingest(realmContext, emptyIngestRequest()); + verify(persistence).query(realmContext, queryRequest()); + } + + private static LineageIngestRequest emptyIngestRequest() { + return new LineageIngestRequest(List.of(), List.of(), List.of(), Optional.empty()); + } + + private static LineageQueryRequest queryRequest() { + return new LineageQueryRequest( + "dataset:test:orders", + org.apache.polaris.core.lineage.LineageDirection.BOTH, + org.apache.polaris.core.lineage.LineageGranularity.DATASET); + } + + private static LineageDataset dataset(String namespace, String name) { + return new LineageDataset("test-catalog", namespace, name, OptionalLong.empty()); + } +} From d630a09fb46ae209e4ebbe8b98f4428850d744f3 Mon Sep 17 00:00:00 2001 From: iting0321 Date: Wed, 20 May 2026 14:01:53 +0800 Subject: [PATCH 2/8] update the node type --- .../apache/polaris/core/lineage/LineageColumnEdge.java | 2 +- .../org/apache/polaris/core/lineage/LineageGraph.java | 1 + .../org/apache/polaris/core/lineage/LineageNodeType.java | 3 +-- .../service/lineage/DefaultLineageServiceTest.java | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java index b2f91173b29..d8190f65440 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java @@ -26,4 +26,4 @@ public record LineageColumnEdge(LineageFieldReference source, LineageFieldRefere Objects.requireNonNull(source, "source must be non-null"); Objects.requireNonNull(target, "target must be non-null"); } -} \ No newline at end of file +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java index 6863e30ba9a..b005d31814c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java @@ -22,6 +22,7 @@ import java.util.Objects; /** Normalized response model for lineage queries. */ +// TODO: Check the data field content and add the corresponding data. public record LineageGraph( LineageNode node, List upstream, List downstream) { public LineageGraph { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java index 9d11988aa18..492dda54e23 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java @@ -20,6 +20,5 @@ /** Node kinds surfaced by normalized lineage queries. */ public enum LineageNodeType { - DATASET, - COLUMN + DATASET } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java index 71903ed6456..fd5a2187ae5 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java @@ -69,8 +69,8 @@ void ingestThrowsWhenStaticConfigDisabled() { .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("polaris.lineage.enabled"); - verify(persistence, never()).ingest( - org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + verify(persistence, never()) + .ingest(org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); } @Test @@ -82,8 +82,8 @@ void queryThrowsWhenRealmFeatureDisabled() { .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); - verify(persistence, never()).query( - org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + verify(persistence, never()) + .query(org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); } @Test From c073aae61cbccb739322d05b539763cee87f65bd Mon Sep 17 00:00:00 2001 From: iting0321 Date: Sat, 23 May 2026 18:13:43 +0800 Subject: [PATCH 3/8] add lineage data field --- .../polaris/core/lineage/LineageData.java | 71 +++++++++++++++++++ .../core/lineage/LineageFieldMapping.java | 33 +++++++++ .../polaris/core/lineage/LineageGraph.java | 1 - .../polaris/core/lineage/LineageNode.java | 26 ++++++- .../polaris/core/lineage/LineageNodeType.java | 3 +- 5 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java new file mode 100644 index 00000000000..712e711f071 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import jakarta.annotation.Nullable; +import java.util.Objects; +import java.util.OptionalLong; + +/** Dataset metadata returned in a lineage query response. */ +public record LineageData( + OptionalLong catalogId, + OptionalLong datasetId, + String namespace, + String name, + @Nullable String subType, + OptionalLong createdAt, + OptionalLong updatedAt) { + public LineageData { + Objects.requireNonNull(catalogId, "catalogId must be non-null"); + Objects.requireNonNull(datasetId, "datasetId must be non-null"); + Objects.requireNonNull(namespace, "namespace must be non-null"); + Objects.requireNonNull(name, "name must be non-null"); + Objects.requireNonNull(createdAt, "createdAt must be non-null"); + Objects.requireNonNull(updatedAt, "updatedAt must be non-null"); + } + + public LineageData( + long catalogId, + long datasetId, + String namespace, + String name, + @Nullable String subType, + long createdAt, + long updatedAt) { + this( + OptionalLong.of(catalogId), + OptionalLong.of(datasetId), + namespace, + name, + subType, + OptionalLong.of(createdAt), + OptionalLong.of(updatedAt)); + } + + public LineageData(LineageDataset dataset) { + this( + OptionalLong.empty(), + dataset.polarisEntityId(), + dataset.namespace(), + dataset.name(), + null, + OptionalLong.empty(), + OptionalLong.empty()); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java new file mode 100644 index 00000000000..1c8e7a91e38 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.lineage; + +import java.util.Objects; + +/** A source-to-target field mapping returned for column-granularity queries. */ +public record LineageFieldMapping(String sourceField, String targetField) { + public LineageFieldMapping { + Objects.requireNonNull(sourceField, "sourceField must be non-null"); + Objects.requireNonNull(targetField, "targetField must be non-null"); + } + + public LineageFieldMapping(LineageColumnEdge edge) { + this(edge.source().field(), edge.target().field()); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java index b005d31814c..6863e30ba9a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java @@ -22,7 +22,6 @@ import java.util.Objects; /** Normalized response model for lineage queries. */ -// TODO: Check the data field content and add the corresponding data. public record LineageGraph( LineageNode node, List upstream, List downstream) { public LineageGraph { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java index 314a85962ba..3c24b09d08d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java @@ -19,13 +19,37 @@ package org.apache.polaris.core.lineage; import jakarta.annotation.Nullable; +import java.util.List; import java.util.Objects; /** A node returned in a lineage query response. */ public record LineageNode( - String id, LineageNodeType type, @Nullable LineageDataset dataset, boolean opaque) { + String id, + LineageNodeType type, + @Nullable LineageData data, + boolean opaque, + List fieldMappings) { public LineageNode { Objects.requireNonNull(id, "id must be non-null"); Objects.requireNonNull(type, "type must be non-null"); + fieldMappings = List.copyOf(fieldMappings); + } + + public LineageNode(String id, LineageNodeType type, @Nullable LineageData data, boolean opaque) { + this(id, type, data, opaque, List.of()); + } + + public LineageNode( + String id, LineageNodeType type, @Nullable LineageDataset dataset, boolean opaque) { + this(id, type, dataset == null ? null : new LineageData(dataset), opaque); + } + + public LineageNode( + String id, + LineageNodeType type, + @Nullable LineageDataset dataset, + boolean opaque, + List fieldMappings) { + this(id, type, dataset == null ? null : new LineageData(dataset), opaque, fieldMappings); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java index 492dda54e23..9d11988aa18 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNodeType.java @@ -20,5 +20,6 @@ /** Node kinds surfaced by normalized lineage queries. */ public enum LineageNodeType { - DATASET + DATASET, + COLUMN } From b85e227d88711cea69ac7b756077b29e86f5d99d Mon Sep 17 00:00:00 2001 From: iting0321 Date: Mon, 25 May 2026 15:40:38 +0800 Subject: [PATCH 4/8] fix the DefaultLineageServiceTest.java for feature is dark coverage --- .../service/lineage/LineageConfiguration.java | 4 +- .../lineage/DefaultLineageServiceTest.java | 37 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java index beee52651f5..9c27a127978 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java @@ -35,7 +35,7 @@ public interface LineageConfiguration { DatasetResolutionConfiguration datasetResolution(); interface PersistenceConfiguration { - @WithDefault("true") + @WithDefault("false") boolean enabled(); @WithDefault("relational-jdbc") @@ -43,7 +43,7 @@ interface PersistenceConfiguration { } interface DatasetResolutionConfiguration { - @WithDefault("true") + @WithDefault("false") boolean enabled(); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java index fd5a2187ae5..1061a015688 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java @@ -19,8 +19,8 @@ package org.apache.polaris.service.lineage; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.util.List; @@ -62,28 +62,51 @@ void setUp() { } @Test - void ingestThrowsWhenStaticConfigDisabled() { + void throwsWhenStaticConfigDisabled() { when(configuration.enabled()).thenReturn(false); assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("polaris.lineage.enabled"); - verify(persistence, never()) - .ingest(org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + assertThatThrownBy(() -> service.query(queryRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("polaris.lineage.enabled"); + + verifyNoInteractions(persistence); } @Test - void queryThrowsWhenRealmFeatureDisabled() { + void throwsWhenRealmFeatureDisabled() { when(configuration.enabled()).thenReturn(true); when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(false); + assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); + assertThatThrownBy(() -> service.query(queryRequest())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); - verify(persistence, never()) - .query(org.mockito.ArgumentMatchers.any(), org.mockito.ArgumentMatchers.any()); + verifyNoInteractions(persistence); + } + + @Test + void throwsWhenPersistenceDisabled() { + when(configuration.enabled()).thenReturn(true); + when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(true); + when(persistenceConfiguration.enabled()).thenReturn(false); + + assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("polaris.lineage.persistence.enabled"); + + assertThatThrownBy(() -> service.query(queryRequest())) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("polaris.lineage.persistence.enabled"); + + verifyNoInteractions(persistence); } @Test From c8bd14096c3dcc041c3e92c8bace2df0b64e59d3 Mon Sep 17 00:00:00 2001 From: iting0321 Date: Wed, 3 Jun 2026 14:36:46 +0800 Subject: [PATCH 5/8] refactor LineagePersistence --- .../core/lineage/LineagePersistence.java | 42 +++++++++++++++++-- .../lineage/DefaultLineageService.java | 9 +++- .../lineage/DisabledLineagePersistence.java | 22 ++++++++-- .../lineage/DefaultLineageServiceTest.java | 30 +++++++++++-- 4 files changed, 91 insertions(+), 12 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java index 78769fb17e0..d30ea312410 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java @@ -18,11 +18,47 @@ */ package org.apache.polaris.core.lineage; +import java.time.Instant; +import java.util.List; import org.apache.polaris.core.context.RealmContext; -/** Persistence SPI for lineage storage backends. */ +/** + * Persistence SPI for lineage storage backends. + * + *

This contract is intentionally expressed in terms of Polaris's local lineage graph rather than + * OpenLineage run events. The service layer owns event parsing, forwarding, authorization, and + * dataset resolution. Persistence backends only need to atomically upsert dataset nodes, dataset + * edges, column edges, and load normalized lineage graphs. + */ public interface LineagePersistence { - void ingest(RealmContext realmContext, LineageIngestRequest request); - LineageGraph query(RealmContext realmContext, LineageQueryRequest request); + /** + * Upserts dataset nodes. + * + *

Implementations should treat {@code (realm, catalog, namespace, name)} as the stable dataset + * identity and update metadata such as the linked Polaris entity when the dataset already exists. + */ + void upsertDatasets(RealmContext realmContext, List datasets); + + /** + * Upserts directed dataset-level lineage edges. + * + *

Implementations should treat {@code (realm, source_dataset, target_dataset)} as unique and + * update the edge timestamp when the same relationship is asserted again. + */ + void upsertDatasetEdges( + RealmContext realmContext, List edges, Instant lastEventAt); + + /** + * Upserts directed column-level lineage edges. + * + *

Implementations should treat {@code (realm, source_dataset, source_field, target_dataset, + * target_field)} as unique and update the edge timestamp when the same relationship is asserted + * again. + */ + void upsertColumnEdges( + RealmContext realmContext, List columnEdges, Instant lastEventAt); + + /** Loads a normalized lineage graph for the requested node and direction. */ + LineageGraph loadLineage(RealmContext realmContext, LineageQueryRequest request); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java index e4515848536..5a2e349f48b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java @@ -20,6 +20,7 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; +import java.time.Instant; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.lineage.LineageGraph; @@ -45,13 +46,17 @@ public DefaultLineageService( @Override public void ingest(LineageIngestRequest request) { ensureEnabled(); - persistence.ingest(callContext.getRealmContext(), request); + Instant lastEventAt = request.eventTime().orElseGet(Instant::now); + persistence.upsertDatasets(callContext.getRealmContext(), request.datasets()); + persistence.upsertDatasetEdges(callContext.getRealmContext(), request.edges(), lastEventAt); + persistence.upsertColumnEdges( + callContext.getRealmContext(), request.columnEdges(), lastEventAt); } @Override public LineageGraph query(LineageQueryRequest request) { ensureEnabled(); - return persistence.query(callContext.getRealmContext(), request); + return persistence.loadLineage(callContext.getRealmContext(), request); } private void ensureEnabled() { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java index eec1d82483e..c78ccd7719c 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java @@ -19,9 +19,13 @@ package org.apache.polaris.service.lineage; import jakarta.enterprise.context.ApplicationScoped; +import java.time.Instant; +import java.util.List; import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.lineage.LineageColumnEdge; +import org.apache.polaris.core.lineage.LineageDataset; +import org.apache.polaris.core.lineage.LineageEdge; import org.apache.polaris.core.lineage.LineageGraph; -import org.apache.polaris.core.lineage.LineageIngestRequest; import org.apache.polaris.core.lineage.LineagePersistence; import org.apache.polaris.core.lineage.LineageQueryRequest; @@ -32,12 +36,24 @@ public class DisabledLineagePersistence implements LineagePersistence { "No lineage persistence implementation is configured for this deployment."; @Override - public void ingest(RealmContext realmContext, LineageIngestRequest request) { + public void upsertDatasets(RealmContext realmContext, List datasets) { throw new UnsupportedOperationException(MESSAGE); } @Override - public LineageGraph query(RealmContext realmContext, LineageQueryRequest request) { + public void upsertDatasetEdges( + RealmContext realmContext, List edges, Instant lastEventAt) { + throw new UnsupportedOperationException(MESSAGE); + } + + @Override + public void upsertColumnEdges( + RealmContext realmContext, List columnEdges, Instant lastEventAt) { + throw new UnsupportedOperationException(MESSAGE); + } + + @Override + public LineageGraph loadLineage(RealmContext realmContext, LineageQueryRequest request) { throw new UnsupportedOperationException(MESSAGE); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java index 1061a015688..cbcd3c28513 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -30,7 +31,10 @@ import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.lineage.LineageColumnEdge; import org.apache.polaris.core.lineage.LineageDataset; +import org.apache.polaris.core.lineage.LineageEdge; +import org.apache.polaris.core.lineage.LineageFieldReference; import org.apache.polaris.core.lineage.LineageGraph; import org.apache.polaris.core.lineage.LineageIngestRequest; import org.apache.polaris.core.lineage.LineageNode; @@ -43,6 +47,8 @@ import org.mockito.MockitoAnnotations; public class DefaultLineageServiceTest { + private static final Instant EVENT_TIME = Instant.parse("2026-01-01T00:00:00Z"); + @Mock private CallContext callContext; @Mock private RealmContext realmContext; @Mock private RealmConfig realmConfig; @@ -111,6 +117,7 @@ void throwsWhenPersistenceDisabled() { @Test void delegatesWhenLineageEnabled() { + LineageIngestRequest ingestRequest = ingestRequest(); LineageGraph graph = new LineageGraph( new LineageNode( @@ -121,19 +128,34 @@ void delegatesWhenLineageEnabled() { when(configuration.enabled()).thenReturn(true); when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(true); when(persistenceConfiguration.enabled()).thenReturn(true); - when(persistence.query(realmContext, queryRequest())).thenReturn(graph); + when(persistence.loadLineage(realmContext, queryRequest())).thenReturn(graph); - service.ingest(emptyIngestRequest()); + service.ingest(ingestRequest); service.query(queryRequest()); - verify(persistence).ingest(realmContext, emptyIngestRequest()); - verify(persistence).query(realmContext, queryRequest()); + verify(persistence).upsertDatasets(realmContext, ingestRequest.datasets()); + verify(persistence).upsertDatasetEdges(realmContext, ingestRequest.edges(), EVENT_TIME); + verify(persistence).upsertColumnEdges(realmContext, ingestRequest.columnEdges(), EVENT_TIME); + verify(persistence).loadLineage(realmContext, queryRequest()); } private static LineageIngestRequest emptyIngestRequest() { return new LineageIngestRequest(List.of(), List.of(), List.of(), Optional.empty()); } + private static LineageIngestRequest ingestRequest() { + LineageDataset source = dataset("raw", "orders"); + LineageDataset target = dataset("test", "orders"); + return new LineageIngestRequest( + List.of(source, target), + List.of(new LineageEdge(source, target)), + List.of( + new LineageColumnEdge( + new LineageFieldReference(source, "id"), + new LineageFieldReference(target, "order_id"))), + Optional.of(EVENT_TIME)); + } + private static LineageQueryRequest queryRequest() { return new LineageQueryRequest( "dataset:test:orders", From a793842c18953ae4738b0f93382fa735e2c95b88 Mon Sep 17 00:00:00 2001 From: iting0321 Date: Thu, 11 Jun 2026 21:34:32 +0800 Subject: [PATCH 6/8] remove jakarta.annotation.Nullable --- .../org/apache/polaris/core/lineage/LineageData.java | 5 ++--- .../org/apache/polaris/core/lineage/LineageNode.java | 10 ++++------ .../polaris/core/lineage/LineagePersistence.java | 3 +-- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java index 712e711f071..bddffc8cec0 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java @@ -18,7 +18,6 @@ */ package org.apache.polaris.core.lineage; -import jakarta.annotation.Nullable; import java.util.Objects; import java.util.OptionalLong; @@ -28,7 +27,7 @@ public record LineageData( OptionalLong datasetId, String namespace, String name, - @Nullable String subType, + String subType, OptionalLong createdAt, OptionalLong updatedAt) { public LineageData { @@ -45,7 +44,7 @@ public LineageData( long datasetId, String namespace, String name, - @Nullable String subType, + String subType, long createdAt, long updatedAt) { this( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java index 3c24b09d08d..67c47657f65 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java @@ -18,7 +18,6 @@ */ package org.apache.polaris.core.lineage; -import jakarta.annotation.Nullable; import java.util.List; import java.util.Objects; @@ -26,7 +25,7 @@ public record LineageNode( String id, LineageNodeType type, - @Nullable LineageData data, + LineageData data, boolean opaque, List fieldMappings) { public LineageNode { @@ -35,19 +34,18 @@ public record LineageNode( fieldMappings = List.copyOf(fieldMappings); } - public LineageNode(String id, LineageNodeType type, @Nullable LineageData data, boolean opaque) { + public LineageNode(String id, LineageNodeType type, LineageData data, boolean opaque) { this(id, type, data, opaque, List.of()); } - public LineageNode( - String id, LineageNodeType type, @Nullable LineageDataset dataset, boolean opaque) { + public LineageNode(String id, LineageNodeType type, LineageDataset dataset, boolean opaque) { this(id, type, dataset == null ? null : new LineageData(dataset), opaque); } public LineageNode( String id, LineageNodeType type, - @Nullable LineageDataset dataset, + LineageDataset dataset, boolean opaque, List fieldMappings) { this(id, type, dataset == null ? null : new LineageData(dataset), opaque, fieldMappings); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java index d30ea312410..30ab8fcb798 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java @@ -46,8 +46,7 @@ public interface LineagePersistence { *

Implementations should treat {@code (realm, source_dataset, target_dataset)} as unique and * update the edge timestamp when the same relationship is asserted again. */ - void upsertDatasetEdges( - RealmContext realmContext, List edges, Instant lastEventAt); + void upsertDatasetEdges(RealmContext realmContext, List edges, Instant lastEventAt); /** * Upserts directed column-level lineage edges. From 00c432a9f7c2f0d34bc583beebf6b463b0b06cde Mon Sep 17 00:00:00 2001 From: iting0321 Date: Thu, 11 Jun 2026 22:06:38 +0800 Subject: [PATCH 7/8] add lineage feature configurations to documentation --- .../config-sections/flags-polaris_features.md | 9 ++++++ .../smallrye-polaris_lineage.md | 31 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md diff --git a/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md b/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md index 3a24803023c..b1296a36d24 100644 --- a/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md +++ b/site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md @@ -298,6 +298,15 @@ If true, the generic-tables endpoints are enabled --- +##### `polaris.features."ENABLE_LINEAGE"` + +If true, lineage services are enabled + +- **Type:** `Boolean` +- **Default:** `false` + +--- + ##### `polaris.features."ENABLE_POLICY_STORE"` If true, the policy-store endpoints are enabled diff --git a/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md new file mode 100644 index 00000000000..0e54a2c3a9c --- /dev/null +++ b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md @@ -0,0 +1,31 @@ +--- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +title: smallrye-polaris_lineage +build: + list: never + render: never +--- + +| Property | Default Value | Type | Description | +|----------|---------------|------|-------------| +| `polaris.lineage.enabled` | `false` | `boolean` | | +| `polaris.lineage.persistence.enabled` | `false` | `boolean` | | +| `polaris.lineage.persistence.type` | `relational-jdbc` | `string` | | +| `polaris.lineage.dataset-resolution.enabled` | `false` | `boolean` | | From bf4c5d9007c90419d2339ecafc0335450a6b72d8 Mon Sep 17 00:00:00 2001 From: iting0321 Date: Fri, 12 Jun 2026 23:40:23 +0800 Subject: [PATCH 8/8] remove persistence-related model --- .../core/lineage/LineageColumnEdge.java | 29 ------ .../polaris/core/lineage/LineageData.java | 11 --- .../polaris/core/lineage/LineageDataset.java | 37 ------- .../polaris/core/lineage/LineageEdge.java | 29 ------ .../core/lineage/LineageFieldMapping.java | 4 - .../core/lineage/LineageFieldReference.java | 29 ------ .../polaris/core/lineage/LineageGraph.java | 4 +- .../core/lineage/LineageIngestRequest.java | 38 ------- .../polaris/core/lineage/LineageNode.java | 16 +-- .../core/lineage/LineagePersistence.java | 63 ------------ .../polaris/core/lineage/LineageService.java | 2 - .../lineage/DefaultLineageService.java | 29 +----- .../lineage/DisabledLineagePersistence.java | 59 ----------- .../service/lineage/LineageConfiguration.java | 20 ---- .../lineage/DefaultLineageServiceTest.java | 99 ++----------------- .../smallrye-polaris_lineage.md | 3 - 16 files changed, 15 insertions(+), 457 deletions(-) delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java delete mode 100644 polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java delete mode 100644 runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java deleted file mode 100644 index d8190f65440..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageColumnEdge.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.util.Objects; - -/** A field-level lineage relationship between two dataset columns. */ -public record LineageColumnEdge(LineageFieldReference source, LineageFieldReference target) { - public LineageColumnEdge { - Objects.requireNonNull(source, "source must be non-null"); - Objects.requireNonNull(target, "target must be non-null"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java index bddffc8cec0..19db4608dd1 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageData.java @@ -56,15 +56,4 @@ public LineageData( OptionalLong.of(createdAt), OptionalLong.of(updatedAt)); } - - public LineageData(LineageDataset dataset) { - this( - OptionalLong.empty(), - dataset.polarisEntityId(), - dataset.namespace(), - dataset.name(), - null, - OptionalLong.empty(), - OptionalLong.empty()); - } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java deleted file mode 100644 index 3c5b7326b52..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageDataset.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.util.Objects; -import java.util.OptionalLong; - -/** A dataset participating in lineage. */ -public record LineageDataset( - String catalog, String namespace, String name, OptionalLong polarisEntityId) { - public LineageDataset { - Objects.requireNonNull(catalog, "catalog must be non-null"); - Objects.requireNonNull(namespace, "namespace must be non-null"); - Objects.requireNonNull(name, "name must be non-null"); - Objects.requireNonNull(polarisEntityId, "polarisEntityId must be non-null"); - } - - public LineageDataset(String catalog, String namespace, String name) { - this(catalog, namespace, name, OptionalLong.empty()); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java deleted file mode 100644 index d2c2fbb86a6..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageEdge.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.util.Objects; - -/** A dataset-level lineage relationship. */ -public record LineageEdge(LineageDataset source, LineageDataset target) { - public LineageEdge { - Objects.requireNonNull(source, "source must be non-null"); - Objects.requireNonNull(target, "target must be non-null"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java index 1c8e7a91e38..0fa72497856 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldMapping.java @@ -26,8 +26,4 @@ public record LineageFieldMapping(String sourceField, String targetField) { Objects.requireNonNull(sourceField, "sourceField must be non-null"); Objects.requireNonNull(targetField, "targetField must be non-null"); } - - public LineageFieldMapping(LineageColumnEdge edge) { - this(edge.source().field(), edge.target().field()); - } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java deleted file mode 100644 index 325fc7fdd9e..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageFieldReference.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.util.Objects; - -/** A reference to a specific field on a lineage dataset. */ -public record LineageFieldReference(LineageDataset dataset, String field) { - public LineageFieldReference { - Objects.requireNonNull(dataset, "dataset must be non-null"); - Objects.requireNonNull(field, "field must be non-null"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java index 6863e30ba9a..bbe878f59eb 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageGraph.java @@ -26,7 +26,7 @@ public record LineageGraph( LineageNode node, List upstream, List downstream) { public LineageGraph { Objects.requireNonNull(node, "node must be non-null"); - upstream = List.copyOf(upstream); - downstream = List.copyOf(downstream); + upstream = List.copyOf(Objects.requireNonNull(upstream, "upstream must be non-null")); + downstream = List.copyOf(Objects.requireNonNull(downstream, "downstream must be non-null")); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java deleted file mode 100644 index 1f9fd4726ca..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageIngestRequest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.time.Instant; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -/** Extracted lineage payload that can be persisted independent of the transport event shape. */ -public record LineageIngestRequest( - List datasets, - List edges, - List columnEdges, - Optional eventTime) { - public LineageIngestRequest { - datasets = List.copyOf(datasets); - edges = List.copyOf(edges); - columnEdges = List.copyOf(columnEdges); - Objects.requireNonNull(eventTime, "eventTime must be non-null"); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java index 67c47657f65..07839b5715d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageNode.java @@ -31,23 +31,11 @@ public record LineageNode( public LineageNode { Objects.requireNonNull(id, "id must be non-null"); Objects.requireNonNull(type, "type must be non-null"); - fieldMappings = List.copyOf(fieldMappings); + fieldMappings = + List.copyOf(Objects.requireNonNull(fieldMappings, "fieldMappings must be non-null")); } public LineageNode(String id, LineageNodeType type, LineageData data, boolean opaque) { this(id, type, data, opaque, List.of()); } - - public LineageNode(String id, LineageNodeType type, LineageDataset dataset, boolean opaque) { - this(id, type, dataset == null ? null : new LineageData(dataset), opaque); - } - - public LineageNode( - String id, - LineageNodeType type, - LineageDataset dataset, - boolean opaque, - List fieldMappings) { - this(id, type, dataset == null ? null : new LineageData(dataset), opaque, fieldMappings); - } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java deleted file mode 100644 index 30ab8fcb798..00000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineagePersistence.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.lineage; - -import java.time.Instant; -import java.util.List; -import org.apache.polaris.core.context.RealmContext; - -/** - * Persistence SPI for lineage storage backends. - * - *

This contract is intentionally expressed in terms of Polaris's local lineage graph rather than - * OpenLineage run events. The service layer owns event parsing, forwarding, authorization, and - * dataset resolution. Persistence backends only need to atomically upsert dataset nodes, dataset - * edges, column edges, and load normalized lineage graphs. - */ -public interface LineagePersistence { - - /** - * Upserts dataset nodes. - * - *

Implementations should treat {@code (realm, catalog, namespace, name)} as the stable dataset - * identity and update metadata such as the linked Polaris entity when the dataset already exists. - */ - void upsertDatasets(RealmContext realmContext, List datasets); - - /** - * Upserts directed dataset-level lineage edges. - * - *

Implementations should treat {@code (realm, source_dataset, target_dataset)} as unique and - * update the edge timestamp when the same relationship is asserted again. - */ - void upsertDatasetEdges(RealmContext realmContext, List edges, Instant lastEventAt); - - /** - * Upserts directed column-level lineage edges. - * - *

Implementations should treat {@code (realm, source_dataset, source_field, target_dataset, - * target_field)} as unique and update the edge timestamp when the same relationship is asserted - * again. - */ - void upsertColumnEdges( - RealmContext realmContext, List columnEdges, Instant lastEventAt); - - /** Loads a normalized lineage graph for the requested node and direction. */ - LineageGraph loadLineage(RealmContext realmContext, LineageQueryRequest request); -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java index 5b51ff1e1dc..13898516dee 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/lineage/LineageService.java @@ -20,7 +20,5 @@ /** Service boundary for lineage operations used by transport-layer adapters. */ public interface LineageService { - void ingest(LineageIngestRequest request); - LineageGraph query(LineageQueryRequest request); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java index 5a2e349f48b..730957aedb7 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DefaultLineageService.java @@ -20,12 +20,9 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; -import java.time.Instant; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.lineage.LineageGraph; -import org.apache.polaris.core.lineage.LineageIngestRequest; -import org.apache.polaris.core.lineage.LineagePersistence; import org.apache.polaris.core.lineage.LineageQueryRequest; import org.apache.polaris.core.lineage.LineageService; @@ -33,30 +30,17 @@ public class DefaultLineageService implements LineageService { private final CallContext callContext; private final LineageConfiguration configuration; - private final LineagePersistence persistence; @Inject - public DefaultLineageService( - CallContext callContext, LineageConfiguration configuration, LineagePersistence persistence) { + public DefaultLineageService(CallContext callContext, LineageConfiguration configuration) { this.callContext = callContext; this.configuration = configuration; - this.persistence = persistence; - } - - @Override - public void ingest(LineageIngestRequest request) { - ensureEnabled(); - Instant lastEventAt = request.eventTime().orElseGet(Instant::now); - persistence.upsertDatasets(callContext.getRealmContext(), request.datasets()); - persistence.upsertDatasetEdges(callContext.getRealmContext(), request.edges(), lastEventAt); - persistence.upsertColumnEdges( - callContext.getRealmContext(), request.columnEdges(), lastEventAt); } @Override public LineageGraph query(LineageQueryRequest request) { ensureEnabled(); - return persistence.loadLineage(callContext.getRealmContext(), request); + throw new UnsupportedOperationException("Lineage query is not implemented yet."); } private void ensureEnabled() { @@ -67,12 +51,9 @@ private void ensureEnabled() { if (!callContext.getRealmConfig().getConfig(FeatureConfiguration.ENABLE_LINEAGE)) { throw new UnsupportedOperationException( - "Feature not enabled: " + FeatureConfiguration.ENABLE_LINEAGE.key()); - } - - if (!configuration.persistence().enabled()) { - throw new UnsupportedOperationException( - "Lineage persistence is disabled: set polaris.lineage.persistence.enabled=true."); + "Lineage realm feature is disabled: enable " + + FeatureConfiguration.ENABLE_LINEAGE.key() + + " in the realm feature configuration."); } } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java deleted file mode 100644 index c78ccd7719c..00000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/DisabledLineagePersistence.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.lineage; - -import jakarta.enterprise.context.ApplicationScoped; -import java.time.Instant; -import java.util.List; -import org.apache.polaris.core.context.RealmContext; -import org.apache.polaris.core.lineage.LineageColumnEdge; -import org.apache.polaris.core.lineage.LineageDataset; -import org.apache.polaris.core.lineage.LineageEdge; -import org.apache.polaris.core.lineage.LineageGraph; -import org.apache.polaris.core.lineage.LineagePersistence; -import org.apache.polaris.core.lineage.LineageQueryRequest; - -/** Placeholder persistence until a concrete lineage backend is added. */ -@ApplicationScoped -public class DisabledLineagePersistence implements LineagePersistence { - private static final String MESSAGE = - "No lineage persistence implementation is configured for this deployment."; - - @Override - public void upsertDatasets(RealmContext realmContext, List datasets) { - throw new UnsupportedOperationException(MESSAGE); - } - - @Override - public void upsertDatasetEdges( - RealmContext realmContext, List edges, Instant lastEventAt) { - throw new UnsupportedOperationException(MESSAGE); - } - - @Override - public void upsertColumnEdges( - RealmContext realmContext, List columnEdges, Instant lastEventAt) { - throw new UnsupportedOperationException(MESSAGE); - } - - @Override - public LineageGraph loadLineage(RealmContext realmContext, LineageQueryRequest request) { - throw new UnsupportedOperationException(MESSAGE); - } -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java index 9c27a127978..6d27e0bdd31 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/LineageConfiguration.java @@ -20,30 +20,10 @@ import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; -import io.smallrye.config.WithName; @ConfigMapping(prefix = "polaris.lineage") public interface LineageConfiguration { @WithDefault("false") boolean enabled(); - - @WithName("persistence") - PersistenceConfiguration persistence(); - - @WithName("dataset-resolution") - DatasetResolutionConfiguration datasetResolution(); - - interface PersistenceConfiguration { - @WithDefault("false") - boolean enabled(); - - @WithDefault("relational-jdbc") - String type(); - } - - interface DatasetResolutionConfiguration { - @WithDefault("false") - boolean enabled(); - } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java index cbcd3c28513..ce81095221c 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/lineage/DefaultLineageServiceTest.java @@ -19,27 +19,13 @@ package org.apache.polaris.service.lineage; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import java.time.Instant; -import java.util.List; -import java.util.Optional; -import java.util.OptionalLong; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; -import org.apache.polaris.core.lineage.LineageColumnEdge; -import org.apache.polaris.core.lineage.LineageDataset; -import org.apache.polaris.core.lineage.LineageEdge; -import org.apache.polaris.core.lineage.LineageFieldReference; -import org.apache.polaris.core.lineage.LineageGraph; -import org.apache.polaris.core.lineage.LineageIngestRequest; -import org.apache.polaris.core.lineage.LineageNode; -import org.apache.polaris.core.lineage.LineageNodeType; -import org.apache.polaris.core.lineage.LineagePersistence; +import org.apache.polaris.core.lineage.LineageDirection; +import org.apache.polaris.core.lineage.LineageGranularity; import org.apache.polaris.core.lineage.LineageQueryRequest; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,39 +33,26 @@ import org.mockito.MockitoAnnotations; public class DefaultLineageServiceTest { - private static final Instant EVENT_TIME = Instant.parse("2026-01-01T00:00:00Z"); - @Mock private CallContext callContext; - @Mock private RealmContext realmContext; @Mock private RealmConfig realmConfig; @Mock private LineageConfiguration configuration; - @Mock private LineageConfiguration.PersistenceConfiguration persistenceConfiguration; - @Mock private LineagePersistence persistence; private DefaultLineageService service; @BeforeEach void setUp() { MockitoAnnotations.openMocks(this); - when(callContext.getRealmContext()).thenReturn(realmContext); when(callContext.getRealmConfig()).thenReturn(realmConfig); - when(configuration.persistence()).thenReturn(persistenceConfiguration); - service = new DefaultLineageService(callContext, configuration, persistence); + service = new DefaultLineageService(callContext, configuration); } @Test void throwsWhenStaticConfigDisabled() { when(configuration.enabled()).thenReturn(false); - assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("polaris.lineage.enabled"); - assertThatThrownBy(() -> service.query(queryRequest())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("polaris.lineage.enabled"); - - verifyNoInteractions(persistence); } @Test @@ -87,83 +60,23 @@ void throwsWhenRealmFeatureDisabled() { when(configuration.enabled()).thenReturn(true); when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(false); - assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); - assertThatThrownBy(() -> service.query(queryRequest())) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining(FeatureConfiguration.ENABLE_LINEAGE.key()); - - verifyNoInteractions(persistence); } @Test - void throwsWhenPersistenceDisabled() { + void throwsNotImplementedWhenLineageEnabled() { when(configuration.enabled()).thenReturn(true); when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(true); - when(persistenceConfiguration.enabled()).thenReturn(false); - - assertThatThrownBy(() -> service.ingest(emptyIngestRequest())) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("polaris.lineage.persistence.enabled"); assertThatThrownBy(() -> service.query(queryRequest())) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("polaris.lineage.persistence.enabled"); - - verifyNoInteractions(persistence); - } - - @Test - void delegatesWhenLineageEnabled() { - LineageIngestRequest ingestRequest = ingestRequest(); - LineageGraph graph = - new LineageGraph( - new LineageNode( - "dataset:test:orders", LineageNodeType.DATASET, dataset("test", "orders"), false), - List.of(), - List.of()); - - when(configuration.enabled()).thenReturn(true); - when(realmConfig.getConfig(FeatureConfiguration.ENABLE_LINEAGE)).thenReturn(true); - when(persistenceConfiguration.enabled()).thenReturn(true); - when(persistence.loadLineage(realmContext, queryRequest())).thenReturn(graph); - - service.ingest(ingestRequest); - service.query(queryRequest()); - - verify(persistence).upsertDatasets(realmContext, ingestRequest.datasets()); - verify(persistence).upsertDatasetEdges(realmContext, ingestRequest.edges(), EVENT_TIME); - verify(persistence).upsertColumnEdges(realmContext, ingestRequest.columnEdges(), EVENT_TIME); - verify(persistence).loadLineage(realmContext, queryRequest()); - } - - private static LineageIngestRequest emptyIngestRequest() { - return new LineageIngestRequest(List.of(), List.of(), List.of(), Optional.empty()); - } - - private static LineageIngestRequest ingestRequest() { - LineageDataset source = dataset("raw", "orders"); - LineageDataset target = dataset("test", "orders"); - return new LineageIngestRequest( - List.of(source, target), - List.of(new LineageEdge(source, target)), - List.of( - new LineageColumnEdge( - new LineageFieldReference(source, "id"), - new LineageFieldReference(target, "order_id"))), - Optional.of(EVENT_TIME)); + .hasMessageContaining("Lineage query is not implemented yet"); } private static LineageQueryRequest queryRequest() { return new LineageQueryRequest( - "dataset:test:orders", - org.apache.polaris.core.lineage.LineageDirection.BOTH, - org.apache.polaris.core.lineage.LineageGranularity.DATASET); - } - - private static LineageDataset dataset(String namespace, String name) { - return new LineageDataset("test-catalog", namespace, name, OptionalLong.empty()); + "dataset:test:orders", LineageDirection.BOTH, LineageGranularity.DATASET); } } diff --git a/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md index 0e54a2c3a9c..0d434dd67aa 100644 --- a/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md +++ b/site/content/in-dev/unreleased/configuration/config-sections/smallrye-polaris_lineage.md @@ -26,6 +26,3 @@ build: | Property | Default Value | Type | Description | |----------|---------------|------|-------------| | `polaris.lineage.enabled` | `false` | `boolean` | | -| `polaris.lineage.persistence.enabled` | `false` | `boolean` | | -| `polaris.lineage.persistence.type` | `relational-jdbc` | `string` | | -| `polaris.lineage.dataset-resolution.enabled` | `false` | `boolean` | |