图解 Kafka 源码之服务端启动流程

从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看 Kafka 服务端是如何启动的。

从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看 Kafka 服务端是如何启动的。

图解 Kafka 源码之服务端启动流程

前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的方方面面,从今天开始,我们来深度剖析 Kafka「Controller」的底层源码实现,这是 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的流程」,看看 Kafka 服务端是如何启动的。

图解 Kafka 源码之服务端启动流程

一、总体概述

在深入剖析Kafka「Controller」之前,我想你可能或多或少会有这样的疑问:

Kafka 服务端都有哪些组件,这些组件又是通过哪个类来启动的呢?

这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动:

# 1、启动 kafka 服务命令:
bin/kafka-server-start.sh config/server.properties &

那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。

二、kafka-server-start.sh

我们来看下里面的 shell 内容,如下:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注释说明该脚本的版权信息和使用许可。
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
# 2、检查命令行参数的个数,若小于 1 则输出脚本的使用方法并退出。
base_dir=$(dirname $0)
# 3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
    export JMX_RMI_PORT="10000"
fi
# 5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、检查命令行参数中 COMMAND 变量的值是否为 -daemon,若是则将 EXTRA_ARGS 变量的值添加 -daemon 选项。同时将命令行参数向左移一位,即从 $2 开始计算参数。
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。

三、kafka.Kafka 类

「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

图解 Kafka 源码之服务端启动流程

从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。

这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {
  // 创建一个命令行参数解析器
  val optionParser = new OptionParser(false)
  // 定义 --override 选项,用于覆盖 server.properties 文件中的属性
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
   
  // 定义 --version 选项,用于打印版本信息并退出
  optionParser.accepts("version", "Print version information and exit.")
  // 若没有提供参数或者参数包含 --help 选项,则打印用法并退出
  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }
  // 若参数中包含 --version 选项,则打印版本信息并退出
  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }
  // 加载 server.properties 文件中的属性到 Properties 对象中
  val props = Utils.loadProps(args(0))
  // 若提供了其他参数,则解析这些参数
  if (args.length > 1) {
    // 解析参数中的选项和参数值
    val options = optionParser.parse(args.slice(1, args.length): _*)
    // 检查是否有非选项参数
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    // 将解析得到的选项和参数值添加到 props 对象中
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  // 返回解析得到的属性集合
  props
}

该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。

如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。

如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {
    val config = KafkaConfig.fromProps(props, false)
    // 直接启动定时任务、网络层、请求处理层
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = false
      )
    } else {
      // 调用 BrokerServer 等来启动网络层和请求处理层
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None
      )
    }
}

在 kafka 2.8.x 版本中 新增了raft协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。

3、main

# 2.7.x 版本源码
def main(args: Array[String]): Unit = {
  try {
    // 1、解析命令行参数,获得属性集合
    val serverProps = getPropsFromArgs(args)
    // 2、从属性集合创建 KafkaServerStartable 对象
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    try {
      // 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandler
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      // 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法
    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
    // 4、启动 KafkaServerStartable 实例
    kafkaServerStartable.startup()
    // 5、等待 KafkaServerStartable 实例终止
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    // 如果有异常发生,则记录日志并退出程序
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  // 6、正常终止程序
  Exit.exit(0)
}

该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。

# 2.8.x 版本
def main(args: Array[String]): Unit = {
    // 获取Kafka服务的配置信息
    val serverProps = getPropsFromArgs(args)
    // 根据配置信息构建Kafka服务
    val server = buildServer(serverProps)
    try {
      // 注册用于记录日志的信号处理器(若实现失败则退出)
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 挂载关闭处理器,用于捕获终止信号和常规终止请求
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown() // 关闭Kafka服务
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.") // 日志记录致命错误信息
          // 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁
          Exit.halt(1)
      }
    })
    try server.startup() // 启动Kafka服务
    catch {
      case _: Throwable =>
        // 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown()
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
    server.awaitShutdown() // 等待Kafka服务关闭
    Exit.exit(0) // 调用Exit.exit()设置退出状态码
}

这里最重要的是 「第 4 步」,调用kafkaServerStartable.startup()或者 server.startup() 来启动 kafka。

这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。

四、KafkaServerStartable

「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

图解 Kafka 源码之服务端启动流程

在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。

这里我们主要来看下 Class 类代码。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  // 创建 KafkaServer 实例
  // 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
  // 启动 KafkaServer
  // startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。
  def startup(): Unit = {
    try server.startup()
    catch {
      // 如果出现异常,则记录日志并退出程序
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }
  // 关闭 KafkaServer
  // shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      // 如果出现异常,则记录日志并强制退出程序
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }
  // setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }
  // 等待 KafkaServer 退出
  // awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。
  def awaitShutdown(): Unit = server.awaitShutdown()
}

KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。

跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。

五、KafkaServer 类

Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。

