Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.spi.connector.MetadataProvider;
import io.trino.spi.type.TypeManager;

import javax.management.MBeanServer;

import static java.util.Objects.requireNonNull;

public class ConnectorContextInstance
Expand All @@ -36,6 +38,7 @@ public class ConnectorContextInstance
private final MetadataProvider metadataProvider;
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final MBeanServer mbeanServer;

public ConnectorContextInstance(
OpenTelemetry openTelemetry,
Expand All @@ -45,7 +48,8 @@ public ConnectorContextInstance(
TypeManager typeManager,
MetadataProvider metadataProvider,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory)
PageIndexerFactory pageIndexerFactory,
MBeanServer mbeanServer)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
Expand All @@ -55,6 +59,7 @@ public ConnectorContextInstance(
this.metadataProvider = requireNonNull(metadataProvider, "metadataProvider is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
}

@Override
Expand Down Expand Up @@ -104,4 +109,10 @@ public PageIndexerFactory getPageIndexerFactory()
{
return pageIndexerFactory;
}

@Override
public MBeanServer getMBeanServer()
{
return mbeanServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.trino.sql.planner.OptimizerConfig;
import io.trino.transaction.TransactionManager;

import javax.management.MBeanServer;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class DefaultCatalogFactory

private final ConcurrentMap<ConnectorName, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
private final SecretsResolver secretsResolver;
private final MBeanServer mbeanServer;

@Inject
public DefaultCatalogFactory(
Expand All @@ -87,7 +90,8 @@ public DefaultCatalogFactory(
TypeManager typeManager,
NodeSchedulerConfig nodeSchedulerConfig,
OptimizerConfig optimizerConfig,
SecretsResolver secretsResolver)
SecretsResolver secretsResolver,
MBeanServer mbeanServer)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
Expand All @@ -102,6 +106,7 @@ public DefaultCatalogFactory(
this.schedulerIncludeCoordinator = nodeSchedulerConfig.isIncludeCoordinator();
this.maxPrefetchedInformationSchemaPrefixes = optimizerConfig.getMaxPrefetchedInformationSchemaPrefixes();
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
}

@Override
Expand Down Expand Up @@ -192,7 +197,8 @@ private Connector createConnector(CatalogName catalogName, ConnectorFactory conn
typeManager,
new InternalMetadataProvider(metadata, typeManager),
pageSorter,
pageIndexerFactory);
pageIndexerFactory,
mbeanServer);

try (ThreadContextClassLoader _ = new ThreadContextClassLoader(connectorFactory.getClass().getClassLoader())) {
// TODO: connector factory should take CatalogName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.eventlistener.EventListenerFactory;

import javax.management.MBeanServer;

import static java.util.Objects.requireNonNull;

public class EventListenerContextInstance
implements EventListenerFactory.EventListenerContext
{
private final OpenTelemetry openTelemetry;
private final MBeanServer mbeanServer;
private final Tracer tracer;
private final String version;

public EventListenerContextInstance(String version, OpenTelemetry openTelemetry, Tracer tracer)
public EventListenerContextInstance(String version, OpenTelemetry openTelemetry, Tracer tracer, MBeanServer mbeanServer)
{
this.version = requireNonNull(version, "version is null");
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
}

@Override
Expand All @@ -50,4 +54,10 @@ public Tracer getTracer()
{
return tracer;
}

@Override
public MBeanServer getMBeanServer()
{
return this.mbeanServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.management.MBeanServer;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -71,11 +73,17 @@ public class EventListenerManager
private final EventListenerContextInstance context;

@Inject
public EventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver, OpenTelemetry openTelemetry, Tracer tracer, NodeVersion version)
public EventListenerManager(
EventListenerConfig config,
SecretsResolver secretsResolver,
OpenTelemetry openTelemetry,
Tracer tracer,
MBeanServer mbeanServer,
NodeVersion version)
{
this.configFiles = ImmutableList.copyOf(config.getEventListenerFiles());
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.context = new EventListenerContextInstance(version.toString(), openTelemetry, tracer);
this.context = new EventListenerContextInstance(version.toString(), openTelemetry, tracer, mbeanServer);
}

public void addEventListenerFactory(EventListenerFactory eventListenerFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.exchange.ExchangeManagerContext;

import javax.management.MBeanServer;

import static java.util.Objects.requireNonNull;

public class ExchangeManagerContextInstance
implements ExchangeManagerContext
{
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final MBeanServer mbeanServer;

public ExchangeManagerContextInstance(
OpenTelemetry openTelemetry,
Tracer tracer)
Tracer tracer,
MBeanServer mbeanServer)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
}

@Override
Expand All @@ -44,4 +49,10 @@ public Tracer getTracer()
{
return tracer;
}

@Override
public MBeanServer getMBeanServer()
{
return mbeanServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.trino.spi.exchange.ExchangeManagerFactory;
import jakarta.annotation.PreDestroy;

import javax.management.MBeanServer;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -49,6 +51,7 @@ public class ExchangeManagerRegistry

private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final MBeanServer mbeanServer;
private final Map<String, ExchangeManagerFactory> exchangeManagerFactories = new ConcurrentHashMap<>();

private volatile ExchangeManager exchangeManager;
Expand All @@ -59,11 +62,13 @@ public class ExchangeManagerRegistry
public ExchangeManagerRegistry(
OpenTelemetry openTelemetry,
Tracer tracer,
MBeanServer mbeanServer,
SecretsResolver secretsResolver,
ExchangeManagerConfig config)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.configFile = config.getExchangeManagerConfigFile();
}
Expand Down Expand Up @@ -101,7 +106,7 @@ public synchronized void loadExchangeManager(String name, Map<String, String> pr

ExchangeManager exchangeManager;
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
exchangeManager = factory.create(secretsResolver.getResolvedConfiguration(properties), new ExchangeManagerContextInstance(openTelemetry, tracer));
exchangeManager = factory.create(secretsResolver.getResolvedConfiguration(properties), new ExchangeManagerContextInstance(openTelemetry, tracer, mbeanServer));
}

log.info("-- Loaded exchange manager %s --", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.trino.spi.spool.SpoolingManagerContext;
import io.trino.spi.spool.SpoolingManagerFactory;

import javax.management.MBeanServer;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -56,17 +58,25 @@ public class SpoolingManagerRegistry
private final boolean coordinator;
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final MBeanServer mbeanServer;
private final Node currentNode;
private volatile SpoolingManager spoolingManager;

@Inject
public SpoolingManagerRegistry(InternalNode currentNode, ServerConfig serverConfig, SpoolingEnabledConfig config, OpenTelemetry openTelemetry, Tracer tracer)
public SpoolingManagerRegistry(
InternalNode currentNode,
ServerConfig serverConfig,
SpoolingEnabledConfig config,
OpenTelemetry openTelemetry,
Tracer tracer,
MBeanServer mbeanServer)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.enabled = config.isEnabled();
this.coordinator = serverConfig.isCoordinator();
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.mbeanServer = requireNonNull(mbeanServer, "mbeanServer is null");
}

public void addSpoolingManagerFactory(SpoolingManagerFactory factory)
Expand Down Expand Up @@ -125,6 +135,12 @@ public Tracer getTracer()
return tracer;
}

@Override
public MBeanServer getMBeanServer()
{
return mbeanServer;
}

@Override
public boolean isCoordinator()
{
Expand Down
11 changes: 7 additions & 4 deletions core/trino-main/src/main/java/io/trino/testing/PlanTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@
import io.trino.type.TypeDeserializer;
import io.trino.util.FinalizerService;
import org.intellij.lang.annotations.Language;
import org.weakref.jmx.testing.TestingMBeanServer;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -392,7 +393,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
this.joinCompiler = new JoinCompiler(typeOperators);
this.hashStrategyCompiler = new FlatHashStrategyCompiler(typeOperators, new NullSafeHashCompiler(typeOperators));
PageIndexerFactory pageIndexerFactory = new GroupByHashPageIndexerFactory(hashStrategyCompiler);
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), secretsResolver, noop(), tracer, CURRENT_NODE.getNodeVersion());
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), secretsResolver, noop(), tracer, new TestingMBeanServer(), CURRENT_NODE.getNodeVersion());
this.accessControl = new TestingAccessControlManager(transactionManager, eventListenerManager, secretsResolver);
accessControl.loadSystemAccessControl(AllowAllSystemAccessControl.NAME, ImmutableMap.of());

