走进Kafka系列2 集群选举

​ 上一篇讲的是consumer group的共识,其实顺序有点颠倒了,应该先讲集群共识的。但是因为时间关系,个人对于consumer group更熟悉一点,所以集群的选举就放到第二篇了。对于共识选举的学习建议大家从切面的角度看待,其实像ZK、Kafka、ES的选举都是有共通之处的。


1 Kraft选举实现

在Kafka3.0中的新特性就是有了Kraft模式来替代之前Zookeeper的依赖(其实从2.8开始就已经支持了)。

Zookeeper可以理解为一个分布式的KV系统,具有很高的一致性,所以普遍被用来储存元数据使用。

但因ZK自身缺陷,所以Kafka选择使用了自己内部的topic来储存元数据,在这基础上就是Kraft模式(使用了raft协议的思想)

1.1 Kraft相关基础概念

一般来说,中心化的分布式系统都需要进行集群选举来产生Leader,Leader主要作用是为了更容易控制整个集群状态的一致性。在Kraft中,把集群的元数据信息都储存到了Kafka自身的topic (__cluster_metadata) 中来摆脱对ZK的依赖性(之前是储存到ZK的)。

在Kafka的server.properties配置文件中,可以指明当前broker的角色列表

1
process.roles=broker,controller

如果角色列表中有指定controller,则表明当前broker要去参与选举。

1.2 Kafka选举相关数据结构

先大致分类一下,选举过程中的几个重要部分。

QuorumState

QuorumState在选举流程中非常重要,可以理解为就是Kraft中的状态机实现。

在KafkaRaftClient刚启动的时候,会去本地磁盘中加载上次的state状态进行初始化(持久化路径 logs/__cluster_metadata-0/quorum-state)格式如下

1
{"clusterId":"","leaderId":-1,"leaderEpoch":222,"votedId":1,"appliedOffset":0,"currentVoters":[{"voterId":1},{"voterId":2}],"data_version":0}

明文储存的,大家也能自己去看,主要有当前broker上次运行的集群状态快照,比如leader是谁,集群内都有哪些成员,上次选举的轮次是什么。

Kraft主要加载这些信息是为了进行后续的状态流转,判断我当前borker刚启动时的QuorumState是什么状态。

  1. 集群最新的epoch > 本地的leaderEpoch

    说明本地数据很落后了,初始化时状态为UnattachedState,使用集群最新的epoch

  2. 如果epoch相同,leaderId是自己

    初始化时状态为ResignedState

  3. epoch相同,votedId是自己

    初始化时状态为CandidateState

  4. epoch相同,votedId不是自己且不为空

    说明上次broker退出时,集群还在选举,初始化时状态为VotedState

  5. epoch相同,leaderId不是自己且不为空

    说明上次broker退出时,集群选举成功,且自己是Follower,继续用FollowerState。

  6. 都不命中,使用最原始的状态UnattachedState。

其实都可以无脑用UnattachedState,从最初始的状态开始流转。我个人猜测这么多判断是为了性能,重新选举流程可以更快一点,直接流转到最合适的状态。

状态流转

当QuorumState初始化完成后,就开始到集群节点的状态流转了。

image-20230105194002759

  1. UnattachedState 未进入选举状态

    节点知道了当前有选举在进行,但是自己还没有加入,也是所有节点默认的初始化状态

  2. CandidateState 候选人状态

    当前节点在选举人列表中,而且没有发出过选票,则当前节点会变为CandidateState,给自己的epoch自增1,并给自己投出一票。

  3. VotedState 投票人状态

    和CandidateState状态一样,都可以表示投过票的人,而CandidateState表示是leader的候选人,投的是自己,VotedState只是表示选举人,投的是别人。

  4. LeaderState 领导者状态

    当CandidateState-候选人获得超过半数节点同意后,候选人会成为领导者。

  5. FollowerState 追随者状态

    当有leader当选后,会向所有的voters发送BeginQuorumEpochRequest通知,主要是告诉自己成为了leader。voter收到通知后,如果发现leaderId和自己的id不一致,就会把自己变成FollowerState。

VoteRequestData选票结构

