走进Kafka系列1 consumer group
走进Kafka系列1 consumer group
本篇文章基于Kafka3.0,为Kafka系列的开篇之作,接下来的一段时间内,会着重投入研究Kafka。本期讲解的是consumer group概念,可以算是Kafka中比较重要的一个部分。
1 Kafka基础概念
最基础的概念,因为本篇重点是consumer group,至于其他内容的详尽介绍在之后的文章中会提及。
一个Kafka集群中有若干的broker(基础组成单位),而producer和consumer则是使用了Kafka协议的外部调用者。
Kafka为了维护consumer 与 集群之间的关系,抽象出了一个consumer group的概念(这在MQ中是个非常非常棒的概念,他可以使得开发者可以自由控制广播还是单播) 因为consumer group是个组的概念,所以在其内部也有选举、同步、故障恢复等一系列的措施存在,这也同样提供给了消费者开箱即用的高可用能力。所以下面来具体看下consumer group的实现原理
2 consumer加入group流程
当一个新的consumer要连接集群时,首先会向集群发送requestApiKeyName = METADATA 和 FIND_COORDINATOR 的请求,METADATA是获取topic的元数据信息,以及这个topic的leader信息。
FIND_COORDINATOR是为了获取consumer group对应的Coordinator元数据(地址、端口),然后与其建立链接进行通信。
拿到topic以及Coordinator的元数据以后,开始进行加入group的操作。
在Kafka中,consumer加入group的总体过程如下
1.consumerA 向组所对应的Coordinator发送joinGroup请求,申请加入group。
2.由于group还没有创建,要先创建group,并且指定consumerA为组leader(第一个申请加入组的consumer会被选为leader)。组有了leader以后就能开始初始化流程了,这里Kafka使用了延迟任务,尽可能等到足够多的组员都发起joinGroup了才开始进行rebalance,因为rebalance是个比较耗性能的操作。
每个consumer加入进来,Coordinator都会注册一个DelayedOperation任务,但是他们的key都是相同的,使用的都是 join-<group名称>,也就意味着,实际上Kafka的延迟队列里只会存在一个延时任务(Kafka的purgatory后面再写,因为比较复杂,并不是这里的重点)。
当DelayedOperation到达超时时间后,Coordinator回向现有的组成员返回joinGroup请求的响应,并向leader发送所有的组成员信息。
3.leader(consumerA)收到joinGroup响应,会使用组成员信息以及设定的分配方案来进行分区的分配。之后就会向Coordinator发起SyncGroup请求并发送分区的分配结果,再由Coordinator向所有的组员发送各自的分区信息syncGroupResponse。
4.Coordinator向所有组成员广播完分配结果后,组状态变为Stable开始对所有的Consumer提供正常服务。
源代码具体位置可参考附录1
2.1 consumer分区概念
在Kafka的一个topic中,会存在分区的概念,分区数可以由开发者配置指定,并且在之后还可以进行动态添加、删除。
consumer会在consumer group正常运行后,被分配到指定的分区(一个分区不会分配到两个consumer),consumer会顺序拉取分区内的消息,也就是单个分区内顺序消费。
所以在消费者看来,分区是个很重要的信息,才会需要这么复杂的joinGroup流程来确保分区能正常分配到组内的每个consumer。
2.2 consumer group状态图
图上显示的还只是正常情况的状态流转,事实中还存在很多异常场景,最典型的就是网络抖动或者延迟而导致Coordinator在错误的状态下收到了consumer的回应。对于这种状况怎么处理呢?Kafka主要采取了以下两种简单的做法
- 向consumer返回错误,让其重试。
- 直接让consumer触发rejoin流程。
以上两种都会让group进行重平衡(Rebalance),来使整个group数据重新同步,保持一致性。
2.3 group Rebalance
当组内的成员变动、分区变动、offset提交等操作引起的超时、心跳超时都会触发group的Rebalance机制,来让消费者们的状态有一致性保障(可以理解为碰到问题就重启)
比如说有一个成员A宕机,这时会触发以下流程
Coordinator与A心跳超时,将A移出group
更改group状态为PreparingRebalance,并发起一个joinGroup请求用于触发group的后续流程。
如果组内存在其他组员B,则在B与Coordinator的下次心跳请求时,Coordinator会给B发送errorCode = 27,告知B要重新加入group。
当组内不存在其他组员时,则group状态会回退到Empty
之后就简单了,就是加入组的流程再走一遍即可。
附录
附录1 加入组关键源代码路径
- GroupCoordinator处理joinGroup请求 kafka.server.KafkaApis#handleJoinGroupRequest
- GroupCoordinator返回响应joinGroup kafka.coordinator.group.GroupCoordinator#onCompleteJoin
- GroupCoordinator处理SyncGroup请求 kafka.server.KafkaApis#handleSyncGroupRequest
- GroupCoordinator向组员广播分配方案 kafka.coordinator.group.GroupCoordinator#setAndPropagateAssignment
附录2 客户端请求类型
当前版本kafka客户端请求的类型(requestApiKey)参考org.apache.kafka.common.message.ApiMessageType
附录3 Kafka错误枚举
org.apache.kafka.common.protocol.Errors
1 | REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.", |