综述
1. Kafka 性能高的原因
- 日志文件追加写
- 文件写到内核缓冲区
- 零拷贝:从 broker 到 consumer
Ⅱ. Broker
2.1 日志模块
2.1.1 日志文件
Kafka 日志在磁盘上的组织架构如下图所示:
Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括:
- 消息日志文件(.log)
- 位移索引文件(.index)
- 时间戳索引文件(.timeindex)
- 已中止(Aborted)事务的索引文件(.txnindex)。
一个 Kafka 主题有很多分区,每个分区就对应一个 Log
对象,在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题 test-topic,那么,Kafka 在磁盘上会创建两个子目录:test-topic-0 和 test-topic-1。而在服务器端,这就是两个 Log 对象。每个子目录下存在多组日志段,也就是多组 .log、.index、.timeindex 文件组合,只不过文件名不同,因为每个日志段的起始位移不同。
每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的 offset。日志文件和2个索引文件都是根据 baseOffset 命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个 LogSegment 的基准偏移量为0,对应的日志文件为00000000000000000000.log。如下图所示,是某个分区目录下的文件:
每一个 Log 目录中,只有最后一个 LogSegment 才能执行写入操作。
2.1.2 日志文件切分
如果满足下面的几个条件之一,日志分段文件就会切分:
- 当前日志分段文件的大小超过了 broker 的参数
log.segment.bytes
的值,默认是1G - 当前日志分段中消息的最大时间戳与最小时间戳差值大于
log.roll.ms
或log.roll.hours
时,默认只配置了 log.roll.hours,默认值为168(7天)。如果同时配置 log.roll.ms 和 log.roll.hours,那么 log.roll.ms 优先级高 - 索引文件和时间戳索引文件的大小达到了 broker 的参数
log.index.size.max.bytes
,默认10M - “消息的偏移量”和“当前日志分段的 baseOffset” 的差值大于 Integer.MAX_VALUE。这个要求的原因可以查看[2.1.3 索引文件的格式](#2.1.3 索引文件的格式)
2.1.3 索引文件
索引文件使用稀疏索引的方式,每当在日志文件中写入一定量的消息时,就会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。
索引文件中每个索引项占用8个字节,分为2部分:
- relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用4个字节
- position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
索引项中没有直接使用绝对偏移量而改为只占用4个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间。举个例子,一个日志分段的 baseOffset 为32,那么其文件名就是 00000000000000000032.log,offset 为35的消息在索引文件中的 relativeOffset 的值为35-32=3。
所以这就解释了日志分段文件切分的第4个条件:如果彼此的差值超过了 Integer.MAX_VALUE,那么 relativeOffset 就不能用4个字节表示了,进而不能享受这个索引项的设计所带来的便利了。
索引文件的格式如下:
虽然是以16进制数表示的,但参考索引项的格式可以知道如下内容(可以使用 bin/kafka-dump-log.sh
这个脚本来解析索引文件):
[root@node1 kafka_2.11-2.0.0]# bin/kafka-dump-log.sh --files /tmp/kafka-logs/ topic-log-0/00000000000000000000.index
Dumping /tmp/kafka-logs/topic-log-0/00000000000000000000.index
offset: 6 position: 156
offset: 14 position: 459
offset: 22 position: 656
offset: 26 position: 838
offset: 31 position: 1050
我们这里给出 00000000000000000000.index 和 00000000000000000000.log 的对照图来做进一步的陈述,如下图所示:
Kafka 强制要求索引文件大小必须是索引项大小的整数倍,如果 broker 端设置
log.index.size.max.bytes=67
,那么 Kafka 内部会将其转换为64。
和日志模块相关的类是:
- LogSegment.scala:LogSegment.scala 是伴生类,就和 Java 中的普通类类似
- LogSegment.object:LogSegment.object 对象是一个单例对象,用于保存一些静态变量和静态方法。
2.1.4 日志清理
Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 提供了两种日志清理策略:
- 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段
- 日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
我们可以通过 broker 参数 log.cleanup.policy
来设置日志清理策略,默认值为 “delete”(即使用 Log Retention策略)。如果要使用日志压缩,那么就需要设置 log.cleanup.policy=compact
,同时还需要设置 log.cleaner.enable=true
。设置 log.cleanup.policy=delete,compact
可以同时支持日志删除和日志压缩两种策略。
2.1.4.1 Log Retention
在 Kafka 的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 参数 log.retention.check.interval.ms
来配置,默认值为5分钟。当前日志分段的保留策略有3种:
- 基于时间的保留策略
- 基于日志大小的保留策略(基于 Log 目录中所有日志文件的总大小)
- 基于日志起始偏移量的保留策略
2.1.4.2 Log Compaction
Log Compaction 对于有相同 key 的不同 value 值,只保留最后一个版本。如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 的日志压缩功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
Log Compaction 会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件来组织。Log Compaction 执行过后的偏移量不再是连续的,不过这并不影响日志的查询。
Kafka 中用于保存消费者消费位移的主题 __consumer_offsets 使用的就是 Log Compaction 策略。
2.2 Kafka 中的时间轮
2.2.1 时间轮概述
Kafka 中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。
JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O(nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1)。
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务 TimerTask:
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度 tickMs
。时间轮的时间格个数是固定的,可用 wheelSize
来表示,那么整个时间轮的总体时间跨度 interval
可以通过公式 tickMs × wheelSize
计算得出。时间轮还有一个表盘指针 currentTime
,用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskList 的所有任务。
2.2.2 添加延期任务到时间轮中
1. 原理
如上图所示,在将 TimerTaskEntry 添加到 TimerTaskList 后,会设置 TimerTaskList 的 expiration
变量(也就是这个 TimerTaskList 对应的时间格的到期时间,如[下例](#2. 举例)中第三层的时间轮的时间格 1 的 expiration 就是 400ms),设置完成后,会判断这个 TimerTaskList 的 expiration 变量是否改变过,如果改变,说明这个 TimerTaskList 是被新建的或是被重用的,此时就需要将这个 TimerTaskList 重新添加到 DelayQueue 中。
2. 举例
若时间轮的 tickMs=1ms,wheelSize=20,那么可以计算得出 interval 为 20ms。初始情况下表盘指针 currentTime 指向时间格 0,此时有一个延迟为 2ms 的任务插入进来会存放到时间格为 2 的TimerTaskList中。随着时间的不断推移,指针 currentTime 不断向后推进,过了 2ms 之后,当到达时间格 2 时,就需要执行时间格 2 所对应的 TimeTaskList 中的所有任务。此时如果有一个定时为 19ms 的任务插入进来怎么办?新来的 TimerTaskEntry 会复用原来的 TimerTaskList ,所以它会插入到原本已经到期的时间格 1 中。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在 currentTime 和 currentTime+interval 之间。
如果此时有个定时为350ms的任务该如何处理?如果任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中:
第一层的时间轮 tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval ,即为 20ms。每一层时间轮的 wheelSize 是固定的,都是20,那么第二层的时间轮的总体时间跨度 interval 为 400ms。以此类推,这个 400ms 也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为 8000ms(8s)。
对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格 17 所对应的 TimerTaskList 中。如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格 1 的 TimerTaskList 中。
2.2.3 从时间轮中取出到期任务执行
1. 原理
Kafka中的定时器借助了JDK中的 DelayQueue 来协助推进时间轮。
DelayQueue 会根据 TimerTaskList 对应的超时时间 expiration 来排序,最短 expiration 的 TimerTaskList 会被排在DelayQueue 的队头。Kafka 中会有一个线程 ExpiredOperationReaper
来获取 DelayQueue 中的到期的任务列表(通过 DelayQueue#poll()
)。当这个线程获取到 DelayQueue 中的超时的任务列表 TimerTaskList 后,会执行下面的操作:
- 根据 TimerTaskList 的 expiration 来推进时间轮的时间:也就是更新所有时间轮的
currentTime
字段 - 遍历 TimerTaskList 中的所有 TimerTaskEntry,将 TimerTaskEntry 和当前的 TimerTaskList 解绑,然后再对每个 TimerTaskEntry 执行 reinsert 操作(和[添加 TimerTaskEntry](#2.2.2 添加延期任务到时间轮中) 的执行逻辑是一样的):
- 如果 TimerTaskEntry 中的延期任务已到期,则会直接把这个任务交给线程池去执行
- 如果延期任务没有到期,则会把这个 TimerTaskEntry 重新插入到时间轮中
从上面的原理可知,真正执行延迟任务是通过将延迟任务重新添加进时间轮的过程中,判断延迟任务到期,此时会将延迟任务提交给线程池去执行。
2. 举例
对于第三层的时间轮的时间格 1 到期时(此时是 400ms),原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms, 60ms) 的时间格中。
再经历了 40ms 之后,此时这个任务又被“察觉”到,不过还剩余 10ms ,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中。
之后再经历 10ms 后,此任务到期,会再次从 DeplayQueue 中被 poll 出来,此时再将这个任务添加进时间轮时,会判断这个任务已经到期,于是就将这个任务提交给线程池去执行。
2.3 延时操作
Kafka 中有很多延时操作,比如:
- 生产者发送消息的时候将 acks 参数设置为 -1,那么就需要等待 ISR 集合中的所有副本都确认收到消息后才能正确收到响应结果,或者捕获超时异常
- 延时拉取:在拉取消息时,如果收集不到足够多(由参数
fetch.min.bytes
配置,默认为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待啦渠道足够数量的消息- 消费者客户端拉取最新的日志
- follower 副本拉取最新的日志
延时操作都会被放到专门的延时操作管理器(DelayedOperationPurgatory)中负责管理,延时操作有可能会超时,每个延时操作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
延时操作可以由超时触发或外部事件触发二倍执行。超时触发就是等到超时时间之后触发执行;外部事件触发比如:
- 消费者拉取最新的日志的延迟操作可以由“HW 的增长”来触发
- follower 副本拉取最新的日志的延时操作可以由“消息追加到 leader 副本的本地日志文件”触发
2.4 Controller
III. Consumer
3.1 消费端分区分配策略
Kafka 提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略,共有3中分配策略:
- RangeAssignor 分配策略:对于每一个 topic,都会按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
- 举例:假设总共2个 topic,每个 topic 3个 partition,消费组中共有2个消费者,都同时订阅了这2个 topic。那么对于 topic0,它的分配策略是:t0p0 和 t0p1 分配给 c0;t0p2 分配给 c1。topic1 的分配策略也是:t1p0 和 t1p1 分配给 c0;t1p2 分配给 c1。这种情况下 c0 分配的分区为 t0p0,t0p1,t1p0,t1p1,c1 分配的分区为 t0p2,t1p2,分区不平衡
- RoundRobinAssignor 分配策略:把所有 topic 的所有 partition 总体轮询地分配给每个消费者
- 如果消费组中所有的消费者订阅的 topic 都相同,那么这种分配策略会比较均衡;但是如果消费者订阅的 topic 不同,那么还是可能会造成分配不平衡
- StickyAssignor 分配策略:这种分配策略有2个主要目的:1. 分区的分配要尽可能均匀;2. 分区的分配尽可能与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。
默认情况下,Kafka 使用的是 RangeAssignor 分配策略。
IV. Kafka 的高级特性
4.1 事务
4.1.1 综述
一般而言,消息中间件的消息传输保障有3个层级,分别如下。
- at most once:至多一次。消息可能会丢失,但绝对不会重复传输
- at least once:最少一次。消息绝不会丢失,但可能会重复传输
- exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
当生产者向 Broker当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。如果生产者发送消息到 Kafka 之后,遇到了网络问题而造成通信中断,生产者可以进行多次重试来确保消息已经写入 Kafka,这个重试的过程中有可能会造成消息的重复写入,所以这里 Kafka 提供的消息传输保障为 at least once。
对消费者而言,消费者处理消息和提交消费位移的顺序在很大程度上决定了消费者提供哪一种消息传输保障:
- 如果消费者在拉取完消息之后,先处理消息后提交消费位移,那么在消息处理之后且在位移提交之前消费者宕机了,待它重新上线之后,会从上一次位移提交的位置拉取,这样就出现了重复消费,因为有部分消息已经处理过了只是还没来得及提交消费位移,此时就对应 at least once
- 如果消费者在拉完消息之后,先提交消费位移后进行消息处理,那么在位移提交之后且在消息处理完成之前消费者宕机了,待它重新上线之后,会从已经提交的位移处开始重新消费,但之前尚有部分消息未进行消费,如此就会发生消息丢失,此时就对应 at most once
Kafka 从 0.11.0.0 版本开始引入了幂等和事务这两个特性,以此来实现 exactly once。
4.1.2 幂等
1. 幂等的原理
每个生产者实例在初始化的时候都会被分配一个 PID,这个 PID 对用户而言是完全透明的。对于每个 PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将 <PID,分区> 对应的序列号的值加1。
broker 端会在内存中为每一对 <PID,分区> 维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比 broker 端中维护的对应的序列号的值(SN_old)大1(即 SN_new = SN_old + 1)时,broker 才会接收它。如果 SN_new< SN_old + 1,那么说明消息被重复写入,broker 可以直接将其丢弃。如果 SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction() 等方法的调用都会抛出 IllegalStateException 的异常。
引入序列号来实现幂等也只是针对每一对 <PID,分区> 而言的,也就是说,Kafka 的幂等只能保证单个生产者中每个分区的幂等。
2. 如何使用幂等
在生产者客户端中配置 enable.idempotence=true:
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
同时还需要保证 acks
,retries
,max.in.flight.requests.per.connection
这几个参数不配置错(使用默认配置就可以,不需要显式配置):
- 如果显式配置
retries
,那么值必须> 0
,KafkaProducer 默认会将这个参数设置为 Integer.MAX_VALUE - 如果显式配置
acks
,那么值必须为 -1(all) - 如果显式配置
max.in.flight.requests.per.connection
,那么值必须<= 5
4.1.3 事务
事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。
Kafka中的事务可以使应用程序将生产消息、消费消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
4.2 副本机制
这里先简要的整理下副本以及与副本相关的 AR、ISR、HW 和 LEO的概念:
- 副本是相对于分区而言的,即副本是特定分区的副本。
- 一个分区中包含一个或多个副本,其中一个为 leader 副本,其余为 follower 副本,各个副本位于不同的 broker 节点中。只有 leader 副本对外提供服务,follower 副本只负责数据同步。
- 分区中的所有副本统称为 AR,而 ISR 是指与 leader 副本保持同步状态的副本集合,当然 leader 副本本身也是这个集合中的一员。
- LEO 标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO,ISR 中最小的 LEO 即为 HW,俗称高水位,消费者只能拉取到 HW 之前的消息。
4.2.1 失效副本
正常情况下,分区的所有副本都处于 ISR 集合中,但是难免会有异常情况发生,从而某些副本被剥离出 ISR 集合中。
Kafka 从 0.9.x 版本开始就只能通过 broker 端参数 replica.lag.time.max.ms
来判断,当 ISR 集合中的一个 follower 副本滞后 leader 副本的时间超过此参数指定的值时则判定为同步失败,需要将此 follower 副本剔除出 ISR 集合,具体可以参考下图。replica.lag.time.max.ms 参数的默认值为10000。
如果通过工具增加了副本因子,那么新增加的副本在赶上 leader 副本之前也都是处于失效状态的。如果一个 follower 副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上 leader 副本之前也处于失效状态。
4.2.2 ISR的收缩
Kafka 在启动的时候会开启两个与ISR相关的定时任务:
- isr-expiration
- isr-change-propagation
isr-expiration 任务会周期性地检测每个分区是否需要缩减其 ISR 集合。这个周期和 replica.lag.time.max.ms
参数有关,大小是这个参数值的一半,默认值为 5000ms。当检测到 ISR 集合中有失效副本时,就会收缩 ISR 集合。如果某个分区的 ISR 集合发生变更,则会将变更后的数据记录到 ZooKeeper 对应的 /brokers/topics/<topic>/partition/<parititon>/state
节点中。
节点中的数据如下:
除此之外,当 ISR 集合发生变更时还会将变更后的记录缓存到 isrChangeSet 中,isr-change-propagation 任务会周期性(固定值为 2500ms)地检查 isrChangeSet,如果发现 isrChangeSet 中有 ISR 集合的变更记录,那么它会在 ZooKeeper 的 /isr_change_notification
路径下创建一个以 “isr_change_” 开头的持久顺序节点(比如 /isr_change_notification/isr_change_0000000000),并将 isrChangeSet 中的信息保存到这个节点中。Kafka 控制器为 /isr_change_notification 添加了一个 Watcher,当这个节点中有子节点发生变化时会触发 Watcher 的动作,以此通知控制器更新相关元数据信息并向它管理的 broker 节点发送更新元数据的请求,最后删除 /isr_change_notification
路径下已经处理过的节点。
4.2.3 ISR的扩张
此某个失效副本的 LEO 不小于 leader 副本的 HW 时,这个失效副本就会重新进入 ISR 列表。
ISR 列表扩充之后同样会更新 ZooKeeper 中的 /brokers/topics/<topic>/partition/<parititon>/state
节点和 isrChangeSet,之后的步骤就和 ISR 收缩时的相同。
4.2.4 LEO 和 HW
从图中可以看出,follower 从 leader 同步一条数据需要发送2个 fetch 请求:
- 把数据同步过来,更新 follower 的 LEO
- 更新 leader 的 remote LEO 和 HW,更新 follower 的 HW
第一次fetch请求,分leader端和follower端:
leader端:
- 读取底层log数据。
- 根据fetch带过来的offset=0的数据(就是follower的LEO,因为follower还没有写入数据,因此LEO=0),更新remote LEO为0。
- 尝试更新HW,做min(leader LEO,remote LEO)的计算,结果为0。
- 把读取到的log数据,加上leader HW=0,一起发给follower副本。
follower端:
- 写入数据到log文件,更新自己的LEO=1。
- 更新HW,做min(leader HW,follower LEO)的计算,由于leader HW=0,因此更新后HW=0。
可以看出,第一次fetch请求后,leader和follower都成功写入了一条消息,但是HW都依然是0,对消费者来说都是不可见的,还需要第二次fetch请求。
第二次fetch请求,分leader端和follower端:
leader端:
- 读取底层log数据。
- 根据fetch带过来的offset=1的数据(上一次请求写入了数据,因此LEO=1),更新remote LEO为1。
- 尝试更新HW,做min(leader LEO,remote LEO)的计算,结果为1。
- 把读取到的log数据(其实没有数据),加上leader HW=1,一起发给follower副本。
follower端:
- 写入数据到log文件,没有数据可以写,LEO依然是1。
- 更新HW,做min(leader HW,follower LEO)的计算,由于leader HW=1,因此更新后HW=1。
这个时候,才完成数据的写入,并且分区HW(分区HW指的就是leader副本的HW)更新为1,代表消费者可以消费offset=0的这条消息了,上面的过程就是 Kafka 处理消息写入和备份的全流程。
4.2.5 Leader Epoch
从 [4.2.4 LEO 和 HW](#4.2.4 LEO 和 HW) 的介绍中我们可以知道 leader 和 follower 的 HW 更新是有一定的时间差的,如果在这个时间差中发生主从切换,有可能造成数据丢失和数据不一致的现象。
1. 数据丢失
假设 Replica B 是 leader 副本,Replica A 是 follower 副本。
Replica B 中的 LEO 和 HW 都是2,Replica A 中的 LEO 是2,HW 是1(也就是 follower 还没收到第二次响应,没有来得及更新自己的 HW)。如下图:
此时如果 A 宕机后重启了,会根据自己的 HW 值来进行截断,所以 A 中的 m2 消息就没了,A 向 B 发送 FetchRequest,此时如果 B 也宕机了,那么 A 就成了 leader,B 恢复之后会成为 follower,由于 follower 副本 HW 不能比 leader 副本的 HW 高,所以还会做一次日志截断,以此将 HW 调整为1。这样一来 m2 这条消息就丢失了(就算B不能恢复,这条消息也同样丢失)。
2. 数据不一致
假设 Replica A 是 leader 副本,Replica B 是 follower 副本。
Replica A 中的 LEO 和 HW 都是2,Replica B 中的 LEO 和 HW 都是1。如下图:
此时 A 宕机,B 成为新的 leader,B 又收到了一条新消息 m3,B 的 LEO 和 HW 都更新为2。此时 A 也恢复,成为了 follower,需要根据 HW 截断日志及发送 FetchRequest 至 B,不过此时 A 的 HW 正好也为2,那么就可以不做任何调整了,此时 A 和 B 就出现了数据不一致的情况。如下图所示:
3. Leader Epoch 的原理
数据丢失和数据不一致的根本原因是 follower 的日志不可靠,因此 follower 恢复后,不能以自己的 HW 为准,而是需要去 leader 那里进行确认。
为了解决上述两种问题,Kafka 从 0.11.0.0 开始引入了 leader epoch 的概念。leader epoch 代表 leader 的纪元信息(epoch),初始值为0。每当 leader 变更一次,leader epoch 的值就会加1,相当于为 leader 增设了一个版本号。
与此同时,每个副本中还会增设一个矢量 <LeaderEpoch, StartOffset>
,其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。每个副本的 Log 下都有一个 leader-epoch-checkpoint 文件,在发生 leader epoch 变更时,会将对应的矢量对追加到这个文件中。
4. 使用 Leader Epoch 解决数据丢失问题
假设 Replica B 的 leader epoch 为0。
Replica A 恢复后不会再根据自己的 HW 来截断日志,而是先发起 LeaderEpochRequest 询问 Replica B(leader 副本)leader epoch 为0的最新偏移量是多少,Replica B 判断请求中的 leader epoch 和自己当前的 leader epoch 相等,于是就将自己当前的 LEO(为2)返回给 Replica A,Replica A 就知道了消息 m2 不能被截断。
之后 B 发生了宕机,A 成为新的 leader,那么对应的 LE=0 也变成了 LE=1,对应的消息 m2 此时就得到了保留,这是原本所不能的,如下图所示。之后不管 B 有没有恢复,后续的消息都可以以 LE1 为 LeaderEpoch 陆续追加到 A 中。
5. 使用 Leader Epoch 解决数据不一致问题
假设 Replica A 的 leader epoch 为0。
Replica B 恢复后,写入消息 m3,并将 LEO 和 HW 更新至2,如下图所示。注意此时的 LeaderEpoch 已经从 0 增至 1了:
紧接着 A 也恢复过来成为 follower 并向 B 发送 OffsetsForLeaderEpochRequest 请求,此时 A 的 LeaderEpoch 为 LE0。B 根据 LE0 查询到对应的 offset 为1并返回给 A,A 就截断日志并删除了消息 m2,如下图所示。之后 A 发送 FetchRequest 至 B 请求来同步数据,最终A和B中都有两条消息 m1 和 m3,HW 和 LEO都为2,并且 LeaderEpoch 都为 LE1,如此便解决了数据不一致的问题。
4.3 Kafka 为什么不支持读写分离
主写从读虽然可以让从节点分担主节点的负载压力,预防主节点负载过重而从节点却空闲的情况发生。但是主写从读也有2个很明显的缺点:
- 主从节点数据一致性的问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致
- 延时问题:主从节点同步数据需要一定的时间
虽然 Kafka 的读写都是在 leader 节点,但是 Kafka 可以通过它的分区机制来实现负载均衡。Kafka 的一个 topic 可以有多个分区,分布在不同的 broker 上,只要分区分配的尽量均衡,再搭配监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。
总的来说,Kafka 只支持主写主读有几个优点:
- 可以简化代码的实现逻辑,减少出错的可能;
- 将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
- 没有延时的影响;
- 不会出现数据不一致的情况。
V. Kafka 生产经验
5.1 异步刷盘 or 同步刷盘
Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可以通过 log.flush.interval.messages
、log.flush.interval.ms
等参数来控制。
同步刷盘可以提高消息的可靠性,防止由于机器掉电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。
5.2 可靠性分析
5.2.1 Producer 端的可靠性分析
-
需要考虑生产者客户端参数 acks:相比于0和1,acks = -1(客户端还可以配置为 all,它的含义与-1一样,以下只以-1来进行陈述)可以最大程度地提高消息的可靠性。
-
消息发送有3种模式,即发后即忘、同步和异步。如果要提升可靠性,那么生产者可以采用同步或异步的模式,在出现异常情况时可以及时获得通知,以便可以做相应的补救措施。
-
默认情况下,retries 参数设置为0(即不进行重试),对于高可靠性要求的场景,需要将这个值设置为大于0的值,与 retries 参数相关的还有一个 retry.backoff.ms 参数,它用来设定两次重试之间的时间间隔,以此避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。重试存在2个缺点:
-
可能引起消息重复:对于重复消息,可以使用 Kafka 的幂等消息机制 + consumer 幂等消费来解决
-
可能造成消息乱序:因为
max.in.flight.requests.per.connection
的默认值为5,可能会影响消息的顺序性,对此要么放弃客户端内部的重试功能,要么将 max.in.flight.requests.per.connection 参数设置为1,但是这样也就放弃了吞吐量。
-
5.2.2 broker 端的可靠性分析
- 就 Kafka 而言,越多的副本数越能够保证数据的可靠性,一般而言,设置副本数为3即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka 时就会设置副本数为5。
- 同时还需要考虑 min.insync.replicas (默认值为1)参数,这个参数指定了 ISR 集合中最小的副本数,如果不满足条件就会抛出 NotEnoughReplicasException 或 NotEnoughReplicasAfterAppendException。注意 min.insync.replicas 参数在提升可靠性的时候会从侧面影响可用性。试想如果 ISR 中只有一个 leader 副本,那么最起码还可以使用,而此时如果配置 min.insync.replicas>1,则会使消息无法写入。
- 与可靠性有关的还有一个参数
unclean.leader.election.enable
。这个参数的默认值为 false,如果设置为 true 就意味着当 leader 下线时候可以从非 ISR 集合中选举出新的 leader,这样有可能造成数据的丢失。如果这个参数设置为 false,那么也会影响可用性,非 ISR 集合中的副本虽然没能及时同步所有的消息,但最起码还是存活的可用副本。 - 在 broker 端还有两个参数 log.flush.interval.messages 和 log.flush.interval.ms,用来调整同步刷盘的策略,默认是不做控制而交由操作系统本身来进行处理。同步刷盘是增强一个组件可靠性的有效方式,但是也是会影响 Kafka 的吞吐量。
5.2.3 consumer 端的可靠性分析
- 在消费端
enable.auto.commit
参数的默认值为 true,即开启自动位移提交的功能,这个会带来重复消费和消息丢失的问题,对于高可靠性要求的应用来说显然不可取,所以需要将 enable.auto.commit 参数设置为 false 来执行手动位移提交。
VI. 问题
1. Kafka 的用途有哪些?使用场景如何?
解耦,削峰,异步
2. Kafka 中的 ISR、AR 又代表什么?ISR 的伸缩又指什么
- AR:分区中所有的副本
- ISR:所有与 leader 副本保持同步状态的副本
Kafka 在启动的时候会开启1个和 ISR 相关的线程 isr-expiration。 这个线程会周期性地检测每个分区是否需要缩减其 ISR 集合。这个周期和 replica.lag.time.max.ms
参数有关,大小是这个参数值的一半,默认值为 5000ms。当检测到 ISR 集合中有失效副本时(也就是 follower 副本落后 leader 副本超过这个参数限制后),就会收缩 ISR 集合。
当某个失效的 follower 副本向 leader 副本拉取日志时,会判断一下,如果这个失效副本的 LEO 不小于 leader 副本的 HW 时,这个失效副本就会重新进入 ISR 列表。
3. Kafka 中的 HW、LEO、LSO、LW 等分别代表什么?
- HW:ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置上一条信息
- LEO:LogEndOffset 当前日志文件中下一条待写信息的offset
- LW:AR 中最小的 LEO 作为 LW
4. Kafka 中是怎么体现消息顺序性的?
kafka 每个 partition 中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1。
5. Kafka生产者客户端中使用了几个线程来处理?分别是什么?
2个,主线程和Sender线程。主线程负责创建消息,然后通过拦截器、序列化器、分区器后缓存到累加器 RecordAccumulator中。Sender 线程负责将 RecordAccumulator 中的消息发送到 kafka broker 中.
6. “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据**”这句话是否正确?如果不正确,那么有没有什么hack**的手段?
不正确,通过自定义分区分配策略,可以将一个consumer指定消费所有partition。
7. 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
offset+1
8. 有哪些情形会造成重复消费?
- producer
- producer 重试导致重复消费
- consumer
- 消费完成后没有及时提交 offset
- 自动提交 offset
9. 那些情景下会造成消息丢失?
-
消息在 producer 缓冲区的时候,producer 直接就挂了,此时写出去的数据就丢失了
-
acks=-1
且min.insync.replicas=1
,此时需要 leader 和 follower 都写成功才算成功,如果 follower 先宕机了,ISR 中只剩下一个 leader,leader 写入消息成功就算成功了,但是此时 leader 挂了,数据还是会丢失 -
consumer poll 到数据了,但是还没来得及处理,就自动提交了 offset,此时 cosnumer 宕机了,消息就会丢失
10. KafkaConsumer 是非线程安全的,那么怎么样实现多线程消费?
- 一个Consumer一个线程。该方案最大的优点便是实现很简单;但最大的缺点是消费线程受限于 Topic 分区数,只能通过不断增加 Partition 数量来提升消费能力,而Partition的增长最终还是要受限于broker实例所在机器的资源限制
- 将原来的消费线程分拆为2类线程(拉取线程、工作线程),通过这2类线程的分工合作来完成消息的消费:
- 拉取线程:只负责kafka消息的拉取、分发和消息offset的提交(可选),但不负责消息的业务处理。
- 工作线程:只负责消息的业务处理,但不负责kafka消息的拉取和offset的提交。拉取线程拉取到消息后,便可将消息分发给工作线程进行处理。比如拉取线程一次拉取到100个消息,分发给20个工作线程并行异步处理。
11. 简述消费者与消费组之间的关系
消费者从属于消费组,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。
12. 创建 topic 时如何选择合适的分区数?
根据集群的机器数量和需要的吞吐量来决定适合的分区数
13. 简述 Kafka 的日志目录结构
每个partition一个文件夹,包含四类文件:
- .index
- .log
- .timeindex
- leader-epoch-checkpoint
.index .log .timeindex 三个文件成对出现 前缀为上一个segment的最后一个消息的偏移。.log文件中保存了所有的消息 index文件中保存了日志的稀疏索引 timeindex保存的则是时间索引
leader-epoch-checkpoint中保存了每一任 leader 开始写入消息时的offset
follower 被选为 leader 时会根据这个确定哪些消息可用。
14. 如果我指定了一个 offset,Kafka 怎么查找到对应的消息?
- 在 index 索引文件中根据二分查找算法找到小于等于该offset的最大索引项的绝对偏移地址 xx
- 在对应的 .log 文件中从xx开始顺序搜寻记录,直到找到位移为offset的消息记录为止;
15. 如果我指定了一个 timestamp,Kafka 怎么查找到对应的消息?
- 首先从 timestamp 文件中找到小于等于timestamp 的最大timestamp,得到这个timestamp对应的索引位置index;
- 在位移索引文件中根据二分查找算法找到小于等于该index的最大索引项;
- 在对应的 .log 文件中从xx开始顺序搜寻记录,直到找到位移为offset的消息记录为止;
16. 聊一聊你对Kafka的Log Retention的理解(留存期)
在 Kafka 有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期默认值为5分钟。当前日志分段的保留策略有3种:
- 基于时间的保留策略
- 基于日志大小的保留策略(基于 Log 目录中所有日志文件的总大小)
- 基于日志起始偏移量的保留策略
17. 聊一聊你对Kafka的Log Compaction的理解
如果应用只关心 key 对应的最新 value 值,则可以开启 Kafka 的日志压缩功能,Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
Log Compaction 会生成新的日志分段文件,日志分段中每条消息的物理位置会重新组织。Log Compaction 执行过后的 offset 不再是连续的,不过这并不影响日志的查询。
Kafka 中用于保存消费者消费位移的主题 __consumer_offsets 使用的就是 Log Compaction 策略。
18. 聊一聊Kafka的延时操作的原理
延时操作创建之后会被加入到延时操作管理器中来做专门的处理,每个延时操作管理器都会配备一个定时器来做超时管理,定时器的底层就是采用时间轮实现的。
19. 聊一聊Kafka控制器的作用:
- controller生成:Kafka集群启动时,第一个在zookeeper的/controller目录下新建一个临时子节点的broker被选举为该kafka集群的控制器;
- controller高可用:通过zookeeper的临时节点+watcher事件实现,/controller下的子节点为临时节点,controller所在的broker崩溃就会消失,这样其他broker通过watcher事件收到消息,又会进行新的controller选举(第一个在zookeeper的/controller目录下新建子节点的broker为新的controller);
- controller职责:负责管理集群中所有分区和读本状态,当某个分区的leader副本出现故障时,由controller负责为该分区选举出新的leader副本,检测到某个分区的ISR集合发生变化时,由controller负责将元数据信息同步到Kafka集群中,他还负责topic的增加于删除,分区的重分配,优先副本的选举,topic的分区扩展,Kafka集群的扩展,broker崩溃,受控关闭,controller leader的选举等等事项;
- 总结:就是处理集群中所有的管理事务,并向集群中各个broker同步元数据信息的。
20. Kafka中的幂等性是怎么实现的
- producer id:PID,每个生产者在初始化时都会被分配一个PID,这个过程对用户而言是完全透明的;
- 序列号:对于每个 PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将 对应的序列号的值加1;
- broker端:broker 端会在内存中为每一对
<PID, Partition>
维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比 broker 端中维护的对应的序列号的值(SN_old)大1(即 SN_new = SN_old + 1)时,broker 才会接收它。
21. 失效副本是指什么?有那些应对措施
- 失效副本:与 leader 副本不保持同步关系的 follower 副本,即落后 leader 副本超过 replica.lag.time.max.ms 时间的 follower 副本
- 应对措施:
- 找到副本追赶不上的原因,还是follower副本端程序有问题,还是机器配置问题,还是网络问题等,找出原因后解决。
- 副本失效场景:
- follower 副本进程卡住,在一段时间内根本没有向 leader 副本发起同步请求,比如频繁的 Full GC;
- follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 I/O 开销过大;
- 如果一个 follower 副本由于某些原因(比如宕机)而下线,之后又上线,在追赶上 leader 副本之前也处于失效状态;
22. 为什么Kafka不支持读写分离
主写从读虽然可以让从节点分担主节点的负载压力,预防主节点负载过重。但是主写从读也有2个很明显的缺点:
- 主从节点数据一致性的问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致
- 延时问题:主从节点同步数据需要一定的时间
虽然 Kafka 的读写都是在 leader 节点,但是 Kafka 可以通过它的分区机制来实现负载均衡。Kafka 的一个 topic 可以有多个分区,分布在不同的 broker 上,只要分区分配的尽量均衡,再搭配监控、告警、运维相结合的生态平台,在绝大多数情况下 Kafka 都能做到很大程度上的负载均衡。
总的来说,Kafka 只支持主写主读有几个优点:
- 可以简化代码的实现逻辑;
- 将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
- 没有延时的影响;
- 不会出现数据不一致的情况。
23. Kafka在可靠性方面做了哪些改进
- HW:高水位,消费者只能消费位移再HW之前的消息,分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW;
- leader epoch:leader epoch 代表 leader 的纪元信息(epoch),初始值为0。每当 leader 变更一次,leader epoch 的值就会加1,相当于为 leader 增设了一个版本号。epoch是一对值,每个leader会保存一个缓存,定期将其写入一个检查点文件中,如果leader首次写消息,则会在缓存中增加一个条目,负责不做更新,每次副本称为新的leader是会查询这部分缓存,获取对应的leader版本位移
24. Kafka的哪些设计让它有如此高的性能
- 页缓存:大量使用页缓存,内存操作比磁盘操作快很多,数据写入直接写道页缓存,由操作系统负责刷盘,数据读取也是直接命中页缓存,从内存中直接拿到数据;
- 顺序读写:Kafka的数据是顺序追加的,避免了低效率的随机读写;
- 零拷贝:数据会从页缓存直接发送到网卡进行数据传输,省略了用户态和内核态的切换以及多次的数据拷贝;
- 优秀的文件存储机制:日志是分段的,避免了大文件读写的低效率;
- 索引文件:Kafka含有.index和.timeindex索引,以稀疏索引的方式进行构造,查找时可以根据二分法在索引文件中快速定位到目标数据附近位置,然后在.log文件中顺序读取到目标数据;
- producer端的批处理:提高并行度;
25. Kafka有什么优缺点
- kafka的优点
- 支持broker的横向拓展;
- 副本机制,实现数据冗余,保证数据不丢失;
- 十万级的单机吞吐量
- 基于磁盘实现数据的持久化;
- 毫秒级消息延迟,因为消息读取时首先从页缓存中进行命中,且采用了零拷贝策略,省略了消息传递时的IO时间,以及消息传递时再用户态和内核态切换的时间;
- 社区活跃
- 缺点
- 1.由于是批量发送,所以数据达不到真正的实时;
- 2.只能支持同一分区内消息有序,无法实现全局消息有序;
- 3.需要配合 zookeeper 进行元数据管理;
- 当 topic 和 partition 数量变多时,Kafka 的性能会急剧下降
26. Kafka 和 RocketMQ 的对比
- 性能
- Kafka单机写入 TPS 号称在百万条/秒;
- RocketMQ 大约在10万条/秒。
- 可靠性
- RocketMQ支持异步/同步刷盘;
- Kafka 仅支持异步刷盘
- 消费失败重试机制
- Kafka消费失败不支持重试
- RocketMQ消费失败支持定时重试,每次重试间隔时间顺延
- 定时/延时消息
- Kafka不支持定时消息
- RocketMQ支持定时消息
- 分布式事务消息
- Kafka不支持分布式事务消息
- RocketMQ 支持事务消息
- 支持的队列数
- Kafka单机超过64个队列/分区,消息发送性能降低严重
- RocketMQ 单机支持最高5万个队列,性能稳定
26.1 为什么 Kafka 不能支持过多的 topic/partition
RocketMQ
RocketMQ 使用Topic混合追加方式
,即一个 CommitLog 文件中会包含分给此 Broker 的所有消息,不论消息属于哪个 Topic 的哪个 Queue 。
所以所有的消息过来都是顺序追加写入到 CommitLog 中,并且建立消息对应的 CosumerQueue ,然后消费者是通过 CosumerQueue 得到消息的真实物理地址再去 CommitLog 获取消息的。可以将 CosumerQueue 理解为消息的索引。
然后拉消息的时候严格的说对于 CommitLog 来说读取是随机的,因为 CommitLog 的消息是混合的存储的,**但是从整体上看,消息还是从 CommitLog 顺序读的,都是从旧数据到新数据有序的读取。**并且一般而言消息存进去马上就会被消费,因此消息这时候应该还在页缓存中,所以不需要读盘。
Kafka
Kafka 的日志存储和 RocketMQ 不一样,它是一个分区一个文件。
Kafka 的消息写入对于单分区来说也是顺序写,如果分区不多的话从整体上看也算顺序写,它的日志文件并没有用到 mmap,而索引文件用了 mmap。但发消息 Kafka 用到了零拷贝。
但是分区多了的话,写入需要频繁的在多个文件之间来回切换,对于每个文件来说是顺序写入的,但是从全局看其实算随机写入,并且读取的时候也是一样,算随机读。而就一个文件的 RocketMQ 就没这个问题。
参考链接