为了应对面试官的提问:Kafka如何选举啊😣
控制器选举
控制器作用
本质就是一个broker,负责和ZK进行主要交互,管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR(in-sync replicas)集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
- 主题管理(创建、删除、增加分区)
- 分区重分配
- 首选Leader选举
- 集群成员管理
依赖ZK自动检测新增 Broker、Broker 主动关闭及被动宕机 - 数据服务
保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新各自内存中的缓存数据。从ZK中读取元数据但是不保存状态。
选举方式
冷启动:集群中第一个启动的Broker会通过在Zookeeper中创建临时节点/controller来让自己成为控制器,其他Broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在Zookeeper中创建watch对象,便于它们收到控制器变更的通知。
首领异常:如果控制器被关闭或者与ZK段龛链接,ZK上的临时节点就会小时。集群里的其他Broker通过watch对象得到控制节点消失的通知,然后它们会尝试创建一个新的临时节点使自己成为控制器。同样的,这些broker中第一个成为新控制器的就会使得其他broker收到“节点已存在”的异常,其他broker就会再在这个新的控制器上创建watch对象。
防止脑裂:如果控制器所在broker挂掉了或者Full GC停顿时间太长超过ZK的会话时间出现假死,Kafka集群必须选举出新的控制器,但如果之前被取代的控制器又恢复正常了,它依旧是控制器身份,这样集群就会出现两个控制器,就会控制器脑裂问题。为了解决脑裂问题,ZK中还有一个与Controller有关的持久节点/controller_epoch,存放的是一个整形值的epoch number,集群中每选举一次控制器,就会通过Zookeeper创建一个数值更大的epoch number,类似于generation,如果有broker收到比这个epoch数值小的数据,就会忽略消息。
分区副本Leader选举
选举过程:当控制器发现一个broker离开集群,知悉失去首领的分区需要一个新首领
-
从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合。
-
调用配置的分区选择算法在ISR中选择分区的leader。
(尚未仔细了解,后续再研究一下这些算法的不同○| ̄|_)
- 向所有包含新Leader或现有Follower的broker发送请求,表明谁是新Leader谁是Follower
Follower如何同步Leader:Follower主动请求消息向Leader获取数据,等价于消费者获取消息的请求,拿到的偏移量是有序的。一般来讲,如果Follower在10s内没有请求任何消息,或者没有请求最新的消息,那么它就会被认为不同步的。
脏选举:Kafka broker端提供了一个参数unclean.leader.election.enable
,用于控制是否允许非同步副本参与leader选举;如果开启,则当ISR为空时就会从这些非同步的副本中选举新的leader。
首选首领(Preferred Leader)选举
目的:使得各broker在动态的变化中尽量保持均衡
首选首领:创建主题时选定每个分区的首领。在创建分区的时候,会在broker之间均衡首领使得broker间的负载得到均衡,也就是使得首领和副本尽量不在同一个broker上。
配置:auto.leader.rebalance.enable
选举过程:控制器会检查首选首领是不是当前首领,如果不是,并且该副本(首选首领的副本)是同步的,就会触发首选首领选举,让首选首领成为当前首领。
消费组Leader选举
在Kafka的消费端,会有一个消费者协调器以及消费组,组协调器(Group Coordinator)需要为消费组内的消费者选举出一个消费组的leader。
如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,选举方式如下:
1 | private val members = new mutable.HashMap[String, MemberMetadata] |
在组协调器中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。而leader的取值为HashMap中的第一个键值对的key(等同于随机)。
消费者协调器(ConsumerCoordinator)
Consumer一个私有成员变量,不同消费者之间不可见。
作用:
- 处理更新消费者缓存的 Metadata 请求
- 向组协调器发起加入消费者组的请求
- 请求离开消费者组(例如当消费者取消订阅时)
- 向组协调器发送提交偏移量的请求
- 通过一个定时的心跳检测任务来让组协调器感知自己的运行状态
- Leader消费者的Consumer
组协调器(GroupCoordinator)
服务端实例,每个Broker启动时都会创建一个组协调器实例,负责监控这个消费组里的所有消费者的心跳以及判断是否宕机然后组织Rebalance。
作用:
- 负责对其管理的组员(消费者)提交的相关请求进行处理
- 与消费者之间建立连接,并从与之连接的消费者之间选出一个 leader
- 当 leader 分配好消费者与分区的订阅关系后,会把结果发送给组协调器,组协调器再把结果返回给各个消费者
- 管理与之连接的消费者的消费偏移量的提交,将每个消费者的消费偏移量保存到kafka的内部主题中
- 通过心跳检测消费者与自己的连接状态
- 启动组协调器的时候创建一个定时任务,用于清理过期的消费组元数据以及过去的消费偏移量信息
Rebalance
- Join。所有成员都向Group Coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,Coordinator会从中选择一个Consumer担任消费组Leader的角色,并把组成员信息以及订阅信息发给消费组Leader。leader负责消费分配方案的制定。
- Sync。这一步leader开始分配消费方案,即哪个Consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给GroupCoordinator,非消费组Leader也会发SyncGroup请求,只是内容为空。GroupCoordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个Consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。