Expand All @@ -410,7 +411,8 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
typeManager,
nodeSchedulerConfig,
optimizerConfig,
secretsResolver));
secretsResolver,
new TestingMBeanServer()));
this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), tracer, new QueryManagerConfig());
this.pageSourceManager = new PageSourceManager(createPageSourceProviderFactory(catalogManager));
this.pageSinkManager = new PageSinkManager(createPageSinkProvider(catalogManager));
Expand Down Expand Up @@ -475,13 +477,14 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
ImmutableSet.of(),
ImmutableSet.of(new ExcludeColumnsFunction()));

exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig());
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), new TestingMBeanServer(), secretsResolver, new ExchangeManagerConfig());
SpoolingManagerRegistry spoolingManagerRegistry = new SpoolingManagerRegistry(
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
new ServerConfig(),
new SpoolingEnabledConfig(),
noop(),
noopTracer());
noopTracer(),
new TestingMBeanServer());
this.pluginManager = new PluginManager(
(loader, createClassLoader) -> {},
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.eventlistener.EventListenerFactory;
import org.weakref.jmx.testing.TestingMBeanServer;

import javax.management.MBeanServer;

public class TestingEventListenerContext
implements EventListenerFactory.EventListenerContext
Expand All @@ -38,4 +41,10 @@ public Tracer getTracer()
{
return Tracing.noopTracer();
}

@Override
public MBeanServer getMBeanServer()
{
return new TestingMBeanServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import org.weakref.jmx.testing.TestingMBeanServer;

import java.util.Collections;
import java.util.HashSet;
Expand All @@ -45,7 +46,7 @@ public static TestingEventListenerManager emptyEventListenerManager()
@Inject
public TestingEventListenerManager(EventListenerConfig config, SecretsResolver secretsResolver)
{
super(config, secretsResolver, OpenTelemetry.noop(), Tracing.noopTracer(), new NodeVersion("test-version"));
super(config, secretsResolver, OpenTelemetry.noop(), Tracing.noopTracer(), new TestingMBeanServer(), new NodeVersion("test-version"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.trino.sql.tree.Statement;
import io.trino.transaction.TransactionManager;
import org.junit.jupiter.api.Test;
import org.weakref.jmx.testing.TestingMBeanServer;

import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testSubmittedForDispatchedQuery()
JsonCodec.jsonCodec(OperatorStats.class),
JsonCodec.jsonCodec(ExecutionFailureInfo.class),
JsonCodec.jsonCodec(StatsAndCosts.class),
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new NodeVersion("test")),
new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new TestingMBeanServer(), new NodeVersion("test")),
new NodeInfo("node"),
new NodeVersion("version"),
new SessionPropertyManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import org.junit.jupiter.api.Test;
import org.weakref.jmx.testing.TestingMBeanServer;

import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -31,7 +32,7 @@ class TestEventListenerManager
@Test
public void testShutdownIsForwardedToListeners()
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new NodeVersion("test-version"));
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig(), new SecretsResolver(ImmutableMap.of()), noop(), noopTracer(), new TestingMBeanServer(), new NodeVersion("test-version"));
AtomicBoolean wasCalled = new AtomicBoolean(false);
EventListener listener = new EventListener()
{
Expand Down
Loading
Loading