OpenLineage: add no-op POST /api/v1/lineage ingest endpoint #4667
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| // This module deliberately does NOT use the OpenAPI generator. The OpenLineage | ||
| // event schema is a oneOf over RunEvent / JobEvent / DatasetEvent, which the | ||
| // jaxrs-resteasy generator collapses into a single class with every variant's | ||
| // required fields marked @NotNull -- making every real OpenLineage event fail | ||
| // bean validation. Marquez (the OpenLineage reference server) takes the same | ||
| // approach: hand-written JAX-RS resource on top of the official | ||
| // `io.openlineage:openlineage-java` types, using Jackson polymorphism keyed on | ||
| // the `schemaURL` field to dispatch between event variants. | ||
| plugins { | ||
| id("polaris-client") | ||
| id("org.kordamp.gradle.jandex") | ||
| } | ||
|
|
||
| dependencies { | ||
| implementation(project(":polaris-core")) | ||
|
|
||
| // The official OpenLineage Java library ships POJOs for the receive side | ||
| // (`io.openlineage.server.OpenLineage.{BaseEvent,RunEvent,JobEvent,DatasetEvent}`). | ||
| // We expose them through the API surface; callers reading event content read | ||
| // these types directly. | ||
| api(libs.openlineage.java) | ||
|
|
||
| implementation(libs.jakarta.annotation.api) | ||
| implementation(libs.jakarta.inject.api) | ||
| implementation(libs.jakarta.validation.api) | ||
|
|
||
| implementation(libs.jakarta.servlet.api) | ||
| implementation(libs.jakarta.ws.rs.api) | ||
|
|
||
| implementation(platform(libs.micrometer.bom)) | ||
| implementation("io.micrometer:micrometer-core") | ||
|
|
||
| implementation(platform(libs.jackson.bom)) | ||
| implementation("com.fasterxml.jackson.core:jackson-annotations") | ||
| implementation("com.fasterxml.jackson.core:jackson-core") | ||
| implementation("com.fasterxml.jackson.core:jackson-databind") | ||
|
|
||
| implementation(libs.slf4j.api) | ||
|
|
||
| compileOnly(libs.microprofile.fault.tolerance.api) | ||
| } | ||
|
|
||
| tasks.named("javadoc") { dependsOn("jandex") } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.service.lineage.api; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
| import com.fasterxml.jackson.databind.DatabindContext; | ||
| import com.fasterxml.jackson.databind.JavaType; | ||
| import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; | ||
|
|
||
| /** | ||
| * Resolves an OpenLineage event JSON body to one of {@link PolarisLineageEvent.OfRunEvent}, {@link | ||
| * PolarisLineageEvent.OfJobEvent}, or {@link PolarisLineageEvent.OfDatasetEvent} by inspecting the | ||
| * trailing path segment of the {@code schemaURL} field. | ||
| * | ||
| * <p>The OpenLineage spec requires every event to include a {@code schemaURL} pointing at the | ||
| * variant's JSON schema fragment, e.g. {@code | ||
| * https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent}. The fragment name is the | ||
| * stable discriminator across spec versions. | ||
| * | ||
| * <p>If {@code schemaURL} is missing or unrecognized, the body is parsed as a {@code RunEvent} — | ||
| * the most common variant and the same fallback Marquez (the OpenLineage reference server) uses. | ||
| */ | ||
| public class LineageEventTypeResolver extends TypeIdResolverBase { | ||
|
|
||
| private JavaType superType; | ||
|
|
||
| @Override | ||
| public void init(JavaType baseType) { | ||
| this.superType = baseType; | ||
| } | ||
|
|
||
| @Override | ||
| public String idFromValue(Object value) { | ||
| return idFromValueAndType(value, value == null ? null : value.getClass()); | ||
| } | ||
|
|
||
| @Override | ||
| public String idFromValueAndType(Object value, Class<?> suggestedType) { | ||
| // Polaris does not currently emit lineage events; serialization is not on the | ||
| // ingest path. Returning a stable label keeps Jackson happy if a downstream | ||
| // module ever serializes a wrapper. | ||
| if (suggestedType == PolarisLineageEvent.OfJobEvent.class) { | ||
| return "JobEvent"; | ||
| } | ||
| if (suggestedType == PolarisLineageEvent.OfDatasetEvent.class) { | ||
| return "DatasetEvent"; | ||
| } | ||
| return "RunEvent"; | ||
| } | ||
|
|
||
| @Override | ||
| public JavaType typeFromId(DatabindContext context, String id) { | ||
| Class<?> subType = subTypeFor(id); | ||
| return context.constructSpecializedType(superType, subType); | ||
| } | ||
|
|
||
| @Override | ||
| public JsonTypeInfo.Id getMechanism() { | ||
| return JsonTypeInfo.Id.CUSTOM; | ||
| } | ||
|
|
||
| private static Class<?> subTypeFor(String schemaUrl) { | ||
| if (schemaUrl == null) { | ||
| return PolarisLineageEvent.OfRunEvent.class; | ||
| } | ||
| // lastIndexOf avoids indexing into the empty array that split("/") returns for inputs such as "/". | ||
| String tail = schemaUrl.substring(schemaUrl.lastIndexOf('/') + 1); | ||
| return switch (tail) { | ||
| case "JobEvent" -> PolarisLineageEvent.OfJobEvent.class; | ||
| case "DatasetEvent" -> PolarisLineageEvent.OfDatasetEvent.class; | ||
| default -> PolarisLineageEvent.OfRunEvent.class; | ||
| }; | ||
| } | ||
| } | ||
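
The tail-segment dispatch described in the resolver's Javadoc can be exercised in isolation. The following is a minimal sketch, not the PR's code: `SchemaUrlDispatchSketch` and `variantFor` are hypothetical names, and plain strings stand in for the `PolarisLineageEvent` subclasses so that no Jackson or OpenLineage dependency is needed. It takes the tail with `lastIndexOf`, which also handles an input such as `"/"`, for which `String.split("/")` returns an empty array.

```java
/** Illustrative sketch: schemaURL tail dispatch using plain strings, no Jackson types. */
final class SchemaUrlDispatchSketch {

  /** Returns the event variant named by the last path segment, defaulting to "RunEvent". */
  static String variantFor(String schemaUrl) {
    if (schemaUrl == null) {
      return "RunEvent";
    }
    // lastIndexOf returns -1 when there is no '/', so substring(0) keeps the whole string.
    String tail = schemaUrl.substring(schemaUrl.lastIndexOf('/') + 1);
    return switch (tail) {
      case "JobEvent" -> "JobEvent";
      case "DatasetEvent" -> "DatasetEvent";
      default -> "RunEvent"; // unrecognized or empty tail falls back to RunEvent
    };
  }

  public static void main(String[] args) {
    String base = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/";
    System.out.println(variantFor(base + "JobEvent"));
    System.out.println(variantFor(base + "DatasetEvent"));
    System.out.println(variantFor(base + "RunEvent"));
    System.out.println(variantFor(null));
    System.out.println(variantFor("/"));
  }
}
```

A unit test for the real resolver would presumably cover the same cases: each of the three fragment names, a missing `schemaURL`, and a malformed value.
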
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.service.lineage.api; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
| import com.fasterxml.jackson.annotation.JsonValue; | ||
| import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; | ||
| import io.openlineage.server.OpenLineage; | ||
|
|
||
| /** | ||
| * Polaris-side wrapper for an OpenLineage event accepted by the lineage ingest endpoint. | ||
| * | ||
| * <p>The OpenLineage spec models events as a {@code oneOf} over {@code RunEvent}, {@code | ||
| * JobEvent}, and {@code DatasetEvent}, discriminated by the value of the {@code schemaURL} field. | ||
| * The official {@code openlineage-java} library ships POJOs for these three event types in {@link | ||
| * io.openlineage.server.OpenLineage} but does not annotate them for polymorphic JSON | ||
| * deserialization. This class provides that polymorphism on top. | ||
| * | ||
| * <p>Wrappers compose the OpenLineage event by reference rather than extending it because the | ||
| * {@code OpenLineage.*Event} classes are {@code final}. {@link #event()} returns the underlying | ||
| * event for follow-up code that needs typed access to fields (inputs, outputs, columnLineage | ||
| * facet, etc.). | ||
| * | ||
| * <p>This pattern follows Marquez (the OpenLineage reference server), which uses the same | ||
| * {@code @JsonTypeInfo} + {@code @JsonTypeIdResolver} discrimination on {@code schemaURL}. | ||
| */ | ||
| @JsonTypeInfo( | ||
| use = JsonTypeInfo.Id.CUSTOM, | ||
| include = JsonTypeInfo.As.EXISTING_PROPERTY, | ||
| property = "schemaURL", | ||
| defaultImpl = PolarisLineageEvent.OfRunEvent.class, | ||
| visible = true) | ||
| @JsonTypeIdResolver(LineageEventTypeResolver.class) | ||
| public abstract sealed class PolarisLineageEvent | ||
| permits PolarisLineageEvent.OfRunEvent, | ||
| PolarisLineageEvent.OfJobEvent, | ||
| PolarisLineageEvent.OfDatasetEvent { | ||
|
|
||
| /** Returns the underlying OpenLineage event. */ | ||
| public abstract OpenLineage.BaseEvent event(); | ||
|
|
||
| /** Wraps an {@link OpenLineage.RunEvent}. */ | ||
| public static final class OfRunEvent extends PolarisLineageEvent { | ||
| private final OpenLineage.RunEvent event; | ||
|
|
||
| @JsonCreator(mode = JsonCreator.Mode.DELEGATING) | ||
| public OfRunEvent(OpenLineage.RunEvent event) { | ||
| this.event = event; | ||
| } | ||
|
|
||
| @JsonValue | ||
| @Override | ||
| public OpenLineage.RunEvent event() { | ||
| return event; | ||
| } | ||
| } | ||
|
|
||
| /** Wraps an {@link OpenLineage.JobEvent}. */ | ||
| public static final class OfJobEvent extends PolarisLineageEvent { | ||
| private final OpenLineage.JobEvent event; | ||
|
|
||
| @JsonCreator(mode = JsonCreator.Mode.DELEGATING) | ||
| public OfJobEvent(OpenLineage.JobEvent event) { | ||
| this.event = event; | ||
| } | ||
|
|
||
| @JsonValue | ||
| @Override | ||
| public OpenLineage.JobEvent event() { | ||
| return event; | ||
| } | ||
| } | ||
|
|
||
| /** Wraps an {@link OpenLineage.DatasetEvent}. */ | ||
| public static final class OfDatasetEvent extends PolarisLineageEvent { | ||
| private final OpenLineage.DatasetEvent event; | ||
|
|
||
| @JsonCreator(mode = JsonCreator.Mode.DELEGATING) | ||
| public OfDatasetEvent(OpenLineage.DatasetEvent event) { | ||
| this.event = event; | ||
| } | ||
|
|
||
| @JsonValue | ||
| @Override | ||
| public OpenLineage.DatasetEvent event() { | ||
| return event; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.service.lineage.api; | ||
|
|
||
| import io.micrometer.core.annotation.Timed; | ||
| import io.micrometer.core.aop.MeterTag; | ||
| import jakarta.annotation.security.RolesAllowed; | ||
| import jakarta.inject.Inject; | ||
| import jakarta.validation.Valid; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import jakarta.ws.rs.Consumes; | ||
| import jakarta.ws.rs.POST; | ||
| import jakarta.ws.rs.Path; | ||
| import jakarta.ws.rs.core.Context; | ||
| import jakarta.ws.rs.core.Response; | ||
| import jakarta.ws.rs.core.SecurityContext; | ||
| import org.apache.polaris.core.context.RealmContext; | ||
| import org.eclipse.microprofile.faulttolerance.Timeout; | ||
|
|
||
| /** | ||
| * JAX-RS resource for the OpenLineage ingest endpoint. | ||
| * | ||
| * <p>Mounted at the standard OpenLineage path ({@code POST /api/v1/lineage}) so that any engine | ||
| * already using the OpenLineage HTTP transport (Spark, Flink, Airflow, Trino, dbt) can target | ||
| * Polaris by URL change alone — no client-side rewriting. | ||
| * | ||
| * <p>The body is parsed into a {@link PolarisLineageEvent} which Jackson resolves to one of {@code | ||
| * OfRunEvent} / {@code OfJobEvent} / {@code OfDatasetEvent} based on the {@code schemaURL} field. | ||
| * Unrecognized {@code schemaURL} values fall back to {@code RunEvent}, matching Marquez behavior. | ||
| * | ||
| * <p>This resource is hand-written rather than generated from the OpenLineage JSON Schema because | ||
| * the OpenAPI generator's Java template does not faithfully translate the spec's {@code oneOf} | ||
| * over event variants. | ||
| */ | ||
| @Path("/api/v1/lineage") | ||
|
Contributor
I'm worried this makes a one-way-door API decision.
From recent community discussions, Polaris is trying to be a platform with replaceable capabilities, not just an opinionated OpenLineage receiver. If the first public lineage route is generic but semantically OpenLineage-specific, OpenLineage becomes the default interpretation of "Polaris lineage" at the API boundary. That is hard to unwind later because REST paths are external contracts. Could we either make the OL specificity visible in the route/API shape, or at least make it very explicit in naming/docs that this is the OpenLineage ingress endpoint and not the generic Polaris lineage API? That keeps room for a future lineage surface, if one is ever needed. |
||
| public class PolarisOpenLineageApi { | ||
|
|
||
| private final PolarisOpenLineageApiService service; | ||
|
|
||
| @Inject | ||
| public PolarisOpenLineageApi(PolarisOpenLineageApiService service) { | ||
| this.service = service; | ||
| } | ||
|
|
||
| @POST | ||
| @Consumes("application/json") | ||
| @RolesAllowed("**") | ||
| @Timed("polaris.OpenLineageApi.sendLineageEvent") | ||
| @Timeout | ||
| public Response sendLineageEvent( | ||
| @NotNull @Valid PolarisLineageEvent event, | ||
|
Contributor
Could we avoid making the JAX-RS method's parsed body type the object that follow-up implementations build around? Right now the first object passed past the REST resource is […].
The first object passed across the runtime boundary tends to become the object that follow-up persistence, forwarding, and deployment-specific implementations accept in their APIs and tests. If that object is the JAX-RS method parameter, then the extension point is shaped by the resource binding and the current OpenLineage Java model, rather than by a small provider contract that Polaris controls. That makes it harder to evolve the HTTP adapter independently from provider implementations.
Concretely, could the runtime path look more like: […] rather than having future provider implementations build directly around the JAX-RS method parameter? |
||
| @Context @MeterTag(key = "realm_id", expression = "realmIdentifier") | ||
| RealmContext realmContext, | ||
| @Context @MeterTag(key = "principal", expression = "userPrincipal") | ||
| SecurityContext securityContext) { | ||
| return service.sendLineageEvent(event, realmContext, securityContext); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.polaris.service.lineage.api; | ||
|
|
||
| import jakarta.ws.rs.core.Response; | ||
| import jakarta.ws.rs.core.SecurityContext; | ||
| import org.apache.polaris.core.context.RealmContext; | ||
|
|
||
| /** | ||
| * Service interface implemented by the runtime to handle OpenLineage ingest. Mirrors the pattern | ||
| * used by other Polaris API modules where the JAX-RS resource sits in the API module and | ||
| * delegates to a CDI-scoped service implementation in {@code polaris-runtime-service}. | ||
|
Comment on lines +26 to +28
Contributor
This Javadoc usefully frames the type as the runtime service interface behind the JAX-RS resource. I think that distinction is important to preserve. My concern is not that this PR claims this is the lineage SPI; it does not. My concern is that this is currently the only seam behind the OpenLineage route. Since the method returns JAX-RS […] |
||
| */ | ||
| public interface PolarisOpenLineageApiService { | ||
|
|
||
| /** | ||
| * Handle an OpenLineage event accepted at the ingest endpoint. | ||
| * | ||
| * @param event the parsed OpenLineage event, dispatched to the correct {@code RunEvent}, {@code | ||
| * JobEvent}, or {@code DatasetEvent} variant by Jackson based on the {@code schemaURL} | ||
| * field. | ||
| * @return the JAX-RS response. OpenLineage clients expect {@code 201 Created} with no body on | ||
| * success. | ||
| */ | ||
| Response sendLineageEvent( | ||
|
Contributor
Could the replacement point behind this service use OpenLineage-specific provider request/result types rather than JAX-RS/runtime types? For example: […]
This would let this API service stay responsible for mapping HTTP/runtime context to a provider call and mapping the provider result back to `Response`, while giving downstream deployments a provider contract they can implement without patching the JAX-RS resource or depending on `Response`/`SecurityContext` as the extension contract. |
||
| PolarisLineageEvent event, RealmContext realmContext, SecurityContext securityContext); | ||
| } | ||
The Marquez-compatible fallback is reasonable as an OpenLineage ingress behavior, iiuc.
I'd just like to keep this scoped to the OL adapter layer. The Polaris provider seam behind the adapter should receive a Polaris-owned request/result shape after this compatibility decision has been made, rather than requiring provider implementations to understand OL `schemaURL` fallback behavior.
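
To make the layering discussed in this thread concrete, here is one possible shape for a provider seam that is free of JAX-RS types. This is a hypothetical sketch, not code from the PR and not the reviewer's own example, which did not survive on this page. Every name in it (`ProviderSeamSketch`, `LineageIngestRequest`, `LineageIngestResult`, `LineageIngestProvider`, `handle`) is invented for illustration, and the mapping of a rejection to `400` is an assumption. Only the `201` on success comes from the Javadoc in the diff above.

```java
import java.util.Objects;

/** Hypothetical sketch of an adapter/provider split; none of these types exist in the PR. */
final class ProviderSeamSketch {

  /** Assumed Polaris-owned request: carries resolved values, not JAX-RS or Jackson types. */
  record LineageIngestRequest(String realmId, String principalName, String eventKind, String rawJson) {
    LineageIngestRequest {
      Objects.requireNonNull(realmId, "realmId");
      Objects.requireNonNull(eventKind, "eventKind");
      Objects.requireNonNull(rawJson, "rawJson");
    }
  }

  /** Assumed Polaris-owned result; the HTTP adapter decides how to render it. */
  sealed interface LineageIngestResult {}

  record Accepted() implements LineageIngestResult {}

  record Rejected(String reason) implements LineageIngestResult {}

  /** The seam a deployment would implement; it never sees Response or SecurityContext. */
  interface LineageIngestProvider {
    LineageIngestResult ingest(LineageIngestRequest request);
  }

  /** Adapter side: maps a provider result to an HTTP status code. */
  static int toHttpStatus(LineageIngestResult result) {
    if (result instanceof Accepted) {
      return 201; // OpenLineage clients expect 201 Created with no body
    }
    return 400; // assumption: rejections surface as a client error
  }

  /** Adapter side: builds the request after the schemaURL fallback has already been applied. */
  static int handle(
      LineageIngestProvider provider, String realmId, String principal, String eventKind, String rawJson) {
    LineageIngestRequest request = new LineageIngestRequest(realmId, principal, eventKind, rawJson);
    return toHttpStatus(provider.ingest(request));
  }

  public static void main(String[] args) {
    LineageIngestProvider noop = request -> new Accepted(); // no-op provider, as in this PR's scope
    System.out.println(handle(noop, "realm-a", "alice", "RunEvent", "{}"));
  }
}
```

In this arrangement the `eventKind` is decided by the adapter, so a provider implementation would not need to know about the `schemaURL` fallback at all.
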