「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报
                  ) extends Logging with KafkaMetricsGroup {
  //标识节点已经启动完成
  private val startupComplete = new AtomicBoolean(false)
  //标识节点正在执行关闭操作
  private val isShuttingDown = new AtomicBoolean(false)
  //标识节点正在执行启动操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主线程等待 KafkaServer 的关闭
  private var shutdownLatch = new CountDownLatch(1)
  //日志上下文
  private var logContext: LogContext = null
  var metrics: Metrics = null
  //记录节点的当前状态
  val brokerState: BrokerState = new BrokerState
  //API接口类,用于处理数据类请求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于处理控制类请求
  var controlPlaneRequestProcessor: KafkaApis = null
  //权限管理
  var authorizer: Option[Authorizer] = None
  //启动socket,监听9092端口,等待接收客户端请求 
  var socketServer: SocketServer = null
  //数据类请求处理线程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令类处理线程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null
  //副本管理器
  var replicaManager: ReplicaManager = null
  //topic增删管理器
  var adminManager: AdminManager = null
  //token管理器
  var tokenManager: DelegationTokenManager = null
  //动态配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  //分组协调器
  var groupCoordinator: GroupCoordinator = null
  //事务协调器
  var transactionCoordinator: TransactionCoordinator = null
  //集群控制器
  var kafkaController: KafkaController = null
  //定时任务调度器
  var kafkaScheduler: KafkaScheduler = null
  //集群分区状态信息缓存
  var metadataCache: MetadataCache = null
  //配额管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null
  //zk客户端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null
  def clusterId: String = _clusterId
  // Visible for testing
  private[kafka] def zkClient = _zkClient
  private[kafka] def brokerTopicStats = _brokerTopicStats
  ....
}

1、startup

该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    try {
      info("starting")
      // 是否已关闭
      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
      // 是否已启动
      if (startupComplete.get)
        return
      // 是否可以启动
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) { // 设置broker状态为Starting
        brokerState.newState(Starting)
        /* setup zookeeper */
        // 连接ZK,并创建根节点
        initZkClient(time)
        /* initialize features */
        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
        if (config.isFeatureVersioningSupported) {
          _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
        }
        /* Get or create cluster_id */
        // 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")
        /* load metadata */
        // 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
        // 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        // 配置logger
        this.logIdent = logContext.logPrefix
        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 初始化AdminZkClient,支持动态修改配置 
        config.dynamicConfig.initialize(zkClient)
        /* start scheduler */
        // 初始化定时任务调度器
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()
        /* create and configure metrics */
        // 创建及配置监控,默认使用JMX及Yammer Metrics
        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
        kafkaYammerMetrics.configure(config.originals)
        val jmxReporter = new JmxReporter()
        jmxReporter.configure(config.originals)
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(jmxReporter)
        val metricConfig = KafkaServer.metricConfig(config)
        val metricsContext = createKafkaMetricsContext()
        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats
        // 初始化配额管理器
        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
        // 用于保证kafka-log数据目录的存在
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        /* start log manager */
        // 启动日志管理器,kafka的消息以日志形式存储
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        // 启动日志清理、刷新、校验、恢复等的定时线程
        logManager.startup()
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        // SCRAM认证方式的token缓存
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 启动socket,监听9092端口,等待接收客户端请求 
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startProcessingRequests = false)
        /* start replica manager */
        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
        // 启动副本管理器,高可用相关
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()
        brokerToControllerChannelManager.start()
        // 将broker信息注册到ZK上
        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)
        // Now that the broker is successfully registered, checkpoint its metadata
        // 校验 broker 信息
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
        /* start token manager */
        // 启动 token 管理器
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
        tokenManager.startup()
        /* start kafka controller */
        // 启动Kafka控制器,只有 Leader 会与ZK建连
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
        kafkaController.startup()
        // admin管理器
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 启动集群群组协调器
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 启动事务协调器
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()
        /* Get the authorizer and initialize it if one is specified.*/
        // ACL
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            brokerInfo.broker.endPoints.map { ep =>
              ep.toJava -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // 创建拉取管理器
        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
        /* start processing requests */
        // 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
        // 初始化数据类请求处理的线程池  
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          // 初始化控制类请求的 KafkaApis
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
          // 初始化控制类请求的线程池
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
        }
        Mx4jLoader.maybeLoad()
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        // 启动动态配置处理器
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
        // 启动请求处理线程
        socketServer.startProcessingRequests(authorizerFutures)
        // 更新broker状态
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
 }

这里总结下该方法都启动了哪些组件:

  • initZkClient(time)初始化 Zk。
  • kafkaScheduler 定时器。
  • logManager 日志模块。
  • MetadataCache 元数据缓存。
  • socketServer 网络服务器。
  • replicaManager 副本模块。
  • kafkaController 控制器。
  • groupCoordinator协调器用于和ConsumerCoordinator 交互
  • transactionCoordinator事务相关
  • fetchManager 副本拉取管理器。
  • dynamicConfigManager 动态配置管理器。

2、Broker 状态

这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning:初始状态,标识当前 broker 节点未运行。
  • Starting:标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RuningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。

这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。

六、总结

这里,我们一起来总结一下这篇文章的重点。

  • 文章开头通过对「kafka-server-start.sh」内容进行剖析,引出了 「kafka.Kafka」类。
  • 在「kafka.Kafka」的 main 方法中调用了「KafkaServerStartable」尝试启动 Kafka 服务器。
  • 接着在 「KafkaServerStartable」的 startup 方法中调用了 「KafkaServer」的 startup 方法启动服务器需要的各种组件类。

图解 Kafka 源码之服务端启动流程

下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年8月14日 19:02
下一篇 2023年8月14日 19:03

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信