走进Kafka系列3 创建topic流程

    上篇文章描述了集群选举的一些概念,本章开始已创建topic为例,讲述一下元数据变更在不同集群间的同步过程。

1 前言

是真的没有想到创建一个topic流程居然如此复杂,涉及到了Kafka的很多核心概念,所以理解起来会比较吃力一点。

2 创建topic流程

在Kafka中,创建topic总体过程如下(一如既往一图流)

image-20230602151608686

当client端执行创建命令后

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic kraft1 --partitions 2 -replication-factor 2

集群leader会根据创建命令,预生成对应的topic和partition的records,随后对records生成event事件投递到队列中开始等待执行。

当event开始执行时,首先会写入到本地的元数据中(也就是之前提到的__cluster_metadata队列),从这里开始就可以分成两部分讲解了。

  1. leader写入元数据后,随后会将topic和partition应用到内存中,并开始对topic和partition进行初始化操作。最后等待当前leader的offset达到指定位置后,向客户端返回创建结果。

    等待offset:比方说上述例子,创建了一个topic和两个partition,也就是一共3条record元数据,假设一开始leader的hw = 5,则预期中topic和partition同步完成后,hw应该为8。而这个等待就是等到hw = 8的时刻进行回调。

  2. 当leader写入元数据时,follow还在不停地fetch元数据,当follow fetch到这3个record以后,就会持久化到本地的__cluster_metadata队列,并且推进hw的更新(详见上篇文章)。这样一来就完成了创建的topic动作向整个集群的同步。

3 topic相关概念

3.1 ISR

在Kafka的一个topic中,会存在分区的概念,分区数可以由开发者配置指定,并且在之后还可以进行动态添加、删除。

同时在每个分区中又存在副本的概念(为了保障高可用),副本分为了leader副本和follower副本,follower副本会不间断的向leader副本fetch数据,来保障不同副本间的数据一致性。

Kafka使用ISR机制来保障单个分区的HA,核心理念为:只有数据比较全的副本才能去竞争成为leader,而这些副本就是ISR

ISR内的所有副本需要至少满足以下两个条件中的一个

  1. 和leader的LEO一致
  2. 最近一次fetch数据的间隔小于 config.replicaLagTimeMaxMs

如果都不满足,则副本会被移除ISR队列,反之如果有新的分区满足条件也会被添加到ISR中。

剔除落后的副本

在ReplicaManager运行时,会启动一个轮询器,定时轮询每个topic来清除不满足条件的ISR分区。

scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)

添加满足条件的副本

当leader收到了follower的fetch请求时,会触发判断follower副本的LEO是否大于leader的HW,如果大于则该副本将会添加到ISR中,并且向其他controller发送ISR变更事件进行同步。