II. Diagram

III. Source Code Analysis
def main(args: Array[String]): Unit = {
  try {
    // Parse the arguments and return a Properties object
    val serverProps = getPropsFromArgs(args)
    // Build a KafkaServerStartable from the given Properties
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    ...
    // TODO: core server startup code
    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()
  }
  ...
}
def startup() {
  ...
  // TODO: the NIO server; the acceptor and processor threads are created inside it
  // config is the KafkaConfig object obtained by parsing the arguments
  socketServer = new SocketServer(config, metrics, time, credentialProvider)
  socketServer.startup(startupProcessors = false)
  ...
}
def startup(startupProcessors: Boolean = true) {
  this.synchronized {
    // ConnectionQuotas tracks and updates connection quota statistics
    connectionQuotas = new ConnectionQuotas(config, time)
    // Create the Acceptor and Processor that handle control-plane requests
    createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
    // Create the Acceptors and Processors that handle data-plane requests
    // numNetworkThreads: number of network threads on the server, 3 by default
    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
    // startupProcessors is passed in as false here, so this block is skipped
    if (startupProcessors) {
      startControlPlaneProcessor()
      startDataPlaneProcessors()
    }
  }
  ...
}
createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, // Processor threads per listener, 3 by default
                                                  endpoints: Seq[EndPoint]): Unit = synchronized {
  // config/server.properties in the Kafka directory may declare several listeners,
  // for example:
  //   hadoop01:9092
  //   hadoop01:9093
  //   hadoop01:9094
  // endpoints is then the collection of all of them, and each endpoint corresponds to
  // one entry such as hadoop01:9092 or hadoop01:9093.
  // In practice more than one is rarely configured.
  endpoints.foreach { endpoint =>
    // Register the listener with the connection quotas
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the Acceptor
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    // Add a group of Processors to the Acceptor
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // Wrap the Acceptor in a thread and start it
    KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
    // Block until the Acceptor thread has finished starting
    dataPlaneAcceptor.awaitStartup()
    // Store the Acceptor in the map; usually there is only one
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
  }
}
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
private[kafka] class Acceptor(val endPoint: EndPoint, // connection info of the Kafka broker: host name, port, etc.
                              val sendBufferSize: Int, // send buffer size, 100 KB by default
                              val recvBufferSize: Int, // receive buffer size, 100 KB by default; consider raising both if latency between client and broker is high
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // Create the underlying NIO Selector
  private val nioSelector = NSelector.open()
  // Create the server-side ServerSocketChannel; it is later registered with the Selector created above
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  // Processor pool; the (initially empty) buffer is created when the Acceptor is initialised
  private val processors = new ArrayBuffer[Processor]()
  // Flags whether the processors have been started
  private val processorsStarted = new AtomicBoolean
  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
  ...
}
As the definition shows, the Acceptor constructor takes six parameters, of which the first three matter most:
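endPoint: the broker's connection information, including the host name and port.
sendBufferSize: the send buffer size, 100 KB by default; set by the broker parameter socket.send.buffer.bytes.
recvBufferSize: the receive buffer size, 100 KB by default; set by the broker parameter socket.receive.buffer.bytes.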
nioSelector: the Java NIO Selector object.
processors: the Processor thread pool. The Acceptor creates this pool, initially empty, when it is initialised, and the Processors added to it are managed by the Acceptor.
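To make the role of the host, port and receive buffer size more concrete, here is a minimal sketch of what opening the server channel might look like with plain Java NIO. ServerSocketSketch and its open method are hypothetical names for illustration, not Kafka's openServerSocket, and treating a non-positive recvBufferSize as "keep the OS default" is an assumption of this sketch.

```scala
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object ServerSocketSketch {
  // Hypothetical helper, loosely modelled on the openServerSocket call in the listing.
  // Assumption: recvBufferSize <= 0 means "leave the OS default untouched".
  def open(host: String, port: Int, recvBufferSize: Int): ServerSocketChannel = {
    val address =
      if (host == null || host.trim.isEmpty) new InetSocketAddress(port)
      else new InetSocketAddress(host, port)
    val channel = ServerSocketChannel.open()
    // Non-blocking mode is required before the channel can be registered with a Selector
    channel.configureBlocking(false)
    if (recvBufferSize > 0)
      channel.socket().setReceiveBufferSize(recvBufferSize)
    channel.socket().bind(address)
    channel
  }
}
```

Passing port 0 asks the operating system for any free port, which is handy for trying the sketch locally: `ServerSocketSketch.open("127.0.0.1", 0, 100 * 1024)`.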
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
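The main job of this method is to create a group of Processor objects in a loop. The number of iterations is given by the parameter newProcessorsPerListener, which is 3 by default. The new objects are then added to the Acceptor's processors pool, and they are started right away if that pool has already been started.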
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
  val listenerName = endpoint.listenerName
  val securityProtocol = endpoint.securityProtocol
  val listenerProcessors = new ArrayBuffer[Processor]()
  // Create the Processors in a loop; newProcessorsPerListener (3) iterations
  for (_ <- 0 until newProcessorsPerListener) {
    val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
    // Collect it in the buffer
    listenerProcessors += processor
    // Register the Processor with the RequestChannel
    dataPlaneRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  // Put every Processor into the dataPlaneProcessors map
  listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
  // Hand the group of Processors (3 of them) to the Acceptor, which starts them if its pool is already running
  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
}
The code of the acceptor.addProcessors method is as follows:
private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
  processors ++= newProcessors // add the new group of Processors to the pool
  if (processorsStarted.get) // if the pool has already been started
    startProcessors(newProcessors, processorThreadPrefix) // start the newly added Processor threads
}
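The Processor threads themselves are started by the startProcessors method, which addProcessors calls only when processorsStarted is already true.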
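The "register now, start only if the pool is already running" behaviour of addProcessors can be sketched in isolation. WorkerPoolSketch, addWorkers and startAll are hypothetical names for this illustration, and plain Thread objects stand in for Processors; this is a simplified model, not Kafka's implementation.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an Acceptor's processor pool
class WorkerPoolSketch {
  private val workers = new ArrayBuffer[Thread]()
  private val started = new AtomicBoolean(false)

  // Register workers; start them right away only if the pool is already running
  def addWorkers(newWorkers: Seq[Thread]): Unit = synchronized {
    workers ++= newWorkers
    if (started.get) newWorkers.foreach(_.start())
  }

  // Start every registered worker; later calls have no effect
  def startAll(): Unit = synchronized {
    if (!started.getAndSet(true)) workers.foreach(_.start())
  }

  def size: Int = synchronized(workers.length)
}
```

Since the listings above call socketServer.startup with startupProcessors = false, workers added during startup would sit in the pool unstarted until something equivalent to startAll runs; workers added after that point start immediately.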
KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
dataPlaneAcceptor.awaitStartup()
private val startupLatch = new CountDownLatch(1)
protected def startupComplete(): Unit = {
  // Replace the open latch with a closed one
  shutdownLatch = new CountDownLatch(1)
  startupLatch.countDown()
}
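The handshake between awaitStartup and startupComplete is a standard CountDownLatch pattern: the caller blocks on the latch and the worker thread counts it down once its setup is done. The sketch below shows the idea with a hypothetical StartupSignalSketch class; the timeout parameter is an addition of this sketch and does not appear in the listing above.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical worker that reports when its setup phase is done
class StartupSignalSketch extends Runnable {
  private val startupLatch = new CountDownLatch(1)
  @volatile var initialized = false

  def run(): Unit = {
    initialized = true        // stand-in for real setup work, such as registering a channel
    startupLatch.countDown()  // release any thread blocked in awaitStartup
  }

  // Block the caller until run() has finished its setup, or until the timeout expires
  def awaitStartup(timeoutMs: Long): Boolean =
    startupLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
}
```

A caller would start the worker with `new Thread(worker).start()` and then call `worker.awaitStartup(5000)`, which returns true as soon as the latch has been counted down.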
2. How the Acceptor thread works
def run() {
  // Register the server channel with the Selector for OP_ACCEPT events
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Signal that the Acceptor thread has started, which releases awaitStartup()
  startupComplete()
  try {
    // Index of the current Processor in the pool
    var currentProcessorIndex = 0
    // Keep looping for as long as the server is running
    while (isRunning) {
      try {
        // select() waits up to 500 ms and returns the number of ready SelectionKeys
        val ready = nioSelector.select(500)
        if (ready > 0) {
          // Get the set of ready SelectionKeys
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          // Iterate over all ready SelectionKeys
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              // Remove the key from the selected set
              iter.remove()
              // If this is a connection event
              if (key.isAcceptable) {
                // Handle the connection request; yields a SocketChannel
                accept(key).foreach { socketChannel =>
                  // processors is the pool of Processors; its size is the maximum number of attempts
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  // Assign a Processor thread to the SocketChannel above
                  do {
                    // One attempt fewer
                    retriesLeft -= 1
                    // Pick the Processor that will handle this connection
                    processor = synchronized {
                      // Take the next Processor from the pool, which gives round-robin behaviour
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    currentProcessorIndex += 1
                    // On success the SocketChannel ends up in that Processor's newConnections queue
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            }
            ...
}
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
while (isRunning) {
  try {
    // select() waits up to 500 ms and returns the number of ready SelectionKeys
    val ready = nioSelector.select(500)
    if (ready > 0) {
      // Get the set of ready SelectionKeys
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      // Iterate over all ready SelectionKeys
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          // Remove the key from the selected set
          iter.remove()
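The select loop above can be tried out without Kafka. The following is a minimal, self-contained sketch of the same pattern with plain Java NIO; AcceptLoopSketch and acceptN are hypothetical names, and the loop stops after a fixed number of connections or polls instead of checking an isRunning flag.

```scala
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import scala.collection.mutable.ArrayBuffer

object AcceptLoopSketch {
  // Accept connections until `expected` have arrived or `maxPolls` select calls were made
  def acceptN(server: ServerSocketChannel, expected: Int, maxPolls: Int): Seq[SocketChannel] = {
    val selector = Selector.open()
    val accepted = new ArrayBuffer[SocketChannel]()
    try {
      server.configureBlocking(false)
      server.register(selector, SelectionKey.OP_ACCEPT)
      var polls = 0
      while (accepted.size < expected && polls < maxPolls) {
        polls += 1
        // Wait up to 500 ms for the server channel to become acceptable
        if (selector.select(500) > 0) {
          val iter = selector.selectedKeys().iterator()
          while (iter.hasNext) {
            val key = iter.next()
            iter.remove() // handled keys must be removed, or they stay in the selected-key set
            if (key.isAcceptable) {
              val channel = key.channel().asInstanceOf[ServerSocketChannel].accept()
              if (channel != null) {
                channel.configureBlocking(false)
                accepted += channel
              }
            }
          }
        }
      }
    } finally selector.close()
    accepted.toSeq
  }
}
```

Each pass through the loop accepts at most one pending connection per ready key; connections still waiting in the backlog make the key ready again on the next select call.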
if (key.isAcceptable) {
  // Handle the connection request; yields a SocketChannel
  accept(key).foreach { socketChannel =>
    // processors is the pool of Processors; its size is the maximum number of attempts
    var retriesLeft = synchronized(processors.length)
    var processor: Processor = null
    // Assign a Processor thread to the SocketChannel above
    do {
      // One attempt fewer
      retriesLeft -= 1
      // Pick the Processor that will handle this connection
      processor = synchronized {
        // Take the next Processor from the pool, which gives round-robin behaviour
        currentProcessorIndex = currentProcessorIndex % processors.length
        processors(currentProcessorIndex)
      }
      currentProcessorIndex += 1
      // On success the SocketChannel ends up in that Processor's newConnections queue
    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
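The round-robin assignment with a bounded number of attempts can be modelled with a few lines of standalone code. RoundRobinSketch, Worker and Assigner are hypothetical names, and strings stand in for SocketChannels. One deliberate simplification: where the listing passes retriesLeft == 0 so that the last attempt may block, this sketch simply returns None when every worker is full.

```scala
import java.util.concurrent.ArrayBlockingQueue

object RoundRobinSketch {
  // Hypothetical stand-in for a Processor: a bounded queue of pending connections
  final class Worker(val id: Int, capacity: Int) {
    private val queue = new ArrayBlockingQueue[String](capacity)
    def tryAccept(conn: String): Boolean = queue.offer(conn)
    def pending: Int = queue.size
  }

  final class Assigner(workers: IndexedSeq[Worker]) {
    private var currentIndex = 0

    // Try each worker at most once, starting where the previous call stopped.
    // Returns the id of the worker that took the connection, or None if all were full.
    def assign(conn: String): Option[Int] = {
      var retriesLeft = workers.length
      var chosen: Option[Int] = None
      while (chosen.isEmpty && retriesLeft > 0) {
        retriesLeft -= 1
        currentIndex = currentIndex % workers.length
        val worker = workers(currentIndex)
        currentIndex += 1
        if (worker.tryAccept(conn)) chosen = Some(worker.id)
      }
      chosen
    }
  }
}
```

Because currentIndex advances on every attempt, a worker whose queue is full is skipped and the connection goes to the next one in line, while the overall rotation continues from there on the following call.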
def accept(socketChannel: SocketChannel,
           mayBlock: Boolean,
           acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
  val accepted = {
    // Try to put the SocketChannel into the queue without blocking
    if (newConnections.offer(socketChannel))
      true
    // The queue is full: block only if the caller allows it
    else if (mayBlock) {
      val startNs = time.nanoseconds
      // Block until there is room in the queue for the SocketChannel
      newConnections.put(socketChannel)
      acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
      true
    } else
      false
  }
  if (accepted)
    wakeup()
  accepted
}
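The accept method combines the two insertion modes of a bounded blocking queue: offer fails fast when the queue is full, while put waits for space. A minimal sketch of that pattern, using a hypothetical ConnectionMailboxSketch class and leaving out the metrics and the wakeup call:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical mailbox mirroring the offer-then-put pattern of the accept method above
class ConnectionMailboxSketch[A](capacity: Int) {
  private val queue = new ArrayBlockingQueue[A](capacity)

  // Returns true if the item was enqueued; blocks on a full queue only when mayBlock is set
  def accept(item: A, mayBlock: Boolean): Boolean =
    if (queue.offer(item)) true
    else if (mayBlock) { queue.put(item); true }
    else false

  def take(): A = queue.take()
  def size: Int = queue.size
}
```

With a capacity of 2, a third non-blocking accept returns false immediately, whereas a blocking accept waits until a consumer calls take and frees a slot.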
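The Acceptor thread is created and started when the server program starts up
Each Acceptor thread manages 3 Processor thread objects by default
The Acceptor thread receives client connection events through its selector
As soon as a connection event occurs, a SocketChannel object is created for it
The new SocketChannel is handed to one of the Acceptor's own Processor threads in round-robin fashion
Each Processor thread has a blocking queue, newConnections, that holds SocketChannels; its capacity is fixed at 20
In short, the Acceptor thread's job is fairly simple: it accepts client connection requests, creates the corresponding SocketChannel, and passes it round-robin to a Processor thread for handling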
This article is reposted from 大数据记事本.




