Kafka-服务端-网络层-源码流程

整体架构如下所示:

在这里插入图片描述

responseQueue不在RequestChannel中,在Processor中,每个Processor内部有一个responseQueue

  1. 客户端发送的请求被Acceptor转发给Processor处理
  2. 处理器将请求放到RequestChannel的requestQueue中
  3. KafkaRequestHandler取出requestQueue中的请求
  4. 调用KafkaApis进行业务逻辑处理
  5. KafkaApis将响应结果放到对应的Processor的responseQueue中
  6. processor从responseQueue中取出响应结果
  7. processor将响应结果返回给客户端

KafkaServer是Kafka服务端的主类,KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议,具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。

class KafkaServer(val config: KafkaConfig) {
       def startup() {
       	socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startupProcessors = false)
                /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, ...)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, ...)
          }
     }

SocketServer

  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      ...
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
      }
    }
      
  private def createAcceptorAndProcessors(processorsPerListener: Int,
                                          endpoints: Seq[EndPoint]): Unit = synchronized {
	...
    endpoints.foreach { endpoint =>
	  ...
      val acceptor = new Acceptor(endpoint, ...)
      addProcessors(acceptor, endpoint, processorsPerListener)
      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
      acceptor.awaitStartup()
      acceptors.put(endpoint, acceptor)
    }
  }

可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor,每个acceptor关联数个processor。这是一种典型的Reactor模式,acceptor负责与客户端建立连接,并将连接分发给processor,processor负责所分连接后续的所有读写交互。

Acceptor

  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  val processor = synchronized {
                    currentProcessor = currentProcessor % processors.size
                    processors(currentProcessor)
                  }
                  accept(key, processor)
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread, mod(numProcessors) will be done later
                currentProcessor = currentProcessor + 1
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
          // to a select operation on a specific channel or a bad request. We don't want
          // the broker to stop responding to requests from other clients in these scenarios.
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }

上面是Acceptor的run()方法,可以看出,Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件(OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,客户端监听OP_CONNECT事件,负责发起连接,服务端监听OP_CONNECT事件,负责建立连接),负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。

Processor

每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示:

  override def run() {
    startupComplete()
    try {
      while (isRunning) {
        try {
          // 在新分到的客户端连接上注册OP_READ事件
          configureNewConnections()
          // 从responseQueue中取响应,赋值给KafkaChannel的send,等待poll时发送
          processNewResponses()
          // selector轮询各种事件,读取请求或者发送响应
          poll()
          // 封装selector.completedReceives中的请求,放入requestQueue
          processCompletedReceives()
          // 处理selector.completedSends响应(移除inflightResponses中的记录;执行响应的回调函数)
          processCompletedSends()
          processDisconnected()
        } catch {
          ...
        }
      }
    } finally {
      ...
    }
  }

Processor线程的名字中有kafka-network字样,可以通过jstack -l pid | grep kafka-network进行筛选。

KafkaRequestHandlerPool

KafkaServer会创建请求处理线程池(KafkaRequestHandlerPool),在请求处理线程池中会创建并启动多个请求处理线程(KafkaRequestHandler)。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理,在内部实际处理会交给KafkaApis完成。

class KafkaRequestHandlerPool(val brokerId: Int, ...) {
  ...
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(..., requestChannel, apis, time)
    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
  }
}

KafkaRequestHandler的run()方法如下:

