From ab78fa80b35fc881fdf2763789edf057cdabcd6c Mon Sep 17 00:00:00 2001 From: Adnan Hemani Date: Tue, 9 Jun 2026 05:49:17 +0000 Subject: [PATCH] openlineage: add no-op ingest endpoint at POST /api/v1/lineage Adds the OpenLineage-compatible ingest endpoint defined in the Polaris OpenLineage proposal. This first PR mounts the route and accepts events; persistence, dataset resolution, and downstream forwarding are follow-ups. The endpoint is mounted at the standard OpenLineage path (POST /api/v1/lineage) so any engine using the OpenLineage HTTP transport (Spark, Flink, Airflow, Trino, dbt) can target Polaris by URL change alone. Body parsing follows the Marquez (OpenLineage reference server) pattern: a hand-written JAX-RS resource on top of io.openlineage:openlineage-java, with Jackson polymorphism keyed on the schemaURL field to dispatch between RunEvent / JobEvent / DatasetEvent. The OpenAPI Generator's Java template cannot translate the spec's oneOf faithfully -- it collapses the variants into a single class with every variant's required fields marked @NotNull, rejecting every valid event with 400 -- so codegen is intentionally skipped for this module. The PolarisLineageEvent wrapper hierarchy holds the official OL types by composition (since OpenLineage.{Run,Job,Dataset}Event are final). Unknown or missing schemaURL falls back to RunEvent, matching Marquez behavior. Verified against a running server with full RunEvent / JobEvent / DatasetEvent payloads (all return 201), unknown/missing schemaURL fallback (all parse as RunEvent), and unauthenticated requests (401). Standalone Jackson dispatch test confirms each schemaURL routes to the correct wrapper subclass. --- api/openlineage-service/build.gradle.kts | 62 +++++++ .../lineage/api/LineageEventTypeResolver.java | 90 +++++++++ .../lineage/api/PolarisLineageEvent.java | 106 +++++++++++ .../lineage/api/PolarisOpenLineageApi.java | 74 ++++++++ .../api/PolarisOpenLineageApiService.java | 43 +++++ gradle/libs.versions.toml | 1 + gradle/projects.main.properties | 1 + integration-tests/build.gradle.kts | 1 + .../service/it/env/OpenLineageApi.java | 40 ++++ .../service/it/env/PolarisApiEndpoints.java | 4 + .../polaris/service/it/env/PolarisClient.java | 8 + ...arisOpenLineageServiceIntegrationTest.java | 173 ++++++++++++++++++ runtime/service/build.gradle.kts | 1 + .../service/it/OpenLineageServiceIT.java | 25 +++ .../service/lineage/OpenLineageAdapter.java | 42 +++++ spec/openlineage-service.yaml | 132 +++++++++++++ 16 files changed, 803 insertions(+) create mode 100644 api/openlineage-service/build.gradle.kts create mode 100644 api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/LineageEventTypeResolver.java create mode 100644 api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisLineageEvent.java create mode 100644 api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApi.java create mode 100644 api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApiService.java create mode 100644 integration-tests/src/main/java/org/apache/polaris/service/it/env/OpenLineageApi.java create mode 100644 integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisOpenLineageServiceIntegrationTest.java create mode 100644 runtime/service/src/intTest/java/org/apache/polaris/service/it/OpenLineageServiceIT.java create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/lineage/OpenLineageAdapter.java create mode 100644 spec/openlineage-service.yaml diff --git a/api/openlineage-service/build.gradle.kts b/api/openlineage-service/build.gradle.kts new file mode 100644 index 00000000000..5ed066a799e --- /dev/null +++ b/api/openlineage-service/build.gradle.kts @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// This module deliberately does NOT use the OpenAPI generator. The OpenLineage +// event schema is a oneOf over RunEvent / JobEvent / DatasetEvent, which the +// jaxrs-resteasy generator collapses into a single class with every variant's +// required fields marked @NotNull -- making every real OpenLineage event fail +// bean validation. Marquez (the OpenLineage reference server) takes the same +// approach: hand-written JAX-RS resource on top of the official +// `io.openlineage:openlineage-java` types, using Jackson polymorphism keyed on +// the `schemaURL` field to dispatch between event variants. +plugins { + id("polaris-client") + id("org.kordamp.gradle.jandex") +} + +dependencies { + implementation(project(":polaris-core")) + + // The official OpenLineage Java library ships POJOs for the receive side + // (`io.openlineage.server.OpenLineage.{BaseEvent,RunEvent,JobEvent,DatasetEvent}`). + // We expose them through the API surface; callers reading event content read + // these types directly. + api(libs.openlineage.java) + + implementation(libs.jakarta.annotation.api) + implementation(libs.jakarta.inject.api) + implementation(libs.jakarta.validation.api) + + implementation(libs.jakarta.servlet.api) + implementation(libs.jakarta.ws.rs.api) + + implementation(platform(libs.micrometer.bom)) + implementation("io.micrometer:micrometer-core") + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-core") + implementation("com.fasterxml.jackson.core:jackson-databind") + + implementation(libs.slf4j.api) + + compileOnly(libs.microprofile.fault.tolerance.api) +} + +tasks.named("javadoc") { dependsOn("jandex") } diff --git a/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/LineageEventTypeResolver.java b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/LineageEventTypeResolver.java new file mode 100644 index 00000000000..0b49b0feebd --- /dev/null +++ b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/LineageEventTypeResolver.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage.api; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; + +/** + * Resolves an OpenLineage event JSON body to one of {@link PolarisLineageEvent.OfRunEvent}, {@link + * PolarisLineageEvent.OfJobEvent}, or {@link PolarisLineageEvent.OfDatasetEvent} by inspecting the + * trailing path segment of the {@code schemaURL} field. + * + *

