Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public CallContext polarisCallContext(
return new PolarisCallContext(realmContext, metaStore, metricsPersistence, configurationSource);
}

@Produces
@RequestScoped
public MetricsPersistence metricsPersistence(CallContext callContext) {
return callContext.getPolarisCallContext().getMetricsPersistence();

@dimas-b dimas-b Jun 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4655 implies avoiding indirection via CallContext 😉

Would you mind taking this one step further and removing MetricsPersistence from CallContext? I do not think there is any reason for it to be there after this PR. This producer can deal with the factory directly (cf. polarisCallContext() on line 138).

The benefit would be that MetricsPersistence would be materialized only on API calls involving metrics reporting.

If you prefer that can be done as a separate follow-up PR. Current change is good and self-contained 👍

}

@Produces
@RequestScoped
public RealmConfig realmConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,52 +30,46 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RequestIdSupplier;
import org.apache.polaris.core.metrics.iceberg.MetricsRecordConverter;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link PolarisMetricsReporter} that persists metrics using the {@link
* PolarisMetaStoreManager} from the current {@link CallContext}.
* Implementation of {@link PolarisMetricsReporter} that persists metrics via the request-scoped
* {@link MetricsPersistence} backend.
*
* <p>This reporter is selected when {@code polaris.iceberg-metrics.reporting.type} is set to {@code
* "persisting"}.
*
* <p>The reporter uses {@link PolarisMetaStoreManager} to persist metrics, following the same
* abstraction pattern as other Polaris operations. If the underlying persistence does not support
* metrics, they are silently discarded.
* <p>If the underlying persistence does not support metrics, they are silently discarded.
*
* <p>The reporter receives catalog and table IDs from the caller (already resolved during
* authorization), avoiding redundant entity lookups. It uses {@link MetricsRecordConverter} to
* convert Iceberg metrics reports to SPI records before persisting them.
*
* @see PolarisMetricsReporter
* @see PolarisMetaStoreManager
* @see MetricsPersistence
* @see MetricsRecordConverter
*/
@RequestScoped
@Identifier("persisting")
public class PersistingMetricsReporter implements PolarisMetricsReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class);

private final CallContext callContext;
private final PolarisMetaStoreManager metaStoreManager;
private final MetricsPersistence metricsPersistence;
private final Instance<PolarisPrincipal> polarisPrincipal;
private final Instance<RequestIdSupplier> requestIdSupplier;

@Inject
public PersistingMetricsReporter(
CallContext callContext,
PolarisMetaStoreManager metaStoreManager,
MetricsPersistence metricsPersistence,
Instance<PolarisPrincipal> polarisPrincipal,
Instance<RequestIdSupplier> requestIdSupplier) {
this.callContext = callContext;
this.metaStoreManager = metaStoreManager;
this.metricsPersistence = metricsPersistence;
this.polarisPrincipal = polarisPrincipal;
this.requestIdSupplier = requestIdSupplier;
}
Expand Down Expand Up @@ -113,7 +107,7 @@ public void reportMetric(
.otelTraceId(otelTraceId)
.otelSpanId(otelSpanId)
.build();
metaStoreManager.writeScanMetrics(callContext.getPolarisCallContext(), record);
metricsPersistence.writeScanReport(record);
LOGGER.debug(
"Persisted scan metrics for {}.{} (reportId={})", catalogName, table, record.reportId());
} else if (metricsReport instanceof CommitReport commitReport) {
Expand All @@ -127,7 +121,7 @@ public void reportMetric(
.otelTraceId(otelTraceId)
.otelSpanId(otelSpanId)
.build();
metaStoreManager.writeCommitMetrics(callContext.getPolarisCallContext(), record);
metricsPersistence.writeCommitReport(record);
LOGGER.debug(
"Persisted commit metrics for {}.{} (reportId={})",
catalogName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RequestIdSupplier;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -56,7 +54,7 @@
*
* <p>Note: The reporter now receives catalogId and tableId directly from the caller (already
* resolved during authorization in IcebergCatalogHandler), so there's no need to mock entity
* lookups. The reporter uses {@link PolarisMetaStoreManager} to persist metrics.
* lookups. The reporter uses {@link MetricsPersistence} to persist metrics.
*/
public class PersistingMetricsReporterTest {

Expand All @@ -67,20 +65,13 @@ public class PersistingMetricsReporterTest {
private static final TableIdentifier TABLE_IDENTIFIER =
TableIdentifier.of(Namespace.of("db", "schema"), TABLE_NAME);

private PolarisMetaStoreManager metaStoreManager;
private PolarisCallContext polarisCallContext;
private MetricsPersistence metricsPersistence;
private PersistingMetricsReporter reporter;

@SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
// Mock PolarisMetaStoreManager
metaStoreManager = mock(PolarisMetaStoreManager.class);

// Mock CallContext
CallContext callContext = mock(CallContext.class);
polarisCallContext = mock(PolarisCallContext.class);
when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
metricsPersistence = mock(MetricsPersistence.class);

// Mock Instance beans (not resolvable in test context)
Instance<PolarisPrincipal> principalInstance = mock(Instance.class);
Expand All @@ -90,8 +81,7 @@ void setUp() {
when(requestIdInstance.isResolvable()).thenReturn(false);

reporter =
new PersistingMetricsReporter(
callContext, metaStoreManager, principalInstance, requestIdInstance);
new PersistingMetricsReporter(metricsPersistence, principalInstance, requestIdInstance);
}

@Test
Expand All @@ -103,9 +93,9 @@ void testReportScanMetrics() {
reporter.reportMetric(
CATALOG_NAME, CATALOG_ID, TABLE_IDENTIFIER, TABLE_ID, scanReport, Instant.now());

// Verify metaStoreManager was called with correct record
// Verify metricsPersistence was called with correct record
ArgumentCaptor<ScanMetricsRecord> captor = ArgumentCaptor.forClass(ScanMetricsRecord.class);
verify(metaStoreManager).writeScanMetrics(any(PolarisCallContext.class), captor.capture());
verify(metricsPersistence).writeScanReport(captor.capture());

ScanMetricsRecord record = captor.getValue();
assertThat(record.catalogId()).isEqualTo(CATALOG_ID);
Expand All @@ -122,9 +112,9 @@ void testReportCommitMetrics() {
reporter.reportMetric(
CATALOG_NAME, CATALOG_ID, TABLE_IDENTIFIER, TABLE_ID, commitReport, Instant.now());

// Verify metaStoreManager was called with correct record
// Verify metricsPersistence was called with correct record
ArgumentCaptor<CommitMetricsRecord> captor = ArgumentCaptor.forClass(CommitMetricsRecord.class);
verify(metaStoreManager).writeCommitMetrics(any(PolarisCallContext.class), captor.capture());
verify(metricsPersistence).writeCommitReport(captor.capture());

CommitMetricsRecord record = captor.getValue();
assertThat(record.catalogId()).isEqualTo(CATALOG_ID);
Expand All @@ -141,9 +131,9 @@ void testUnknownReportType() {
reporter.reportMetric(
CATALOG_NAME, CATALOG_ID, TABLE_IDENTIFIER, TABLE_ID, unknownReport, Instant.now());

// Verify metaStoreManager was NOT called since report type is unknown
verify(metaStoreManager, never()).writeScanMetrics(any(), any());
verify(metaStoreManager, never()).writeCommitMetrics(any(), any());
// Verify metricsPersistence was NOT called since report type is unknown
verify(metricsPersistence, never()).writeScanReport(any());
verify(metricsPersistence, never()).writeCommitReport(any());
}

private ScanReport createScanReport() {
Expand Down