class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {
  ...
  def run() {
    while (!stopped) {

      val req = requestChannel.receiveRequest(300)

      req match {
        case RequestChannel.ShutdownRequest =>
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            apis.handle(request)
          } catch {
            case e: FatalExitError =>
              shutdownComplete.countDown()
              Exit.exit(e.statusCode)
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            request.releaseBuffer()
          }

        case null => // continue
      }
    }
    shutdownComplete.countDown()
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/763616.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python:Python简介

一、Python简介 1.Python的诞生 诞生&#xff1a;1989年圣诞节期间&#xff0c;Guido van Rossum为了打发圣诞节假期的无聊&#xff0c;便开始了Python语言的编写。 命名&#xff1a;Python第一个发行版本是在1991年&#xff0c;起名为Python是源自于Guido喜欢的一档电视节目…

英伟达经济学:云服务商在GPU上每花1美元 就能赚7美元

NVIDIA超大规模和 HPC 业务副总裁兼总经理 Ian Buck 近日在美国银行证券 2024 年全球技术大会上表示&#xff0c;客户正在投资数十亿美元购买新的NVIDIA硬件&#xff0c;以跟上更新的 AI 大模型的需求&#xff0c;从而提高收入和生产力。 Buck表示&#xff0c;竞相建设大型数据…

在 PostgreSQL 中强制执行连接顺序#postgresql认证

让我们首先创建一些表&#xff1a; PgSQL plan# SELECT CREATE TABLE x || id || (id int) FROM generate_series(1, 5) AS id;?column? --------------------------CREATE TABLE x1 (id int)CREATE TABLE x2 (id int)CREATE TABLE x3 (id int)CREATE TABLE…

Centos7网络配置(设置固定ip)

文章目录 1进入虚拟机设置选中【网络适配器】选择【NAT模式】2 进入windows【控制面板\网络和 Internet\网络和共享中心\更改适配器设置】设置网络状态。3 设置VM的【虚拟网络编辑器】4 设置系统网卡5 设置虚拟机固定IP 刚安装完系统&#xff0c;有的人尤其没有勾选自动网络配置…

解锁机器学习算法面试挑战课程

在这个课程中&#xff0c;我们将从基础知识出发&#xff0c;系统学习机器学习与算法的核心概念和实践技巧。通过大量案例分析和LeetCode算法题解&#xff0c;帮助您深入理解各种面试问题&#xff0c;并掌握解题技巧和面试技巧。无论是百面挑战还是LeetCode算法题&#xff0c;都…

VUE3解决跨域问题

本文基于vue3 vite element-plus pnpm 报错&#xff1a;**** has been blocked by CORS policy: No Access-Control-Allow-Origin header is present on the requested resource. 原因&#xff1a;前端不能直接访问其他IP&#xff0c;需要用vite.config.ts &#xff0…

仿美团饿了么程序,外卖人9.0商业版外卖订餐源码(PC+微信)

仿美团饿了么程序,外卖人9.0外卖订餐源码,PC微信WAP短信宝,多城市多色版 非常不错的独立版外卖跑腿网站源码&#xff0c;喜欢的可以下载调试看看吧&#xff01;&#xff01; 仿美团饿了么程序,外卖人9.0外卖订餐源码

鸿蒙开发Ability Kit(程序访问控制):【向用户申请单次授权】

申请使用受限权限 受限开放的权限通常是不允许三方应用申请的。当应用在申请权限来访问必要的资源时&#xff0c;发现部分权限的等级比应用APL等级高&#xff0c;开发者可以选择通过ACL方式来解决等级不匹配的问题&#xff0c;从而使用受限权限。 举例说明&#xff0c;如果应…

【面试干货】Static关键字的用法详解

【面试干货】Static关键字的用法详解 1、Static修饰内部类2、Static修饰方法3、Static修饰变量4、Static修饰代码块5、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java编程语言中&#xff0c;static是一个关键字&#xff0c;它可…

【多模态LLM】以ViT进行视觉表征的多模态模型1(BLIP-2、InstructBLIP)

note CLIP和BLIP的区别&#xff1a; CLIP&#xff1a;通过对比学习联合训练&#xff0c;预测图像和文本之间的匹配关系。即使用双塔结构&#xff0c;分别对图像和文本编码&#xff0c;然后通过计算cos进行图文匹配。BLIP&#xff1a;包括两个单模态编码器&#xff08;图像编码…

【TB作品】温湿度监控系统设计,ATMEGA16单片机,Proteus仿真

题2:温湿度监控系统设计 功能要求: 1)开机显示时间(小时、分)、时分可修改; 2)用两个滑动变阻器分别模拟温度传感器(测量范 围0-100度)与湿度传感器(0-100%),通过按键 可以在数码管切换显示当前温度值、湿度值; 3)当温度低于20度时,红灯长亮; 4)当湿度高于70%时,黄灯长亮; 5)当…

