Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,7 @@ private TestSetup bootstrapRealm() {

var manager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
var metrics = metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
var callCtx = new PolarisCallContext(realmContext, session, metrics, configurationSource);
var callCtx = new PolarisCallContext(realmContext, session, configurationSource);
var persistence =
realmPersistenceFactory.newBuilder().realmId(realmId).skipDecorators().build();

Expand Down Expand Up @@ -927,8 +926,7 @@ private void checkEntities(String step, List<PolarisBaseEntity> entities) {

var manager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
var metrics = metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
var callCtx = new PolarisCallContext(realmContext, session, metrics, configurationSource);
var callCtx = new PolarisCallContext(realmContext, session, configurationSource);

for (var e : entities) {
var result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,7 @@ private PrincipalSecretsResult bootstrapRealm(
clock);

PolarisCallContext ctx =
new PolarisCallContext(
() -> realmId,
metaStore,
NO_OP_METRICS_PERSISTENCE,
RealmConfigurationSource.EMPTY_CONFIG);
new PolarisCallContext(() -> realmId, metaStore, RealmConfigurationSource.EMPTY_CONFIG);
var secretsResult = createPolarisPrincipalForRealm(metaStoreManager, ctx);

realmManagement.update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {

var manager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
var metrics = metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);

var callCtx = new PolarisCallContext(realmContext, session, metrics, configurationSource);
var callCtx = new PolarisCallContext(realmContext, session, configurationSource);

return new PolarisTestMetaStoreManager(manager, callCtx, startTime, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ protected PolarisCallContext callCtx() {

metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
var metrics = metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
callCtx = new PolarisCallContext(realmContext, session, metrics, configurationSource);
callCtx = new PolarisCallContext(realmContext, session, configurationSource);

tm = new PolarisTestMetaStoreManager(metaStoreManager, callCtx, startTime, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ public void setup(TestInfo testInfo) {
var realmContext = (RealmContext) () -> realmId;
callContext =
new PolarisCallContext(
realmContext,
metaStoreManagerFactory.getOrCreateSession(realmContext),
metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext));
realmContext, metaStoreManagerFactory.getOrCreateSession(realmContext));
metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ public void setup(TestInfo testInfo) {
var realmContext = (RealmContext) () -> realmId;
callContext =
new PolarisCallContext(
realmContext,
metaStoreManagerFactory.getOrCreateSession(realmContext),
metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext));
realmContext, metaStoreManagerFactory.getOrCreateSession(realmContext));
metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);

memoized = MemoizedIndexedAccess.newMemoizedIndexedAccess(persistence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,93 +25,59 @@
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
import org.jspecify.annotations.NonNull;

/**
* The Call context is allocated each time a new REST request is processed. It contains instances of
* low-level services required to process that request.
*
* <p>{@link BasePersistence} continues to carry the bulk of the metastore SPI surface (and still
* extends {@code PolicyMappingPersistence} / acts as the {@code IntegrationPersistence} via a
* runtime cast for now). {@link MetricsPersistence} is intentionally decoupled and supplied
* separately so callers that only need metrics persistence do not have to depend on {@link
* BasePersistence}.
* <p>{@link BasePersistence} carries the bulk of the metastore SPI surface (and still extends
* {@code PolicyMappingPersistence} / acts as the {@code IntegrationPersistence} via a runtime cast
* for now). {@link org.apache.polaris.core.persistence.metrics.MetricsPersistence} is intentionally
* kept out of this context; service code that needs metrics persistence should depend on that SPI
* directly.
*/
public class PolarisCallContext implements CallContext {

// meta store which is used to persist Polaris entity metadata
private final BasePersistence metaStore;
private final MetricsPersistence metricsPersistence;
private final RealmConfigurationSource configurationSource;
private final RealmContext realmContext;
private final RealmConfig realmConfig;

/**
* @deprecated Use {@link PolarisCallContext#PolarisCallContext(RealmContext, BasePersistence,
* MetricsPersistence, RealmConfigurationSource)}.
* RealmConfigurationSource)}.
*/
@SuppressWarnings("removal")
@Deprecated(forRemoval = true)
public PolarisCallContext(
@NonNull RealmContext realmContext,
@NonNull BasePersistence metaStore,
@NonNull PolarisConfigurationStore configurationStore) {
this(
realmContext, metaStore, new MetricsPersistence() {}, configurationStore::getConfiguration);
this(realmContext, metaStore, configurationStore::getConfiguration);
}

/**
* Convenience constructor for backends whose {@link BasePersistence} implementation also
* implements {@link MetricsPersistence} (the common in-tree case). Callers that need to wire a
* distinct metrics implementation should use {@link #PolarisCallContext(RealmContext,
* BasePersistence, MetricsPersistence, RealmConfigurationSource)}.
*/
public <P extends BasePersistence & MetricsPersistence> PolarisCallContext(
@NonNull RealmContext realmContext,
@NonNull P metaStore,
@NonNull RealmConfigurationSource configurationSource) {
this(realmContext, metaStore, metaStore, configurationSource);
}

/** Primary constructor — {@link MetricsPersistence} is supplied separately from the metastore. */
public PolarisCallContext(
@NonNull RealmContext realmContext,
@NonNull BasePersistence metaStore,
@NonNull MetricsPersistence metricsPersistence,
@NonNull RealmConfigurationSource configurationSource) {
this.realmContext = realmContext;
this.metaStore = metaStore;
this.metricsPersistence = metricsPersistence;
this.configurationSource = configurationSource;
this.realmConfig = new RealmConfigImpl(this.configurationSource, this.realmContext);
}

/** Convenience constructor that defaults to {@link RealmConfigurationSource#EMPTY_CONFIG}. */
public PolarisCallContext(
@NonNull RealmContext realmContext,
@NonNull BasePersistence metaStore,
@NonNull MetricsPersistence metricsPersistence) {
this(realmContext, metaStore, metricsPersistence, RealmConfigurationSource.EMPTY_CONFIG);
}

/**
* Convenience constructor for callers whose persistence type satisfies both SPIs and who do not
* have a {@link RealmConfigurationSource}.
*/
public <P extends BasePersistence & MetricsPersistence> PolarisCallContext(
@NonNull RealmContext realmContext, @NonNull P metaStore) {
this(realmContext, metaStore, metaStore, RealmConfigurationSource.EMPTY_CONFIG);
@NonNull RealmContext realmContext, @NonNull BasePersistence metaStore) {
this(realmContext, metaStore, RealmConfigurationSource.EMPTY_CONFIG);
}

public BasePersistence getMetaStore() {
return metaStore;
}

public MetricsPersistence getMetricsPersistence() {
return metricsPersistence;
}

@Override
public RealmContext getRealmContext() {
return realmContext;
Expand All @@ -136,7 +102,6 @@ public PolarisCallContext copy() {
// copy of the RealmContext to ensure the access during the task executor.
String realmId = this.realmContext.getRealmIdentifier();
RealmContext realmContext = () -> realmId;
return new PolarisCallContext(
realmContext, this.metaStore, this.metricsPersistence, this.configurationSource);
return new PolarisCallContext(realmContext, this.metaStore, this.configurationSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.core.persistence.metrics;

import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.persistence.BasePersistence;
import org.jspecify.annotations.NonNull;

/**
Expand Down Expand Up @@ -56,7 +57,10 @@ public interface PolarisMetricsManager {
*/
default void writeScanMetrics(
@NonNull PolarisCallContext callCtx, @NonNull ScanMetricsRecord record) {
callCtx.getMetricsPersistence().writeScanReport(record);
BasePersistence metaStore = callCtx.getMetaStore();
if (metaStore instanceof MetricsPersistence metricsPersistence) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is no longer invoked, I think... why not remove it?

metricsPersistence.writeScanReport(record);
}
}

/**
Expand All @@ -73,6 +77,9 @@ default void writeScanMetrics(
*/
default void writeCommitMetrics(
@NonNull PolarisCallContext callCtx, @NonNull CommitMetricsRecord record) {
callCtx.getMetricsPersistence().writeCommitReport(record);
BasePersistence metaStore = callCtx.getMetaStore();
if (metaStore instanceof MetricsPersistence metricsPersistence) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also not invoked, AFAIK.

metricsPersistence.writeCommitReport(record);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand All @@ -58,7 +57,7 @@ public class AtomicOperationMetaStoreManagerRefreshTest {
public void setUp() {
diagnostics = new PolarisDefaultDiagServiceImpl();
metaStore = Mockito.mock(BasePersistence.class);
callCtx = new PolarisCallContext(() -> "testRealm", metaStore, new MetricsPersistence() {});
callCtx = new PolarisCallContext(() -> "testRealm", metaStore);
manager = new AtomicOperationMetaStoreManager(Clock.systemUTC(), diagnostics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,19 @@ public CallContext polarisCallContext(
RealmConfigurationSource configurationSource,
MetaStoreManagerFactory metaStoreManagerFactory) {
BasePersistence metaStore = metaStoreManagerFactory.getOrCreateSession(realmContext);
return new PolarisCallContext(realmContext, metaStore, configurationSource);
}

@Produces
@RequestScoped
public MetricsPersistence metricsPersistence(
RealmContext realmContext, MetaStoreManagerFactory metaStoreManagerFactory) {
BasePersistence metaStore = metaStoreManagerFactory.getOrCreateSession(realmContext);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why bother calling getOrCreateSession() now? This producer will be invoked by CDI only for use cases that need a MetricsPersistence, not BasePersistence 🤔

The old optimization of reusing the same object is no longer relevant.

I think calling getOrCreateMetricsPersistence() (below) is sufficient.

// When the backend implements both SPIs on the same instance (e.g. JDBC, in-memory), reuse the
// session instead of building a second persistence instance per request.
MetricsPersistence metricsPersistence =
(metaStore instanceof MetricsPersistence mp)
? mp
: metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
return new PolarisCallContext(realmContext, metaStore, metricsPersistence, configurationSource);
return (metaStore instanceof MetricsPersistence mp)
? mp
: metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ protected void flush(String realmId, List<PolarisEvent> events) {
RealmContext realmContext = () -> realmId;
var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext);
var metricsPersistence = metaStoreManagerFactory.getOrCreateMetricsPersistence(realmContext);
var callContext = new PolarisCallContext(realmContext, basePersistence, metricsPersistence);
var callContext = new PolarisCallContext(realmContext, basePersistence);
metaStoreManager.writeEvents(callContext, events);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,52 +30,46 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RequestIdSupplier;
import org.apache.polaris.core.metrics.iceberg.MetricsRecordConverter;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link PolarisMetricsReporter} that persists metrics using the {@link
* PolarisMetaStoreManager} from the current {@link CallContext}.
* Implementation of {@link PolarisMetricsReporter} that persists metrics via the request-scoped
* {@link MetricsPersistence} backend.
*
* <p>This reporter is selected when {@code polaris.iceberg-metrics.reporting.type} is set to {@code
* "persisting"}.
*
* <p>The reporter uses {@link PolarisMetaStoreManager} to persist metrics, following the same
* abstraction pattern as other Polaris operations. If the underlying persistence does not support
* metrics, they are silently discarded.
* <p>If the underlying persistence does not support metrics, they are silently discarded.
*
* <p>The reporter receives catalog and table IDs from the caller (already resolved during
* authorization), avoiding redundant entity lookups. It uses {@link MetricsRecordConverter} to
* convert Iceberg metrics reports to SPI records before persisting them.
*
* @see PolarisMetricsReporter
* @see PolarisMetaStoreManager
* @see MetricsPersistence
* @see MetricsRecordConverter
*/
@RequestScoped
@Identifier("persisting")
public class PersistingMetricsReporter implements PolarisMetricsReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class);

private final CallContext callContext;
private final PolarisMetaStoreManager metaStoreManager;
private final MetricsPersistence metricsPersistence;
private final Instance<PolarisPrincipal> polarisPrincipal;
private final Instance<RequestIdSupplier> requestIdSupplier;

@Inject
public PersistingMetricsReporter(
CallContext callContext,
PolarisMetaStoreManager metaStoreManager,
MetricsPersistence metricsPersistence,
Instance<PolarisPrincipal> polarisPrincipal,
Instance<RequestIdSupplier> requestIdSupplier) {
this.callContext = callContext;
this.metaStoreManager = metaStoreManager;
this.metricsPersistence = metricsPersistence;
this.polarisPrincipal = polarisPrincipal;
this.requestIdSupplier = requestIdSupplier;
}
Expand Down Expand Up @@ -113,7 +107,7 @@ public void reportMetric(
.otelTraceId(otelTraceId)
.otelSpanId(otelSpanId)
.build();
metaStoreManager.writeScanMetrics(callContext.getPolarisCallContext(), record);
metricsPersistence.writeScanReport(record);
LOGGER.debug(
"Persisted scan metrics for {}.{} (reportId={})", catalogName, table, record.reportId());
} else if (metricsReport instanceof CommitReport commitReport) {
Expand All @@ -127,7 +121,7 @@ public void reportMetric(
.otelTraceId(otelTraceId)
.otelSpanId(otelSpanId)
.build();
metaStoreManager.writeCommitMetrics(callContext.getPolarisCallContext(), record);
metricsPersistence.writeCommitReport(record);
LOGGER.debug(
"Persisted commit metrics for {}.{} (reportId={})",
catalogName,
Expand Down
Loading