The OpenLineage spec requires every event to include a {@code schemaURL} pointing at the + * variant's JSON schema fragment, e.g. {@code + * https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent}. The fragment name is the + * stable discriminator across spec versions. + * + *

If {@code schemaURL} is missing or unrecognized, the body is parsed as a {@code RunEvent} — + * the most common variant and the same fallback Marquez (the OpenLineage reference server) uses. + */ +public class LineageEventTypeResolver extends TypeIdResolverBase { + + private JavaType superType; + + @Override + public void init(JavaType baseType) { + this.superType = baseType; + } + + @Override + public String idFromValue(Object value) { + return idFromValueAndType(value, value == null ? null : value.getClass()); + } + + @Override + public String idFromValueAndType(Object value, Class suggestedType) { + // Polaris does not currently emit lineage events; serialization is not on the + // ingest path. Returning a stable label keeps Jackson happy if a downstream + // module ever serializes a wrapper. + if (suggestedType == PolarisLineageEvent.OfJobEvent.class) { + return "JobEvent"; + } + if (suggestedType == PolarisLineageEvent.OfDatasetEvent.class) { + return "DatasetEvent"; + } + return "RunEvent"; + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) { + Class subType = subTypeFor(id); + return context.constructSpecializedType(superType, subType); + } + + @Override + public JsonTypeInfo.Id getMechanism() { + return JsonTypeInfo.Id.CUSTOM; + } + + private static Class subTypeFor(String schemaUrl) { + if (schemaUrl == null) { + return PolarisLineageEvent.OfRunEvent.class; + } + String[] segments = schemaUrl.split("/"); + String tail = segments[segments.length - 1]; + return switch (tail) { + case "JobEvent" -> PolarisLineageEvent.OfJobEvent.class; + case "DatasetEvent" -> PolarisLineageEvent.OfDatasetEvent.class; + default -> PolarisLineageEvent.OfRunEvent.class; + }; + } +} diff --git a/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisLineageEvent.java b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisLineageEvent.java new file mode 100644 index 00000000000..071e13aa1b2 --- /dev/null +++ b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisLineageEvent.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage.api; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import io.openlineage.server.OpenLineage; + +/** + * Polaris-side wrapper for an OpenLineage event accepted by the lineage ingest endpoint. + * + *

The OpenLineage spec models events as a {@code oneOf} over {@code RunEvent}, {@code + * JobEvent}, and {@code DatasetEvent}, discriminated by the value of the {@code schemaURL} field. + * The official {@code openlineage-java} library ships POJOs for these three event types in {@link + * io.openlineage.server.OpenLineage} but does not annotate them for polymorphic JSON + * deserialization. This class provides that polymorphism on top. + * + *

Wrappers compose the OpenLineage event by reference rather than extending it because the + * {@code OpenLineage.*Event} classes are {@code final}. {@link #event()} returns the underlying + * event for follow-up code that needs typed access to fields (inputs, outputs, columnLineage + * facet, etc.). + * + *

This pattern follows Marquez (the OpenLineage reference server), which uses the same + * {@code @JsonTypeInfo} + {@code @JsonTypeIdResolver} discrimination on {@code schemaURL}. + */ +@JsonTypeInfo( + use = JsonTypeInfo.Id.CUSTOM, + include = JsonTypeInfo.As.EXISTING_PROPERTY, + property = "schemaURL", + defaultImpl = PolarisLineageEvent.OfRunEvent.class, + visible = true) +@JsonTypeIdResolver(LineageEventTypeResolver.class) +public abstract sealed class PolarisLineageEvent + permits PolarisLineageEvent.OfRunEvent, + PolarisLineageEvent.OfJobEvent, + PolarisLineageEvent.OfDatasetEvent { + + /** Returns the underlying OpenLineage event. */ + public abstract OpenLineage.BaseEvent event(); + + /** Wraps an {@link OpenLineage.RunEvent}. */ + public static final class OfRunEvent extends PolarisLineageEvent { + private final OpenLineage.RunEvent event; + + @JsonCreator(mode = JsonCreator.Mode.DELEGATING) + public OfRunEvent(OpenLineage.RunEvent event) { + this.event = event; + } + + @JsonValue + @Override + public OpenLineage.RunEvent event() { + return event; + } + } + + /** Wraps an {@link OpenLineage.JobEvent}. */ + public static final class OfJobEvent extends PolarisLineageEvent { + private final OpenLineage.JobEvent event; + + @JsonCreator(mode = JsonCreator.Mode.DELEGATING) + public OfJobEvent(OpenLineage.JobEvent event) { + this.event = event; + } + + @JsonValue + @Override + public OpenLineage.JobEvent event() { + return event; + } + } + + /** Wraps an {@link OpenLineage.DatasetEvent}. */ + public static final class OfDatasetEvent extends PolarisLineageEvent { + private final OpenLineage.DatasetEvent event; + + @JsonCreator(mode = JsonCreator.Mode.DELEGATING) + public OfDatasetEvent(OpenLineage.DatasetEvent event) { + this.event = event; + } + + @JsonValue + @Override + public OpenLineage.DatasetEvent event() { + return event; + } + } +} diff --git a/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApi.java b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApi.java new file mode 100644 index 00000000000..3b61ef1785d --- /dev/null +++ b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApi.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage.api; + +import io.micrometer.core.annotation.Timed; +import io.micrometer.core.aop.MeterTag; +import jakarta.annotation.security.RolesAllowed; +import jakarta.inject.Inject; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.RealmContext; +import org.eclipse.microprofile.faulttolerance.Timeout; + +/** + * JAX-RS resource for the OpenLineage ingest endpoint. + * + *

Mounted at the standard OpenLineage path ({@code POST /api/v1/lineage}) so that any engine + * already using the OpenLineage HTTP transport (Spark, Flink, Airflow, Trino, dbt) can target + * Polaris by URL change alone — no client-side rewriting. + * + *

The body is parsed into a {@link PolarisLineageEvent} which Jackson resolves to one of {@code + * OfRunEvent} / {@code OfJobEvent} / {@code OfDatasetEvent} based on the {@code schemaURL} field. + * Unrecognized {@code schemaURL} values fall back to {@code RunEvent}, matching Marquez behavior. + * + *

This resource is hand-written rather than generated from the OpenLineage JSON Schema because + * the OpenAPI generator's Java template does not faithfully translate the spec's {@code oneOf} + * over event variants. + */ +@Path("/api/v1/lineage") +public class PolarisOpenLineageApi { + + private final PolarisOpenLineageApiService service; + + @Inject + public PolarisOpenLineageApi(PolarisOpenLineageApiService service) { + this.service = service; + } + + @POST + @Consumes("application/json") + @RolesAllowed("**") + @Timed("polaris.OpenLineageApi.sendLineageEvent") + @Timeout + public Response sendLineageEvent( + @NotNull @Valid PolarisLineageEvent event, + @Context @MeterTag(key = "realm_id", expression = "realmIdentifier") + RealmContext realmContext, + @Context @MeterTag(key = "principal", expression = "userPrincipal") + SecurityContext securityContext) { + return service.sendLineageEvent(event, realmContext, securityContext); + } +} diff --git a/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApiService.java b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApiService.java new file mode 100644 index 00000000000..c667df4996e --- /dev/null +++ b/api/openlineage-service/src/main/java/org/apache/polaris/service/lineage/api/PolarisOpenLineageApiService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage.api; + +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.RealmContext; + +/** + * Service interface implemented by the runtime to handle OpenLineage ingest. Mirrors the pattern + * used by other Polaris API modules where the JAX-RS resource sits in the API module and + * delegates to a CDI-scoped service implementation in {@code polaris-runtime-service}. + */ +public interface PolarisOpenLineageApiService { + + /** + * Handle an OpenLineage event accepted at the ingest endpoint. + * + * @param event the parsed OpenLineage event, dispatched to the correct {@code RunEvent}, {@code + * JobEvent}, or {@code DatasetEvent} variant by Jackson based on the {@code schemaURL} + * field. + * @return the JAX-RS response. OpenLineage clients expect {@code 201 Created} with no body on + * success. + */ + Response sendLineageEvent( + PolarisLineageEvent event, RealmContext realmContext, SecurityContext securityContext); +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1abd0b5f160..70f546e1a96 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -94,6 +94,7 @@ microprofile-fault-tolerance-api = { module = "org.eclipse.microprofile.fault-to mockito-core = { module = "org.mockito:mockito-core", version = "5.23.0" } mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version = "5.23.0" } mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "5.8.0" } +openlineage-java = { module = "io.openlineage:openlineage-java", version = "1.48.0" } opentelemetry-instrumentation-bom-alpha = { module = "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha", version= "2.20.1-alpha" } picocli = { module = "info.picocli:picocli-codegen", version.ref = "picocli" } picocli-codegen = { module = "info.picocli:picocli-codegen", version.ref = "picocli" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 322510c5578..be45180a942 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -24,6 +24,7 @@ polaris-api-iceberg-service=api/iceberg-service polaris-api-management-model=api/management-model polaris-api-management-service=api/management-service polaris-api-catalog-service=api/polaris-catalog-service +polaris-api-openlineage-service=api/openlineage-service polaris-runtime-defaults=runtime/defaults polaris-runtime-service=runtime/service polaris-server=runtime/server diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts index 518000c8a83..975da2c2cc9 100644 --- a/integration-tests/build.gradle.kts +++ b/integration-tests/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { implementation(project(":polaris-core")) implementation(project(":polaris-api-management-model")) implementation(project(":polaris-api-catalog-service")) + implementation(project(":polaris-api-openlineage-service")) implementation(libs.jakarta.annotation.api) implementation(libs.jakarta.ws.rs.api) diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/OpenLineageApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/OpenLineageApi.java new file mode 100644 index 00000000000..7040c1890be --- /dev/null +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/OpenLineageApi.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.it.env; + +import jakarta.ws.rs.client.Client; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.Response; +import java.net.URI; + +/** Test helper for {@code POST /api/v1/lineage} (the OpenLineage ingest endpoint). */ +public class OpenLineageApi extends PolarisRestApi { + + OpenLineageApi(Client client, PolarisApiEndpoints endpoints, String authToken, URI uri) { + super(client, endpoints, authToken, uri); + } + + /** + * POSTs a JSON event body to the OpenLineage ingest endpoint and returns the raw response. The + * caller is responsible for closing the response. + */ + public Response sendEvent(String json) { + return request("lineage").post(Entity.json(json)); + } +} diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisApiEndpoints.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisApiEndpoints.java index 78e62bcd296..8a4cc249a4a 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisApiEndpoints.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisApiEndpoints.java @@ -52,6 +52,10 @@ public URI managementApiEndpoint() { return baseUri.resolve(baseUri.getRawPath() + "/api/management").normalize(); } + public URI openLineageApiEndpoint() { + return baseUri.resolve(baseUri.getRawPath() + "/api/v1").normalize(); + } + public String realmId() { return realmId; } diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java index 2a5c31eeab8..724f579b342 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java @@ -103,6 +103,14 @@ public PolicyApi policyApi(String authToken) { return new PolicyApi(client, endpoints, authToken, endpoints.catalogApiEndpoint()); } + public OpenLineageApi openLineageApi(String authToken) { + return new OpenLineageApi(client, endpoints, authToken, endpoints.openLineageApiEndpoint()); + } + + public OpenLineageApi openLineageApiPlain() { + return new OpenLineageApi(client, endpoints, null, endpoints.openLineageApiEndpoint()); + } + /** Requests an access token from the Polaris server for the given principal. */ public String obtainToken(PrincipalWithCredentials credentials) { return obtainToken(new ClientPrincipal(credentials)); diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisOpenLineageServiceIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisOpenLineageServiceIntegrationTest.java new file mode 100644 index 00000000000..31b9b41c102 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisOpenLineageServiceIntegrationTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.it.test; + +import static org.apache.polaris.service.it.env.PolarisClient.polarisClient; +import static org.assertj.core.api.Assertions.assertThat; + +import jakarta.ws.rs.core.Response; +import org.apache.polaris.service.it.env.ClientCredentials; +import org.apache.polaris.service.it.env.OpenLineageApi; +import org.apache.polaris.service.it.env.PolarisApiEndpoints; +import org.apache.polaris.service.it.env.PolarisClient; +import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Integration tests for the OpenLineage ingest endpoint ({@code POST /api/v1/lineage}). + * + *

The endpoint is currently a no-op: every authenticated, well-formed event returns {@code + * 201 Created}. These tests pin down the wire-level contract so that follow-up PRs (persistence, + * dataset resolution, downstream forwarding) cannot accidentally regress dispatch or auth + * behavior. + */ +@ExtendWith(PolarisIntegrationTestExtension.class) +public class PolarisOpenLineageServiceIntegrationTest { + + private static final String SCHEMA_BASE = "https://openlineage.io/spec/2-0-2/OpenLineage.json"; + + private static final String RUN_EVENT = + """ + { + "eventTime": "2024-01-01T00:00:00Z", + "eventType": "START", + "run": {"runId": "123e4567-e89b-12d3-a456-426614174000"}, + "job": {"namespace": "test", "name": "job"}, + "producer": "https://example.com", + "schemaURL": "%s#/$defs/RunEvent", + "inputs": [{"namespace": "db", "name": "orders_raw"}], + "outputs": [{"namespace": "db", "name": "orders_daily"}] + } + """ + .formatted(SCHEMA_BASE); + + private static final String JOB_EVENT = + """ + { + "eventTime": "2024-01-01T00:00:00Z", + "job": {"namespace": "test", "name": "job"}, + "producer": "https://example.com", + "schemaURL": "%s#/$defs/JobEvent" + } + """ + .formatted(SCHEMA_BASE); + + private static final String DATASET_EVENT = + """ + { + "eventTime": "2024-01-01T00:00:00Z", + "dataset": {"namespace": "db", "name": "orders_daily"}, + "producer": "https://example.com", + "schemaURL": "%s#/$defs/DatasetEvent" + } + """ + .formatted(SCHEMA_BASE); + + private static final String UNKNOWN_SCHEMA_URL_EVENT = + """ + { + "eventTime": "2024-01-01T00:00:00Z", + "eventType": "START", + "run": {"runId": "123e4567-e89b-12d3-a456-426614174000"}, + "job": {"namespace": "test", "name": "job"}, + "producer": "https://example.com", + "schemaURL": "https://example.com/UnknownEvent" + } + """; + + private static final String MISSING_SCHEMA_URL_EVENT = + """ + { + "eventTime": "2024-01-01T00:00:00Z", + "eventType": "START", + "run": {"runId": "123e4567-e89b-12d3-a456-426614174000"}, + "job": {"namespace": "test", "name": "job"}, + "producer": "https://example.com" + } + """; + + private static PolarisClient client; + private static OpenLineageApi authenticated; + private static OpenLineageApi anonymous; + + @BeforeAll + public static void setup(PolarisApiEndpoints endpoints, ClientCredentials credentials) { + client = polarisClient(endpoints); + String token = client.obtainToken(credentials); + authenticated = client.openLineageApi(token); + anonymous = client.openLineageApiPlain(); + } + + @AfterAll + public static void close() throws Exception { + client.close(); + } + + @Test + public void runEventReturns201() { + try (Response res = authenticated.sendEvent(RUN_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + } + + @Test + public void jobEventReturns201() { + try (Response res = authenticated.sendEvent(JOB_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + } + + @Test + public void datasetEventReturns201() { + try (Response res = authenticated.sendEvent(DATASET_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + } + + @Test + public void unknownSchemaUrlIsAccepted() { + try (Response res = authenticated.sendEvent(UNKNOWN_SCHEMA_URL_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + } + + @Test + public void missingSchemaUrlIsAccepted() { + try (Response res = authenticated.sendEvent(MISSING_SCHEMA_URL_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode()); + } + } + + @Test + public void unauthenticatedReturns401() { + try (Response res = anonymous.sendEvent(RUN_EVENT)) { + assertThat(res.getStatus()).isEqualTo(Response.Status.UNAUTHORIZED.getStatusCode()); + } + } + + @Test + public void malformedJsonReturns400() { + try (Response res = authenticated.sendEvent("not-json")) { + assertThat(res.getStatus()).isEqualTo(Response.Status.BAD_REQUEST.getStatusCode()); + } + } +} diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 2253c02b92b..41e808e7156 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) + implementation(project(":polaris-api-openlineage-service")) runtimeOnly(project(":polaris-relational-jdbc")) diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/OpenLineageServiceIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/OpenLineageServiceIT.java new file mode 100644 index 00000000000..9551a0401b1 --- /dev/null +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/OpenLineageServiceIT.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.it; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import org.apache.polaris.service.it.test.PolarisOpenLineageServiceIntegrationTest; + +@QuarkusIntegrationTest +public class OpenLineageServiceIT extends PolarisOpenLineageServiceIntegrationTest {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/lineage/OpenLineageAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/lineage/OpenLineageAdapter.java new file mode 100644 index 00000000000..376a30127c9 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/lineage/OpenLineageAdapter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.lineage; + +import jakarta.enterprise.context.RequestScoped; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.service.lineage.api.PolarisLineageEvent; +import org.apache.polaris.service.lineage.api.PolarisOpenLineageApiService; + +/** + * No-op implementation of the OpenLineage ingest endpoint. + * + *

Accepts and discards events. Persistence, dataset resolution, and downstream forwarding will + * land in follow-up PRs as described in the Polaris OpenLineage proposal. + */ +@RequestScoped +public class OpenLineageAdapter implements PolarisOpenLineageApiService { + + @Override + public Response sendLineageEvent( + PolarisLineageEvent event, RealmContext realmContext, SecurityContext securityContext) { + return Response.status(Response.Status.CREATED).build(); + } +} diff --git a/spec/openlineage-service.yaml b/spec/openlineage-service.yaml new file mode 100644 index 00000000000..ddf766c9723 --- /dev/null +++ b/spec/openlineage-service.yaml @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file documents the OpenLineage ingest endpoint exposed by Apache Polaris. +# +# IMPORTANT: This spec is documentation, not a code-generation source. The +# OpenLineage event schema is a `oneOf` over RunEvent / JobEvent / DatasetEvent +# discriminated by the `schemaURL` field, which the OpenAPI Generator's Java +# templates cannot translate faithfully (they collapse the variants into a +# single class with every variant's required fields marked `@NotNull`, +# rejecting every valid event with a 400). Polaris instead hand-writes the +# JAX-RS resource on top of the official `io.openlineage:openlineage-java` +# library, exactly as Marquez (the OpenLineage reference server) does. See +# `api/openlineage-service/` for the implementation. +# +# The authoritative event schema lives upstream: +# https://openlineage.io/spec/2-0-2/OpenLineage.json +# +# This file is kept so the OpenLineage endpoint shows up alongside Polaris's +# other API specs and can be browsed by API tools. + +--- +openapi: 3.0.3 +info: + title: Apache Polaris OpenLineage Service + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html + version: 0.0.1 + description: + Defines the OpenLineage-compatible ingest API exposed by Apache Polaris. + Polaris accepts OpenLineage events at the standard OpenLineage path so + engines (Spark, Flink, Airflow, Trino, dbt) can be reconfigured by URL + alone. Event bodies follow the OpenLineage specification at + https://openlineage.io/spec/2-0-2/OpenLineage.json. + +servers: + - url: "{scheme}://{host}/api/v1" + description: Server URL when the port can be inferred from the scheme + variables: + scheme: + description: The scheme of the URI, either http or https. + default: https + host: + description: The host address for the specified server + default: localhost + +security: + - OAuth2: [] + - BearerAuth: [] + +paths: + /lineage: + post: + tags: + - OpenLineage API + summary: Submit an OpenLineage event + description: + Accepts an OpenLineage RunEvent, JobEvent, or DatasetEvent as defined + by the OpenLineage specification + (https://openlineage.io/spec/2-0-2/OpenLineage.json). + + + The body is dispatched to the correct event variant by inspecting the + `schemaURL` field — the fragment after the final `/` selects RunEvent, + JobEvent, or DatasetEvent. Unrecognized values fall back to RunEvent. + + + This endpoint is the standard OpenLineage ingest path, so existing + OpenLineage transports can target Polaris by URL change alone. + operationId: sendLineageEvent + requestBody: + description: + An OpenLineage event. The exact body schema is the upstream + OpenLineage 2-0-2 JSON Schema referenced above. + required: true + content: + application/json: + schema: + type: object + additionalProperties: true + externalDocs: + description: OpenLineage 2-0-2 JSON Schema (authoritative) + url: https://openlineage.io/spec/2-0-2/OpenLineage.json + responses: + 201: + description: + Created. The event was accepted for processing. Per the + OpenLineage server specification the response body is empty. + 400: + description: Bad Request - The request body is not a valid OpenLineage event + 401: + description: Unauthorized - The caller is not authenticated + 403: + description: Forbidden - The caller is not authorized to submit lineage events + 503: + description: Service Unavailable - The lineage subsystem is temporarily unable to accept events + 5XX: + description: Server error + +components: + securitySchemes: + OAuth2: + type: oauth2 + description: + OAuth2 client-credentials flow against the Polaris token endpoint. + The same client-id/secret used for catalog access is used here; only + an additional LINEAGE_INGEST privilege grant is required. + flows: + clientCredentials: + tokenUrl: /v1/oauth/tokens + scopes: {} + BearerAuth: + type: http + scheme: bearer + bearerFormat: JWT