diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index db528b8c7179d..e4f40e87d171b 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -120,6 +120,7 @@ + diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 973d586b01093..2289c00e952a2 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -19,19 +19,16 @@ package kafka.network import java.io.IOException import java.net._ -import java.nio.ByteBuffer import java.nio.channels.{Selector => NSelector, _} import java.util import java.util.Optional import java.util.concurrent._ import java.util.concurrent.atomic._ -import kafka.network.Processor._ import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse} import kafka.server.{BrokerReconfigurable, KafkaConfig} import org.apache.kafka.common.message.ApiMessageType.ListenerType import kafka.utils._ import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate} @@ -43,7 +40,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.internals.LogContext import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable} -import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, Request, SocketServer => JSocketServer, SocketServerConfigs, TooManyConnectionsException} +import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, Request, SocketServerConfigs, TooManyConnectionsException, Processor => JProcessor, SocketServer => JSocketServer} import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory} import org.apache.kafka.server.config.QuotaConfig @@ -500,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor") private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName( "AcceptorBlockedPercent", - MetricsUtils.getTags(ListenerMetricTag, endPoint.listener)) + MetricsUtils.getTags(JProcessor.LISTENER_METRIC_TAG, endPoint.listener)) private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) private var currentProcessorIndex = 0 private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]() @@ -762,7 +759,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, credentialProvider, memoryPool, logContext, - Processor.ConnectionQueueSize, + JProcessor.CONNECTION_QUEUE_SIZE, isPrivilegedListener, apiVersionManager, name, @@ -770,24 +767,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, } } -private[kafka] object Processor { - private val IdlePercentMetricName = "IdlePercent" - val NetworkProcessorMetricTag = "networkProcessor" - val ListenerMetricTag = "listener" - val ConnectionQueueSize = 20 - - private[network] def parseRequestHeader(apiVersionManager: ApiVersionManager, buffer: ByteBuffer): RequestHeader = { - val header = RequestHeader.parse(buffer) - if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) { - header - } else if (header.isApiVersionSupported()) { - throw new InvalidRequestException(s"Received request for disabled api with key ${header.apiKey.id} (${header.apiKey().name}) and version ${header.apiVersion}") - } else { - throw new UnsupportedVersionException(s"Received request for api with key ${header.apiKey.id} (${header.apiKey().name}) and unsupported version ${header.apiVersion}") - } - } -} - /** * Thread that processes all requests from a single connection. There are N of these running in parallel * each of which has its own selector @@ -833,17 +812,17 @@ private[kafka] class Processor( private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]() private[kafka] val metricTags = mutable.LinkedHashMap( - ListenerMetricTag -> listenerName.value, - NetworkProcessorMetricTag -> id.toString + JProcessor.LISTENER_METRIC_TAG -> listenerName.value, + JProcessor.NETWORK_PROCESSOR_METRIC_TAG -> id.toString ).asJava - metricsGroup.newGauge(IdlePercentMetricName, () => { + metricsGroup.newGauge(JProcessor.IDLE_PERCENT_METRIC_NAME, () => { Option(metrics.metric(metrics.metricName("io-wait-ratio", JSocketServer.METRICS_GROUP, metricTags))).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) }, // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric // also includes the listener name) - MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString) + MetricsUtils.getTags(JProcessor.NETWORK_PROCESSOR_METRIC_TAG, id.toString) ) private val expiredConnectionsKilledCount = new CumulativeSum() @@ -1010,7 +989,7 @@ private[kafka] class Processor( try { openOrClosingChannel(receive.source) match { case Some(channel) => - header = parseRequestHeader(apiVersionManager, receive.payload) + header = JProcessor.parseRequestHeader(apiVersionManager, receive.payload) if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, () => time.nanoseconds())) trace(s"Begin re-authentication: $channel") @@ -1198,8 +1177,8 @@ private[kafka] class Processor( close(channel.id) } selector.close() - metricsGroup.removeMetric(IdlePercentMetricName, - MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)) + metricsGroup.removeMetric(JProcessor.IDLE_PERCENT_METRIC_NAME, + MetricsUtils.getTags(JProcessor.NETWORK_PROCESSOR_METRIC_TAG, id.toString)) } // 'protected` to allow override for testing @@ -1701,7 +1680,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time", JSocketServer.METRICS_GROUP, "Tracking average throttle-time, out of non-zero throttle times, per listener", - MetricsUtils.getTags(ListenerMetricTag, listener.value)) + MetricsUtils.getTags(JProcessor.LISTENER_METRIC_TAG, listener.value)) sensor.add(metricName, new Avg) sensor } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c3706c7bb0c35..bbd9556ede76f 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -29,7 +29,7 @@ import java.util.concurrent._ import javax.management.ObjectName import com.yammer.metrics.core.MetricName import kafka.api.SaslSetup -import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel} +import kafka.network.{DataPlaneAcceptor, RequestChannel} import kafka.security.JaasTestUtils import kafka.utils._ import kafka.utils.Implicits._ @@ -54,7 +54,7 @@ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.kafka.coordinator.transaction.TransactionLogConfig -import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.network.{Processor, SocketServerConfigs} import org.apache.kafka.raft.MetadataLogConfig import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} @@ -885,7 +885,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private def isProcessorMetric(metricName: MetricName): Boolean = { val mbeanName = metricName.getMBeanName - mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=") + mbeanName.contains(s"${Processor.NETWORK_PROCESSOR_METRIC_TAG}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=") } private def clearLeftOverProcessorMetrics(): Unit = { @@ -900,13 +900,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala - .filter(_.tags.containsKey(Processor.NetworkProcessorMetricTag)) - .groupBy(_.tags.get(Processor.ListenerMetricTag)) + .filter(_.tags.containsKey(Processor.NETWORK_PROCESSOR_METRIC_TAG)) + .groupBy(_.tags.get(Processor.LISTENER_METRIC_TAG)) assertEquals(2, kafkaMetrics.size) // 2 listeners // 2 threads per listener - assertEquals(2, kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size) - assertEquals(2, kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size) + assertEquals(2, kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NETWORK_PROCESSOR_METRIC_TAG)).size) + assertEquals(2, kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NETWORK_PROCESSOR_METRIC_TAG)).size) KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala .filter(isProcessorMetric) diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala index 90569a9b15546..30d342052e128 100644 --- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala @@ -22,7 +22,6 @@ import java.util import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit} import java.util.Properties import com.yammer.metrics.core.Meter -import kafka.network.Processor.ListenerMetricTag import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigException @@ -30,7 +29,7 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics} import org.apache.kafka.common.network._ import org.apache.kafka.common.utils.Time -import org.apache.kafka.network.{ConnectionThrottledException, SocketServer, SocketServerConfigs, TooManyConnectionsException} +import org.apache.kafka.network.{ConnectionThrottledException, Processor, SocketServer, SocketServerConfigs, TooManyConnectionsException} import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.ServerTestUtils @@ -94,7 +93,7 @@ class ConnectionQuotasTest { listeners.keys.foreach { name => blockedPercentMeters.put(name, new KafkaMetricsGroup(metricsPackage, metricsClassName).newMeter( - s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, util.Map.of(ListenerMetricTag, name))) + s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, util.Map.of(Processor.LISTENER_METRIC_TAG, name))) } // use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on // system time; so using mock time will likely result in test flakiness due to a mixed use of mock and system time @@ -832,7 +831,7 @@ class ConnectionQuotasTest { val metricName = metrics.metricName( "connection-accept-throttle-time", SocketServer.METRICS_GROUP, - util.Map.of(Processor.ListenerMetricTag, listener)) + util.Map.of(Processor.LISTENER_METRIC_TAG, listener)) metrics.metric(metricName) } @@ -840,7 +839,7 @@ class ConnectionQuotasTest { val metricName = metrics.metricName( "ip-connection-accept-throttle-time", SocketServer.METRICS_GROUP, - util.Map.of(Processor.ListenerMetricTag, listener)) + util.Map.of(Processor.LISTENER_METRIC_TAG, listener)) metrics.metric(metricName) } @@ -848,7 +847,7 @@ class ConnectionQuotasTest { val metricName = metrics.metricName( "connection-accept-rate", SocketServer.METRICS_GROUP, - util.Map.of(Processor.ListenerMetricTag, listener)) + util.Map.of(Processor.LISTENER_METRIC_TAG, listener)) metrics.metric(metricName) } diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala deleted file mode 100644 index 51d3de412504a..0000000000000 --- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import org.apache.kafka.clients.NodeApiVersions -import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} -import org.apache.kafka.common.message.ApiMessageType.ListenerType -import org.apache.kafka.common.message.RequestHeaderData -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils} -import org.apache.kafka.metadata.KRaftMetadataCache -import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager, SimpleApiVersionManager} -import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} -import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue} -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.function.Executable -import org.mockito.Mockito.mock - -import java.util -import java.util.function.Supplier -import java.util.Optional - -class ProcessorTest { - - @Test - def testParseRequestHeaderWithDisabledApiVersion(): Unit = { - val requestHeader = RequestTestUtils.serializeRequestHeader( - new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0)) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true, - () => FinalizedFeatures.of(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0)) - val e = assertThrows(classOf[InvalidRequestException], - (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, - "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception") - assertTrue(e.toString.contains("disabled api")) - } - - @Test - def testParseRequestHeaderWithUnsupportedApi(): Unit = { - // We have to use `RequestHeaderData` since `ApiMessageType` doesn't support this protocol api - val headerVersion = 0.toShort - val requestHeaderData = new RequestHeaderData() - .setRequestApiKey(ApiKeys.LEADER_AND_ISR.id) - .setRequestApiVersion(headerVersion) - .setClientId("clientid") - .setCorrelationId(0) - val requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion)) - val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[Supplier[Optional[NodeApiVersions]]]), - BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true, Optional.empty) - val e = assertThrows(classOf[InvalidRequestException], - (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, - "LEADER_AND_ISR should throw InvalidRequestException exception") - assertTrue(e.toString.contains("Unsupported api")) - } - - @Test - def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = { - val requestHeader = RequestTestUtils.serializeRequestHeader( - new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0)) - val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[Supplier[Optional[NodeApiVersions]]]), - BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true, Optional.empty) - val e = assertThrows(classOf[UnsupportedVersionException], - (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, - "FETCH v0 should throw UnsupportedVersionException exception") - assertTrue(e.toString.contains("unsupported version")) - } - - /** - * We do something unusual with these versions of produce, and we want to make sure we don't regress. - * See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details. - */ - @Test - def testParseRequestHeaderForProduceV0ToV2(): Unit = { - for (version <- 0 to 2) { - val requestHeader = RequestTestUtils.serializeRequestHeader( - new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0)) - val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[Supplier[Optional[NodeApiVersions]]]), - BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true, Optional.empty) - val e = assertThrows(classOf[UnsupportedVersionException], - (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, - s"PRODUCE $version should throw UnsupportedVersionException exception") - assertTrue(e.toString.contains("unsupported version")) - } - } -} diff --git a/server/src/main/java/org/apache/kafka/network/Processor.java b/server/src/main/java/org/apache/kafka/network/Processor.java new file mode 100644 index 0000000000000..25b91766ece0c --- /dev/null +++ b/server/src/main/java/org/apache/kafka/network/Processor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.server.ApiVersionManager; + +import java.nio.ByteBuffer; + +public class Processor { + + public static final String IDLE_PERCENT_METRIC_NAME = "IdlePercent"; + public static final String NETWORK_PROCESSOR_METRIC_TAG = "networkProcessor"; + public static final String LISTENER_METRIC_TAG = "listener"; + public static final int CONNECTION_QUEUE_SIZE = 20; + + public static RequestHeader parseRequestHeader(ApiVersionManager apiVersionManager, ByteBuffer buffer) { + RequestHeader header = RequestHeader.parse(buffer); + if (apiVersionManager.isApiEnabled(header.apiKey(), header.apiVersion())) { + return header; + } else if (header.isApiVersionSupported()) { + throw new InvalidRequestException("Received request for disabled api with key " + header.apiKey().id + " (" + header.apiKey().name() + ") and version " + header.apiVersion()); + } else { + throw new UnsupportedVersionException("Received request for api with key " + header.apiKey().id + " (" + header.apiKey().name + ") and unsupported version " + header.apiVersion()); + } + } +} diff --git a/server/src/test/java/org/apache/kafka/network/ProcessorTest.java b/server/src/test/java/org/apache/kafka/network/ProcessorTest.java new file mode 100644 index 0000000000000..1bd37a1e5df4a --- /dev/null +++ b/server/src/test/java/org/apache/kafka/network/ProcessorTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.RequestHeaderData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.metadata.KRaftMetadataCache; +import org.apache.kafka.server.BrokerFeatures; +import org.apache.kafka.server.DefaultApiVersionManager; +import org.apache.kafka.server.SimpleApiVersionManager; +import org.apache.kafka.server.common.FinalizedFeatures; +import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.MetadataVersion; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class ProcessorTest { + + @Test + public void testParseRequestHeaderWithDisabledApiVersion() { + ByteBuffer requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.INIT_PRODUCER_ID, (short) 0, "clientid", 0)); + SimpleApiVersionManager apiVersionManager = new SimpleApiVersionManager(ApiMessageType.ListenerType.CONTROLLER, true, + () -> FinalizedFeatures.of(MetadataVersion.latestTesting(), Map.of(), 0)); + Throwable e = assertThrows(InvalidRequestException.class, + () -> Processor.parseRequestHeader(apiVersionManager, requestHeader), + "INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception"); + assertTrue(e.toString().contains("disabled api")); + } + + @Test + public void testParseRequestHeaderWithUnsupportedApi() { + // We have to use `RequestHeaderData` since `ApiMessageType` doesn't support this protocol api + short headerVersion = 0; + RequestHeaderData requestHeaderData = new RequestHeaderData() + .setRequestApiKey(ApiKeys.LEADER_AND_ISR.id) + .setRequestApiVersion(headerVersion) + .setClientId("clientid") + .setCorrelationId(0); + ByteBuffer requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion)); + @SuppressWarnings("unchecked") + DefaultApiVersionManager apiVersionManager = new DefaultApiVersionManager(ApiMessageType.ListenerType.BROKER, mock(Supplier.class), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () -> KRaftVersion.LATEST_PRODUCTION), true, Optional.empty()); + Throwable e = assertThrows(InvalidRequestException.class, + () -> Processor.parseRequestHeader(apiVersionManager, requestHeader), + "LEADER_AND_ISR should throw InvalidRequestException exception"); + assertTrue(e.toString().contains("Unsupported api")); + } + + @Test + public void testParseRequestHeaderWithUnsupportedApiVersion() { + ByteBuffer requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.FETCH, (short) 0, "clientid", 0)); + @SuppressWarnings("unchecked") + DefaultApiVersionManager apiVersionManager = new DefaultApiVersionManager(ApiMessageType.ListenerType.BROKER, mock(Supplier.class), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () -> KRaftVersion.LATEST_PRODUCTION), true, Optional.empty()); + Throwable e = assertThrows(UnsupportedVersionException.class, + () -> Processor.parseRequestHeader(apiVersionManager, requestHeader), + "FETCH v0 should throw UnsupportedVersionException exception"); + assertTrue(e.toString().contains("unsupported version")); + } + + /** + * We do something unusual with these versions of produce, and we want to make sure we don't regress. + * See {@link ApiKeys#PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION} for details. + */ + @Test + public void testParseRequestHeaderForProduceV0ToV2() { + for (short version = 0; version <= 2; version++) { + ByteBuffer requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.PRODUCE, version, "clientid", 0)); + @SuppressWarnings("unchecked") + DefaultApiVersionManager apiVersionManager = new DefaultApiVersionManager(ApiMessageType.ListenerType.BROKER, mock(Supplier.class), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () -> KRaftVersion.LATEST_PRODUCTION), true, Optional.empty()); + Throwable e = assertThrows(UnsupportedVersionException.class, + () -> Processor.parseRequestHeader(apiVersionManager, requestHeader), + "PRODUCE " + version + " should throw UnsupportedVersionException exception"); + assertTrue(e.toString().contains("unsupported version")); + } + } + +}