在Kraft中,临时构造VoteRequestData.PartitionData来充当选票。VoteRequestData.PartitionData主要结构如下,在Kafka中,选票投出后,则无法进行更改,只能进行到下一轮选举(raft协议特色)

1
2
3
4
5
int partitionIndex;
int candidateEpoch; // CandidateState的epoch
int candidateId; // 当前broker nodeId
int lastOffsetEpoch; // 当前broker 本地磁盘日志中上一次leader选举时的epoch
long lastOffset; // 当前broker本地topic:__cluster_metadata中最大的offset

candidateEpoch:当一个broker状态变为CandidateState时(CandidateState构造函数中要传epoch),broker会把当前的epoch自增1。

broker的Quorum每次状态变化时都会持久化到本地磁盘中,来保证epoch的自增不会发生意外而丢失。

lastOffset每次集群元数据有变动时(集群自身的选举操作、对topic的操作)都会增加,主要用于保证当选的leader要是最新的节点(offset最大)

1.2 选举异常情况

  1. 任一节点超过quorum.election.timeout.ms时间后,当前节点没有收到多数选票,则开始新一轮选举。新一轮选举同时收quorum.election.backoff.max.ms配置影响,延迟随机时间开始,防止选举僵局。
  2. 当节点成为leader后,如果超过quorum.fetch.timeout.ms时间,没有收到大多数follwer的fetch请求,则会重新触发选举,来避免leader僵死情况发生(因为是follwer向leader fetch的,leader并不能感知自己已经被踢出集群了,可以认为fetch操作就是心跳)
  3. follower超过quorum.fetch.timeout.ms时间没有收到leader的fetchResponse响应,则开始新一轮选举。

2. 选举后集群元数据同步

Kafka中,所有topic中的消息都是保存在本地磁盘上的,在Kafka系统内也称为log(更多的关于log的描述可以参看以后的文章)

而Kafka的集群元数据在Kraft协议中是使用名称为__cluster_metadata的topic来保持集群间内数据一致的,所以要简单介绍一下Kafka topic的几个概念

  • COMMITTED & UNCOMMITTED :和大多数分布式系统一样,只有一条log被多数节点拉取到后,才能认为该log是COMMITTED(已提交)是经过共识的数据,否则就是UNCOMMITTED(未提交)状态。

  • LEO:log end offset,单个节点的topic中最后一条记录的位置,包含未提交和已提交的数据。

  • HW:highWatermark,已提交的记录中最后一条记录的位置。

同时,在leaderState中,有两个数据结构保存了集群中其他节点抓取的数据情况(voterStates 和 observerStates),用于帮助判断HW。

1
2
3
private Optional<LogOffsetMetadata> highWatermark;
private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
private final Map<Integer, ReplicaState> observerStates = new HashMap<>();

在ReplicaState中主要就是记录了节点fetch数据的一些信息

1
2
3
4
final int nodeId; // 节点id
Optional<LogOffsetMetadata> endOffset; // 节点的LEO位置
OptionalLong lastFetchTimestamp;
boolean hasAcknowledgedLeader;

2.1 元数据同步流程

image-20230117144700666

​ 在某时刻,集群(id = 1、id = 2、id = 3)进行新的选举状态,选举了新leader(id = 1)

此时leader下的voterStates,肯定只有leader在,因为还没有感知到2节点、3节点的位点信息,所以leader的HW = 0,LEO = 5。

image-20230117144627006

第一次fetch : 当2节点成为Follower后,则会开始无间隔的从leader拉取集群元数据信息,也就是fetch操作。2节点进行fetch操作时,会带上自己的LEO信息(LEO = 2),也代表follower要从leader的元数据topic-offset = 2的位置开始拉取数据。此时leader向2节点返回三条数据。虽然此时voterStates已经过半节点有值了,但是节点2的offset不是当前leader任期内产生的数据,所以leader不会更新HW

第二次fetch : 本次fetch,2节点的LEO信息为5(LEO = 5),所以leader下的voterStates再次更新位点信息。

此时,过半节点已经收到了offset = 5,并且这个记录还是在任期内产生的,所以leader更新hw = 5,并向节点2返回本次的HW。节点2收到新的HW,也要更新自己的HW。