current position:Home>[kafka principle] Consumer offset __consumer_offsets_ related analysis

[kafka principle] Consumer offset __consumer_offsets_ related analysis

2022-08-06 19:32:14InfoQ

我们在kafka的log文件中发现了还有很多以 
__consumer_offsets_
的文件夹;总共50个;
由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即
__consumer_offsets
 topic,并且默认提供了
kafka_consumer_groups.sh
脚本供用户查看consumer信息.
__consumer_offsets
 是 kafka 自行创建的,和普通的 topic 相同.它存在的目的之一就是保存 consumer 提交的位移.
__consumer_offsets
 的每条消息格式大致如图所示
null
可以想象成一个 KV 格式的消息,key 就是一个三元组:
group.id+topic+分区号
,而 value 就是 offset 的值.
考虑到一个 kafka 生成环境中可能有很多
consumer
 和 
consumer group
,如果这些 consumer 同时提交位移,则必将加重 __consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 
group.id
做哈希求模运算
Math.abs(groupID.hashCode()) % numPartitions
,从而将负载分散到不同的 __consumer_offsets 分区上.
一般情况下,当集群中第一次有消费者消费消息时会自动创建
__consumer_offsets
,它的副本因子受 
offsets.topic.replication.factor
 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 
offsets.topic.num.partitions
 参数设置,默认值为50.

1. 消费Topic消息

打开一个session a,执行下面的消费者命令 ;指定了
消费组:szz1-group
topic:szz1-test-topic
bin/kafka-console-consumer.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic

2.产生消息

打开一个新的session b,执行生产消息命令
bin/kafka-console-producer.sh --broker-list  xxx1:9092,xxx2:9092,xxx3:9092  --topic szz1-test-topic
发送几条消息
null
然后可以看到刚刚打开的 session a 消费了消息;
null

3. 查看指定消费组的消费位置offset

bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group

null
可以看到图中 展示了每个
partition
 对应的消费者id; 因为只开了一个消费者; 所以是这个消费者同时消费3个
partition
;
CURRENT-OFFSET: 当前消费组消费到的偏移量
LOG-END-OFFSET: 日志最后的偏移量
CURRENT-OFFSET
 = 
LOG-END-OFFSET
 说明当前消费组已经全部消费了;
那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看;
null
我发送了2条消息之后, 
partition-0
 
partition-1
 的
LOG-END-OFFSET: 日志最后的偏移量
分别增加了1; 但是
CURRENT-OFFSET: 当前消费组消费到的偏移量
 保持不变;因为没有被消费;
重新打开一个消费组 继续消费
*
重新打开session之后, 会发现控制台输出了刚刚发送的2条消息; 并且偏移量也更新了
null

4. 从头开始消费 --from-beginning

如果我们用新的消费组去消费一个Topic,那么默认这个消费组的offset会是最新的; That is to say, history will not be consumed. For example, we will open a new one below.session c ;消费组设置为
szz1-group3
bin/kafka-console-consumer.sh --bootstrap-server   xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3    --topic szz1-test-topic
查看消费情况
 bin/kafka-consumer-groups.sh --bootstrap-server  xxx1:9092,xxx2:9092,xxx3:9092  --describe --group szz1-group3

null
可以看到
CURRENT-OFFSET
 = 
LOG-END-OFFSET
 ;
如何让新的消费组/者 从头开始消费呢? 加上参数 
--from-beginning

5.如何确认 consume_group 在哪个__consumer_offsets-? 中

Math.abs(groupID.hashCode()) % numPartitions

6. 查找__consumer_offsets 分区数中的消费组偏移量offset

上面的 
3. 查看指定消费组的消费位置offset
 中,我们知道如何查看指定的topic消费组的偏移量;那还有一种方式也可以查询
先通过 
consume_group
 确定分区数; 例如 
"szz1-group".hashCode()%50=32
; 那我们就知道 
szz-group
消费组的偏移量信息存放在 
__consumer_offsets_32
中;通过命令
 bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

null
前面的 是key 后面的是value;key由 消费组+Topic+分区数 确定; 后面的value就包含了 消费组的偏移量信息等等
然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的offset;

7 .查询topic的分布情况

bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称


日常运维
问题排查
=> 
滴滴开源LogiKM一站式Kafka监控与管控平台


null
在这里插入图片描述

copyright notice
author[InfoQ],Please bring the original link to reprint, thank you.
https://en.cdmana.com/2022/218/202208061921177127.html

Random recommended