[Kafka principle] Analysis of consumer offsets and __consumer_offsets
2022-08-06 19:32:14 [InfoQ]
In Kafka's log directory we also find many folders whose names start with __consumer_offsets-, 50 in total.
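Because ZooKeeper is not suited to frequent, high-volume writes, newer versions of Kafka recommend storing consumer offsets in an internal Kafka topic, __consumer_offsets, and provide the kafka-consumer-groups.sh script for inspecting consumer information.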
__consumer_offsets is created by Kafka itself and behaves like an ordinary topic. One of its purposes is to store the offsets committed by consumers.
Each message in __consumer_offsets has roughly the format shown in the figure below:

You can think of each message as a key-value pair: the key is the triple group.id + topic + partition number, and the value is the offset.
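A minimal Java sketch of that key-value view, assuming Java 16+ for the nested record. OffsetStore and OffsetKey are hypothetical names for illustration, not Kafka classes, and real value records carry more metadata than the bare offset. The point it shows: a newer commit for the same (group.id, topic, partition) key replaces the older one, which is the behavior a compacted topic such as __consumer_offsets converges to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

class OffsetStore {
    // Hypothetical key triple: group.id + topic + partition number.
    record OffsetKey(String groupId, String topic, int partition) {}

    private final Map<OffsetKey, Long> committed = new HashMap<>();

    // A newer commit for the same key replaces the older value, similar to how a
    // compacted topic eventually keeps only the latest record per key.
    void commit(String groupId, String topic, int partition, long offset) {
        committed.put(new OffsetKey(groupId, topic, partition), offset);
    }

    // Empty when this group has never committed an offset for the partition.
    OptionalLong committedOffset(String groupId, String topic, int partition) {
        Long offset = committed.get(new OffsetKey(groupId, topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit("szz1-group", "szz1-test-topic", 0, 3L);
        store.commit("szz1-group", "szz1-test-topic", 0, 5L); // overwrites offset 3
        System.out.println(store.committedOffset("szz1-group", "szz1-test-topic", 0)); // OptionalLong[5]
        System.out.println(store.committedOffset("szz1-group", "szz1-test-topic", 1)); // OptionalLong.empty
    }
}
```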
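A Kafka production environment may contain many consumers and consumer groups. If they all commit offsets at the same time, the write load on __consumer_offsets becomes heavy, so by default Kafka creates 50 partitions for this topic and maps each group.id to one of them with a hash-modulo operation, Math.abs(groupID.hashCode()) % numPartitions, which spreads the load across different __consumer_offsets partitions.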
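The formula above can be sketched in plain Java. OffsetsPartition and partitionFor are hypothetical names; this follows the article's expression rather than quoting the broker's source. One detail the bare expression hides: Math.abs(Integer.MIN_VALUE) is still negative, so this sketch maps that single hash value to 0 to keep the result in the range [0, numPartitions).

```java
// Sketch of the article's formula: Math.abs(groupID.hashCode()) % numPartitions.
class OffsetsPartition {
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) overflows back to a negative number,
        // so this sketch maps that one value to 0 instead.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50; // default value of offsets.topic.num.partitions
        for (String group : new String[] {"szz1-group", "szz1-group3"}) {
            System.out.printf("%s -> __consumer_offsets-%d%n",
                    group, partitionFor(group, numPartitions));
        }
    }
}
```

Running main prints szz1-group -> __consumer_offsets-32 and szz1-group3 -> __consumer_offsets-13. The first value matches the "szz1-group".hashCode() % 50 = 32 used in step 6. If offsets.topic.num.partitions is changed, pass the configured count instead of 50.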
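Normally __consumer_offsets is created automatically the first time a consumer in the cluster consumes messages. Its replication factor is set by the offsets.topic.replication.factor parameter, default 3 (note: the rules for applying this parameter changed in version 0.11.0.0). Its partition count is set by the offsets.topic.num.partitions parameter, default 50.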
1. Consume topic messages
Open session a and run the consumer command below, which specifies the consumer group szz1-group and the topic szz1-test-topic:
bin/kafka-console-consumer.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic
2. Produce messages
Open a new session b and run the producer command:
bin/kafka-console-producer.sh --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --topic szz1-test-topic
Send a few messages:

Session a, opened earlier, then consumes the messages:

3. Check the consumption offsets of a given consumer group
bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group

The output shows the consumer ID assigned to each partition. Because only one consumer is running, that single consumer consumes all 3 partitions.
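CURRENT-OFFSET: the offset the consumer group has committed, i.e. how far it has consumed
LOG-END-OFFSET: the offset at the end of the partition's log, i.e. the offset the next written message will receive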
CURRENT-OFFSET = LOG-END-OFFSET means the consumer group has consumed all messages.
Now I close session a. With no consumer running, I send a few more messages:

After I sent 2 messages, the LOG-END-OFFSET of partition-0 and partition-1 each increased by 1, while CURRENT-OFFSET stayed the same because the messages had not been consumed.
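To make the CURRENT-OFFSET / LOG-END-OFFSET comparison concrete, here is a small Java sketch that computes per-partition lag as LOG-END-OFFSET minus CURRENT-OFFSET; kafka-consumer-groups.sh reports the same difference in its LAG column. ConsumerLag and its methods are hypothetical names, and the sample offsets are made up, chosen to mirror the step above where 2 new messages land on partition-0 and partition-1 while no consumer is running.

```java
// Sketch: lag = LOG-END-OFFSET - CURRENT-OFFSET for each partition.
class ConsumerLag {
    // Messages already written to the partition but not yet committed by the group.
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    // The group has consumed everything when CURRENT-OFFSET == LOG-END-OFFSET
    // (zero lag) on every partition.
    static boolean fullyConsumed(long[] currentOffsets, long[] logEndOffsets) {
        for (int p = 0; p < currentOffsets.length; p++) {
            if (lag(currentOffsets[p], logEndOffsets[p]) != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for 3 partitions after 2 unconsumed messages
        // arrived on partition-0 and partition-1.
        long[] current = {4, 3, 5};
        long[] logEnd = {5, 4, 5};
        for (int p = 0; p < current.length; p++) {
            System.out.printf("partition-%d LAG=%d%n", p, lag(current[p], logEnd[p]));
        }
        System.out.println("fully consumed: " + fullyConsumed(current, logEnd)); // false
    }
}
```

Once the consumer group is restarted and commits again, CURRENT-OFFSET catches up with LOG-END-OFFSET and every lag returns to 0.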
Restart the consumer group and continue consuming
After reopening the session, the console prints the 2 messages just sent, and the offsets are updated as well.

4. Consume from the beginning: --from-beginning
If a new consumer group consumes a topic, by default its offset starts at the latest position; that is, historical messages are not consumed. For example, open a new session c below with the consumer group set to szz1-group3:
bin/kafka-console-consumer.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3 --topic szz1-test-topic
Check the consumption status:
bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group3

As shown, CURRENT-OFFSET = LOG-END-OFFSET: the new group starts from the end of the log, so the earlier messages are skipped.
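How do you make a new consumer group (or consumer) consume from the beginning? Add the --from-beginning parameter. It only takes effect when the group has no committed offset yet; a group that has already committed offsets resumes from them.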
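The decision behind this behavior can be sketched in Java. StartPosition and ResetPolicy are hypothetical names; the two policy values mirror the Java consumer's auto.offset.reset setting (default latest), and --from-beginning makes the console consumer behave like earliest. As a simplification, this sketch ignores the case where a committed offset is out of range, which also falls back to the reset policy.

```java
import java.util.OptionalLong;

// Simplified sketch: where a consumer group starts reading one partition.
class StartPosition {
    // Mirrors the two common auto.offset.reset values; "none" is omitted here.
    enum ResetPolicy { EARLIEST, LATEST }

    static long resolve(OptionalLong committedOffset, ResetPolicy reset,
                        long logStartOffset, long logEndOffset) {
        if (committedOffset.isPresent()) {
            // The group already has an offset stored in __consumer_offsets:
            // resume there, regardless of the reset policy.
            return committedOffset.getAsLong();
        }
        // No committed offset (e.g. a brand-new group): fall back to the policy.
        return reset == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        long logStart = 0, logEnd = 10; // hypothetical partition bounds
        // New group, default policy (latest): history is skipped.
        System.out.println(resolve(OptionalLong.empty(), ResetPolicy.LATEST, logStart, logEnd));   // 10
        // New group with --from-beginning (behaves like earliest): history is read.
        System.out.println(resolve(OptionalLong.empty(), ResetPolicy.EARLIEST, logStart, logEnd)); // 0
        // Existing group with committed offset 7: the policy does not matter.
        System.out.println(resolve(OptionalLong.of(7), ResetPolicy.EARLIEST, logStart, logEnd));   // 7
    }
}
```

The third call shows why szz1-group3, once it has committed offsets, is not rewound by adding --from-beginning later.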
5. How to determine which __consumer_offsets partition holds a consumer group's offsets
Math.abs(groupID.hashCode()) % numPartitions
6. Find a consumer group's offsets in its __consumer_offsets partition
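In step 3 above we saw how to check a consumer group's offsets for a topic; there is another way to query them. First use the consumer group name to determine the partition. For example, "szz1-group".hashCode() % 50 = 32, so the offsets of the szz1-group consumer group are stored in partition 32 of __consumer_offsets (the __consumer_offsets-32 directory). Then run: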
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

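The first part of each output line is the key and the second part is the value. The key is made up of consumer group + topic + partition number, and the value contains the group's offset and related metadata.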
Next, send a few more messages and consume them; the console above automatically updates with the new offsets.
7. Check a topic's partition distribution
bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC_NAME
Day-to-day operations and troubleshooting => Didi's open-source LogiKM, a one-stop Kafka monitoring and management platform

Copyright notice
Author: InfoQ. Please include the original link when reprinting. Thank you.
https://en.cdmana.com/2022/218/202208061921177127.html