走进Kafka系列2 集群选举
走进Kafka系列2 集群选举
上一篇讲的是consumer group的共识,其实顺序有点颠倒了,应该先讲集群共识的。但是因为时间关系,个人对于consumer group更熟悉一点,所以集群的选举就放到第二篇了。对于共识选举的学习建议大家从切面的角度看待,其实像ZK、Kafka、ES的选举都是有共通之处的。
1 Kraft选举实现
在Kafka3.0中的新特性就是有了Kraft模式来替代之前Zookeeper的依赖(其实从2.8开始就已经支持了)。
Zookeeper可以理解为一个分布式的KV系统,具有很高的一致性,所以普遍被用来储存元数据使用。
但因ZK自身缺陷,所以Kafka选择使用了自己内部的topic来储存元数据,在这基础上就是Kraft模式(使用了raft协议的思想)
1.1 Kraft相关基础概念
一般来说,中心化的分布式系统都需要进行集群选举来产生Leader,Leader主要作用是为了更容易控制整个集群状态的一致性。在Kraft中,把集群的元数据信息都储存到了Kafka自身的topic (__cluster_metadata) 中来摆脱对ZK的依赖性(之前是储存到ZK的)。
在Kafka的server.properties配置文件中,可以指明当前broker的角色列表
1 | process.roles=broker,controller |
如果角色列表中有指定controller,则表明当前broker要去参与选举。
1.2 Kafka选举相关数据结构
先大致分类一下,选举过程中的几个重要部分。
QuorumState
QuorumState在选举流程中非常重要,可以理解为就是Kraft中的状态机实现。
在KafkaRaftClient刚启动的时候,会去本地磁盘中加载上次的state状态进行初始化(持久化路径 logs/__cluster_metadata-0/quorum-state)格式如下
1 | {"clusterId":"","leaderId":-1,"leaderEpoch":222,"votedId":1,"appliedOffset":0,"currentVoters":[{"voterId":1},{"voterId":2}],"data_version":0} |
明文储存的,大家也能自己去看,主要有当前broker上次运行的集群状态快照,比如leader是谁,集群内都有哪些成员,上次选举的轮次是什么。
Kraft主要加载这些信息是为了进行后续的状态流转,判断我当前borker刚启动时的QuorumState是什么状态。
集群最新的epoch > 本地的leaderEpoch
说明本地数据很落后了,初始化时状态为UnattachedState,使用集群最新的epoch
如果epoch相同,leaderId是自己
初始化时状态为ResignedState
epoch相同,votedId是自己
初始化时状态为CandidateState
epoch相同,votedId不是自己且不为空
说明上次broker退出时,集群还在选举,初始化时状态为VotedState
epoch相同,leaderId不是自己且不为空
说明上次broker退出时,集群选举成功,且自己是Follower,继续用FollowerState。
都不命中,使用最原始的状态UnattachedState。
其实都可以无脑用UnattachedState,从最初始的状态开始流转。我个人猜测这么多判断是为了性能,重新选举流程可以更快一点,直接流转到最合适的状态。
状态流转
当QuorumState初始化完成后,就开始到集群节点的状态流转了。
UnattachedState 未进入选举状态
节点知道了当前有选举在进行,但是自己还没有加入,也是所有节点默认的初始化状态
CandidateState 候选人状态
当前节点在选举人列表中,而且没有发出过选票,则当前节点会变为CandidateState,给自己的epoch自增1,并给自己投出一票。
VotedState 投票人状态
和CandidateState状态一样,都可以表示投过票的人,而CandidateState表示是leader的候选人,投的是自己,VotedState只是表示选举人,投的是别人。
LeaderState 领导者状态
当CandidateState-候选人获得超过半数节点同意后,候选人会成为领导者。
FollowerState 追随者状态
当有leader当选后,会向所有的voters发送BeginQuorumEpochRequest通知,主要是告诉自己成为了leader。voter收到通知后,如果发现leaderId和自己的id不一致,就会把自己变成FollowerState。
VoteRequestData选票结构
在Kraft中,临时构造VoteRequestData.PartitionData来充当选票。VoteRequestData.PartitionData主要结构如下,在Kafka中,选票投出后,则无法进行更改,只能进行到下一轮选举(raft协议特色)
1 | int partitionIndex; |
candidateEpoch:当一个broker状态变为CandidateState时(CandidateState构造函数中要传epoch),broker会把当前的epoch自增1。
broker的Quorum每次状态变化时都会持久化到本地磁盘中,来保证epoch的自增不会发生意外而丢失。
lastOffset每次集群元数据有变动时(集群自身的选举操作、对topic的操作)都会增加,主要用于保证当选的leader要是最新的节点(offset最大)
1.2 选举异常情况
- 任一节点超过quorum.election.timeout.ms时间后,当前节点没有收到多数选票,则开始新一轮选举。新一轮选举同时收quorum.election.backoff.max.ms配置影响,延迟随机时间开始,防止选举僵局。
- 当节点成为leader后,如果超过quorum.fetch.timeout.ms时间,没有收到大多数follwer的fetch请求,则会重新触发选举,来避免leader僵死情况发生(因为是follwer向leader fetch的,leader并不能感知自己已经被踢出集群了,可以认为fetch操作就是心跳)
- follower超过quorum.fetch.timeout.ms时间没有收到leader的fetchResponse响应,则开始新一轮选举。
2. 选举后集群元数据同步
Kafka中,所有topic中的消息都是保存在本地磁盘上的,在Kafka系统内也称为log(更多的关于log的描述可以参看以后的文章)
而Kafka的集群元数据在Kraft协议中是使用名称为__cluster_metadata的topic来保持集群间内数据一致的,所以要简单介绍一下Kafka topic的几个概念
COMMITTED & UNCOMMITTED :和大多数分布式系统一样,只有一条log被多数节点拉取到后,才能认为该log是COMMITTED(已提交)是经过共识的数据,否则就是UNCOMMITTED(未提交)状态。
LEO:log end offset,单个节点的topic中最后一条记录的位置,包含未提交和已提交的数据。
HW:highWatermark,已提交的记录中最后一条记录的位置。
同时,在leaderState中,有两个数据结构保存了集群中其他节点抓取的数据情况(voterStates 和 observerStates),用于帮助判断HW。
1 | private Optional<LogOffsetMetadata> highWatermark; |
在ReplicaState中主要就是记录了节点fetch数据的一些信息
1 | final int nodeId; // 节点id |
2.1 元数据同步流程
在某时刻,集群(id = 1、id = 2、id = 3)进行新的选举状态,选举了新leader(id = 1)
此时leader下的voterStates,肯定只有leader在,因为还没有感知到2节点、3节点的位点信息,所以leader的HW = 0,LEO = 5。
第一次fetch : 当2节点成为Follower后,则会开始无间隔的从leader拉取集群元数据信息,也就是fetch操作。2节点进行fetch操作时,会带上自己的LEO信息(LEO = 2),也代表follower要从leader的元数据topic-offset = 2的位置开始拉取数据。此时leader向2节点返回三条数据。虽然此时voterStates已经过半节点有值了,但是节点2的offset不是当前leader任期内产生的数据,所以leader不会更新HW。
第二次fetch : 本次fetch,2节点的LEO信息为5(LEO = 5),所以leader下的voterStates再次更新位点信息。
此时,过半节点已经收到了offset = 5,并且这个记录还是在任期内产生的,所以leader更新hw = 5,并向节点2返回本次的HW。节点2收到新的HW,也要更新自己的HW。