Kafka
references
- Chapter 4. Kafka Consumers: Reading Data from Kafka
- Deep Dive Into Apache Kafka | Storage Internals
- How Kafka’s Storage Internals Work
- Kafka消息时间戳 (Kafka message timestamp)
- A Practical Introduction to Kafka Storage Internals
architecture
![[Pasted image 20221208220120.png]]
- producer
- consumer group
- broker: a node in the Kafka cluster
- topic: a collection of messages
    - partition: subset of a topic, located on different brokers
    - segment: subset of a partition
- zookeeper: metadata
    - broker list
    - topic list
    - consumer
        - consumer list
        - consumer offsets
replicas
what: multiple replicas of the same data
quick
why is Kafka fast:
- sequential writes
- zero copy
- partitions: concurrent writes and reads
message
produce
partition selection (see the sketch below)
- key is null: round-robin across partitions
- key present: hash(key) mod partitionCount
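A minimal sketch of the key-based rule above. Note that Kafka's real DefaultPartitioner uses murmur2 hashing (and newer clients use a sticky strategy for null keys), so the class and method names here are illustrative only, not the actual implementation.

```java
import java.nio.charset.StandardCharsets;

// Simplified illustration of key-based partition selection.
// Kafka's real DefaultPartitioner uses murmur2 hashing; this sketch
// uses a plain array hash to show the "hash(key) mod partitionCount" idea.
public class SimplePartitioner {

    // Pick a partition for a record given its key and the partition count.
    public static int partitionFor(String key, int partitionCount, int roundRobinCounter) {
        if (key == null) {
            // No key: spread records round-robin across partitions.
            return roundRobinCounter % partitionCount;
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask off the sign bit so the result is non-negative.
        int hash = java.util.Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % partitionCount;
    }

    public static void main(String[] args) {
        // Records with the same key always land on the same partition.
        System.out.println(partitionFor("order-42", 4, 0));
        System.out.println(partitionFor("order-42", 4, 1));
        // Null keys rotate across partitions.
        System.out.println(partitionFor(null, 4, 0));
        System.out.println(partitionFor(null, 4, 1));
    }
}
```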
consumer
- how partitions map to consumers: partitions [p0, p1, p2]; consumers [c1, c2, c3]; each partition is consumed by exactly one consumer in the group, while one consumer may own one or many partitions
- case 1: 4 partitions, 2 consumers (p1-p4, c1-c2): c1 gets p1, p2; c2 gets p3, p4
- case 2: 4 partitions, 6 consumers (p1-p4, c1-c6): c1 gets p1 ... c4 gets p4; c5 and c6 are idle
strategy
- RangeAssignor: assigns contiguous ranges of partitions to consumers in order
- RoundRobinAssignor: distributes partitions evenly, one at a time
1. consumer rebalance
The rebalance mechanism reassigns partitions so they are distributed as evenly as possible across consumers. Triggered when:
- consumer group membership changes (a consumer joins or leaves)
- the set of subscribed topics changes
- the number of partitions of a subscribed topic changes
2. PartitionAssignor strategies
How partitions are assigned to consumers (see the config sketch after this list):
- RangeAssignor
- RoundRobinAssignor
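A hedged sketch of choosing the assignor on the consumer side via `partition.assignment.strategy`, which takes the assignor class name; the broker address, group id, and topic here are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Select the assignment strategy, e.g. RangeAssignor or RoundRobinAssignor.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```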
3. auto.offset.reset
Used when a consumer group has no stored offset in Kafka (e.g. a brand-new group); see the sketch below:
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
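A small sketch of a new consumer group starting from the beginning of the log with `auto.offset.reset=earliest`; broker, group, and topic names are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "brand-new-group");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No committed offset for this group yet: start from the earliest offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```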
config
producer config
acks (see the sketch below):
- 0: do not wait for any acknowledgement
- 1: wait for the leader's acknowledgement
- all: wait for acknowledgement from all in-sync replicas
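A minimal sketch of setting `acks` on a producer; the broker address and topic are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader waits for all in-sync replicas before acknowledging.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
            producer.flush();
        }
    }
}
```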
broker config
- log.message.timestamp.type
Define whether the timestamp in the message is message create time or log append time. The value should be either CreateTime or LogAppendTime.

| Property | Value |
|---|---|
| Type | string |
| Default | CreateTime |
| Valid Values | [CreateTime, LogAppendTime] |
| Importance | medium |
| Update Mode | cluster-wide |
how kafka retrieves a message
- binary-search the segment files by offset to find the segment that contains the target offset
- in that segment's .index file, look up the relative offset: relativeOffset = offset - baseOffset (the segment file name) + 1; the index is sparse (not every offset is indexed), so find the closest preceding index entry and then scan the .log file sequentially
- the index entry for that relative offset gives a byte position; seek to that position in the .log file and read forward until the target offset is found
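A toy sketch of the lookup flow above, with made-up types (`Segment`, `IndexEntry`) standing in for Kafka's internal structures; it only illustrates the binary search over base offsets and the sparse-index scan, not the real on-disk format.

```java
import java.util.List;
import java.util.TreeMap;

// Toy model of the lookup path: segments keyed by base offset, a sparse index
// per segment, then a sequential scan from the closest preceding index entry.
public class SegmentLookup {

    record IndexEntry(long relativeOffset, long filePosition) {}

    record Segment(long baseOffset, List<IndexEntry> sparseIndex) {}

    // Step 1: binary search (via TreeMap.floorEntry) for the segment whose
    // base offset is the largest one <= the target offset.
    static Segment findSegment(TreeMap<Long, Segment> segmentsByBaseOffset, long targetOffset) {
        var entry = segmentsByBaseOffset.floorEntry(targetOffset);
        return entry == null ? null : entry.getValue();
    }

    // Step 2: in the sparse index, find the closest entry at or before the
    // target's relative offset; the scan of the .log file would start from
    // that entry's file position.
    static IndexEntry findStartPosition(Segment segment, long targetOffset) {
        long relative = targetOffset - segment.baseOffset();
        IndexEntry best = new IndexEntry(0, 0);
        for (IndexEntry e : segment.sparseIndex()) {
            if (e.relativeOffset() <= relative) {
                best = e;
            } else {
                break;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        TreeMap<Long, Segment> segments = new TreeMap<>();
        segments.put(0L, new Segment(0, List.of(new IndexEntry(0, 0), new IndexEntry(50, 4096))));
        segments.put(100L, new Segment(100, List.of(new IndexEntry(0, 0), new IndexEntry(40, 8192))));

        long target = 130;
        Segment seg = findSegment(segments, target);       // segment with baseOffset 100
        IndexEntry start = findStartPosition(seg, target); // scan forward from its file position
        System.out.println("segment base=" + seg.baseOffset()
                + ", scan from position=" + start.filePosition());
    }
}
```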
timestamp
Timestamps were introduced mainly to solve three problems:
- Log retention policy: Kafka periodically deletes expired logs (log.retention.hours, default 7 days). The check compares the last modification time of each log segment file; if the last modification was more than 7 days ago, the segment is treated as expired and deleted. But if a partition's replicas are ever reassigned (replica reassignment), a new segment file may be created on a new broker with its modification time set to the current time, so the retention policy can no longer fire even though the data in that segment already qualifies for deletion.
- Log rolling policy: the same reasoning applies. The current log segment is rolled according to certain rules, i.e. a new segment file is created and marked as the active segment. One of those rules is time-based (log.roll.hours, default 7 days): if the current segment was last modified more than 7 days ago, a new segment is created and becomes the active one. It has the same problem: the last modification time is not stable, and a replica reassignment changes it, preventing the roll from happening. (Note: log.retention.hours and its family do not conflict with log.roll.hours and its family, because Kafka never deletes the currently active segment.)
- Stream processing (Kafka Streams): stream processing needs the message timestamp.
mq problems
ordered consumption
- global ordering: use a single partition
- partial ordering: ordered within a single partition
    - route messages that must stay ordered with the same key so they land on the same partition (see the sketch below)
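A small sketch of the same-key idea: all records sharing a key go to one partition, so they are consumed in order relative to each other. Broker address and topic name are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedByKeyExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // All events for order 42 share the key "order-42", so they hash to the
            // same partition and are read back in the order they were written.
            producer.send(new ProducerRecord<>("orders", "order-42", "created"));
            producer.send(new ProducerRecord<>("orders", "order-42", "paid"));
            producer.send(new ProducerRecord<>("orders", "order-42", "shipped"));
        }
    }
}
```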
avoid message loss
ack:
- producer acks:
    - 0: no response required
    - 1: leader responds
    - all: all in-sync replicas respond
- topic: set min.insync.replicas
- consumer: ack only after processing, i.e. commit the offset manually (see the sketch below)
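A hedged sketch of the consumer-side ack: disable auto-commit and commit the offset only after the records have been handled, so an unprocessed record is re-delivered after a crash. Broker, group, and topic names are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-group");     // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Turn off auto-commit so offsets only advance after processing succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // application logic; if this throws, the offset is not committed
                }
                // Commit only after the whole batch has been processed (at-least-once delivery).
                consumer.commitSync();
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```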
duplicate messages
Idempotence: executing the same operation multiple times yields the same result, f(f(x)) = f(x)
when duplicates happen:
- producer retries
- consumer retries: processing succeeded but the offset commit failed
how to handle:
- unique key: make inserts idempotent (a duplicate insert violates the unique key and is rejected)
- global id: attach a globally unique id to each message and skip ids that have already been processed (see the sketch below)
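A minimal in-memory sketch of global-id dedup on the consumer side; in a real system the set of seen ids would live in a durable store (a database table with a unique key, for example). On the producer side, Kafka's `enable.idempotence=true` setting is the built-in way to suppress duplicates caused by producer retries.

```java
import java.util.HashSet;
import java.util.Set;

// Toy consumer-side dedup by global message id. In practice the "seen" set
// would be kept in durable storage, not in process memory.
public class DedupByGlobalId {

    private final Set<String> seenIds = new HashSet<>();

    // Returns true if the message was processed, false if it was a duplicate.
    public boolean handle(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already processed; skip the duplicate
        }
        System.out.println("processing " + payload);
        return true;
    }

    public static void main(String[] args) {
        DedupByGlobalId handler = new DedupByGlobalId();
        handler.handle("msg-1", "create order 42"); // processed
        handler.handle("msg-1", "create order 42"); // duplicate, skipped
    }
}
```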