win11自动删除文件的问题,安全中心提示

win11自动删除文件的问题&#xff0c;解决方法&#xff1a; 1.点击任务栏上的开始图标&#xff0c;在显示的应用中&#xff0c;点击打开设置。 或者点击电脑右下角的开始也可以 2.点击设置。也可以按Wini打开设置窗口。 3.左侧点击隐私和安全性&#xff0c;右侧点击Windows安全…

如何开启Linux内核中的debug打印信息

如何开启Linux内核中的debug打印信息 Linux 内核中&#xff0c;日志等级定义在 include/linux/kern_levels.h 文件中。数值越小等级越高。 级别 对应内核日志级别 说明 0 KERN_EMERG 紧急消息。系统崩溃之前提示&#xff0c;表示系统已不可用。 1 KERN_ALERT 报告消息。表示必…

Redis 7.x 系列【13】数据类型之地理位置(Geospatial)

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Redis 版本 7.2.5 源码地址&#xff1a;https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 常用命令2.1 GEOADD2.2 GEODIST2.3 GEORADIUS2.4 GEOPOS2.5 GEORADIUSBYMEM…

安卓实现微信聊天气泡

一搜没一个能用的&#xff0c;我来&#xff1a; 布局文件&#xff1a; <?xml version"1.0" encoding"utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android"http://schemas.android.com/apk/res/android"xml…

使用Git从Github上克隆仓库,修改并提交修改

前言 本次任务主要是进行github提交修改的操作练习实践&#xff0c;本文章是对实践过程以及遇到的问题进行的一个记录。 在此之前&#xff0c;我已经简单使用过github&#xff0c;Git之前已经下好了&#xff0c;所以就省略一些步骤。 步骤记录 注册github账号&#xff0c;gi…

使用PHP解析和处理HTML/XML以创建Web爬虫的示例

使用PHP解析和处理HTML/XML以创建Web爬虫的示例 引言&#xff1a; Web爬虫是一种自动化工具&#xff0c;用于从万维网&#xff08;World Wide Web&#xff09;上抓取数据。PHP作为一种流行的服务器端脚本语言&#xff0c;具有丰富的库和功能&#xff0c;可以方便地解析和处理H…

简搭云可视化大屏设计:打造企业数据展示的新标杆

引言 在当今数字化时代&#xff0c;企业对于数据的可视化需求日益增长。如何高效、直观地展示海量数据&#xff0c;成为了企业决策者和数据分析师们关注的焦点。简搭云可视化大屏设计凭借其丰富的功能和强大的性能&#xff0c;成为了企业大屏可视化设计的首选工具。本文将为您…

Linux基础 - Ansible 服务实现自动化运维

目录 零. 简介 一. 安装 二. 模块介绍 三. 基本使用 零. 简介 Ansible 是一款极其强大且简单易用的开源 IT 自动化工具。 它的主要特点和优势包括&#xff1a; 无代理架构&#xff1a;Ansible 不需要在被管理的节点上安装代理程序&#xff0c;而是通过 SSH 协议与目标节点…

网站被浏览器提示“不安全”的解决办法

在互联网时代&#xff0c;网站的安全性直接关系到用户体验和品牌形象。当用户访问网站时&#xff0c;如果浏览器出现“您与此网站之间建立的连接不安全”的警告&#xff0c;这不仅会吓跑潜在客户&#xff0c;还可能对网站的SEO排名造成等负面影响。 浏览器发出的“不安全”警告…