Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control-server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
<subpackage name="network">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.network.metrics" />
<allow pkg="org.apache.kafka.server" />
</subpackage>

</import-control>
43 changes: 11 additions & 32 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ package kafka.network

import java.io.IOException
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.{Selector => NSelector, _}
import java.util
import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
Expand All @@ -43,7 +40,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.internals.LogContext
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, Request, SocketServer => JSocketServer, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, Request, SocketServerConfigs, TooManyConnectionsException, Processor => JProcessor, SocketServer => JSocketServer}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
Expand Down Expand Up @@ -500,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
MetricsUtils.getTags(ListenerMetricTag, endPoint.listener))
MetricsUtils.getTags(JProcessor.LISTENER_METRIC_TAG, endPoint.listener))
private val blockedPercentMeter = backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
Expand Down Expand Up @@ -762,32 +759,14 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
credentialProvider,
memoryPool,
logContext,
Processor.ConnectionQueueSize,
JProcessor.CONNECTION_QUEUE_SIZE,
isPrivilegedListener,
apiVersionManager,
name,
connectionDisconnectListeners)
}
}

private[kafka] object Processor {
private val IdlePercentMetricName = "IdlePercent"
val NetworkProcessorMetricTag = "networkProcessor"
val ListenerMetricTag = "listener"
val ConnectionQueueSize = 20

private[network] def parseRequestHeader(apiVersionManager: ApiVersionManager, buffer: ByteBuffer): RequestHeader = {
val header = RequestHeader.parse(buffer)
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
header
} else if (header.isApiVersionSupported()) {
throw new InvalidRequestException(s"Received request for disabled api with key ${header.apiKey.id} (${header.apiKey().name}) and version ${header.apiVersion}")
} else {
throw new UnsupportedVersionException(s"Received request for api with key ${header.apiKey.id} (${header.apiKey().name}) and unsupported version ${header.apiVersion}")
}
}
}

/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
Expand Down Expand Up @@ -833,17 +812,17 @@ private[kafka] class Processor(
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

private[kafka] val metricTags = mutable.LinkedHashMap(
ListenerMetricTag -> listenerName.value,
NetworkProcessorMetricTag -> id.toString
JProcessor.LISTENER_METRIC_TAG -> listenerName.value,
JProcessor.NETWORK_PROCESSOR_METRIC_TAG -> id.toString
).asJava

metricsGroup.newGauge(IdlePercentMetricName, () => {
metricsGroup.newGauge(JProcessor.IDLE_PERCENT_METRIC_NAME, () => {
Option(metrics.metric(metrics.metricName("io-wait-ratio", JSocketServer.METRICS_GROUP, metricTags))).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
},
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)
MetricsUtils.getTags(JProcessor.NETWORK_PROCESSOR_METRIC_TAG, id.toString)
)

private val expiredConnectionsKilledCount = new CumulativeSum()
Expand Down Expand Up @@ -1010,7 +989,7 @@ private[kafka] class Processor(
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
header = parseRequestHeader(apiVersionManager, receive.payload)
header = JProcessor.parseRequestHeader(apiVersionManager, receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
() => time.nanoseconds()))
trace(s"Begin re-authentication: $channel")
Expand Down Expand Up @@ -1198,8 +1177,8 @@ private[kafka] class Processor(
close(channel.id)
}
selector.close()
metricsGroup.removeMetric(IdlePercentMetricName,
MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString))
metricsGroup.removeMetric(JProcessor.IDLE_PERCENT_METRIC_NAME,
MetricsUtils.getTags(JProcessor.NETWORK_PROCESSOR_METRIC_TAG, id.toString))
}

// 'protected` to allow override for testing
Expand Down Expand Up @@ -1701,7 +1680,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
JSocketServer.METRICS_GROUP,
"Tracking average throttle-time, out of non-zero throttle times, per listener",
MetricsUtils.getTags(ListenerMetricTag, listener.value))
MetricsUtils.getTags(JProcessor.LISTENER_METRIC_TAG, listener.value))
sensor.add(metricName, new Avg)
sensor
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import java.util.concurrent._
import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.api.SaslSetup
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.network.{DataPlaneAcceptor, RequestChannel}
import kafka.security.JaasTestUtils
import kafka.utils._
import kafka.utils.Implicits._
Expand All @@ -54,7 +54,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.{Processor, SocketServerConfigs}
import org.apache.kafka.raft.MetadataLogConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
Expand Down Expand Up @@ -885,7 +885,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup

private def isProcessorMetric(metricName: MetricName): Boolean = {
val mbeanName = metricName.getMBeanName
mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=")
mbeanName.contains(s"${Processor.NETWORK_PROCESSOR_METRIC_TAG}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=")
}

private def clearLeftOverProcessorMetrics(): Unit = {
Expand All @@ -900,13 +900,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners

val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
.filter(_.tags.containsKey(Processor.NetworkProcessorMetricTag))
.groupBy(_.tags.get(Processor.ListenerMetricTag))
.filter(_.tags.containsKey(Processor.NETWORK_PROCESSOR_METRIC_TAG))
.groupBy(_.tags.get(Processor.LISTENER_METRIC_TAG))

assertEquals(2, kafkaMetrics.size) // 2 listeners
// 2 threads per listener
assertEquals(2, kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
assertEquals(2, kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NetworkProcessorMetricTag)).size)
assertEquals(2, kafkaMetrics("INTERNAL").groupBy(_.tags().get(Processor.NETWORK_PROCESSOR_METRIC_TAG)).size)
assertEquals(2, kafkaMetrics("EXTERNAL").groupBy(_.tags().get(Processor.NETWORK_PROCESSOR_METRIC_TAG)).size)

KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
.filter(isProcessorMetric)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
import java.util.Properties
import com.yammer.metrics.core.Meter
import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
import org.apache.kafka.common.network._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.{ConnectionThrottledException, SocketServer, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.network.{ConnectionThrottledException, Processor, SocketServer, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ServerTestUtils
Expand Down Expand Up @@ -94,7 +93,7 @@ class ConnectionQuotasTest {

listeners.keys.foreach { name =>
blockedPercentMeters.put(name, new KafkaMetricsGroup(metricsPackage, metricsClassName).newMeter(
s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, util.Map.of(ListenerMetricTag, name)))
s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, util.Map.of(Processor.LISTENER_METRIC_TAG, name)))
}
// use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on
// system time; so using mock time will likely result in test flakiness due to a mixed use of mock and system time
Expand Down Expand Up @@ -832,23 +831,23 @@ class ConnectionQuotasTest {
val metricName = metrics.metricName(
"connection-accept-throttle-time",
SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
util.Map.of(Processor.LISTENER_METRIC_TAG, listener))
metrics.metric(metricName)
}

private def ipConnThrottleMetric(listener: String): KafkaMetric = {
val metricName = metrics.metricName(
"ip-connection-accept-throttle-time",
SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
util.Map.of(Processor.LISTENER_METRIC_TAG, listener))
metrics.metric(metricName)
}

private def listenerConnRateMetric(listener: String) : KafkaMetric = {
val metricName = metrics.metricName(
"connection-accept-rate",
SocketServer.METRICS_GROUP,
util.Map.of(Processor.ListenerMetricTag, listener))
util.Map.of(Processor.LISTENER_METRIC_TAG, listener))
metrics.metric(metricName)
}

Expand Down
99 changes: 0 additions & 99 deletions core/src/test/scala/unit/kafka/network/ProcessorTest.scala

This file was deleted.

43 changes: 43 additions & 0 deletions server/src/main/java/org/apache/kafka/network/Processor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;

import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.server.ApiVersionManager;

import java.nio.ByteBuffer;

public class Processor {

public static final String IDLE_PERCENT_METRIC_NAME = "IdlePercent";
public static final String NETWORK_PROCESSOR_METRIC_TAG = "networkProcessor";
public static final String LISTENER_METRIC_TAG = "listener";
public static final int CONNECTION_QUEUE_SIZE = 20;

public static RequestHeader parseRequestHeader(ApiVersionManager apiVersionManager, ByteBuffer buffer) {
RequestHeader header = RequestHeader.parse(buffer);
if (apiVersionManager.isApiEnabled(header.apiKey(), header.apiVersion())) {
return header;
} else if (header.isApiVersionSupported()) {
throw new InvalidRequestException("Received request for disabled api with key " + header.apiKey().id + " (" + header.apiKey().name() + ") and version " + header.apiVersion());
} else {
throw new UnsupportedVersionException("Received request for api with key " + header.apiKey().id + " (" + header.apiKey().name + ") and unsupported version " + header.apiVersion());
}
}
}
Loading
Loading