深入理解Kafka
分区中所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),ISR 集合是 AR 集合中对应一个子集。
一定程度同步指 follower 副本在从 leader 副本中拉取数据进行同步时,同步期间相对于 leader 副本会有一定程度的滞后。
这个一定程度的同步是指可忍受的滞后范围,这个范围可以通过参数进行调整。与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR(Out-Sync Replicas),因此 AR = ISR + OSR。正常情况下 OSR 为空,AR = ISR。
HW: High Watermark高水位,标识了一个特定的消息偏移量,消费者只能拉取到这个 offset 之前的消息
消息 offset 从 0 开始,有 9 条消息,那么最后一条消息的 offset = 8。HW = 6,则表示消费者只能拿到 0 - 5 的消息,offset=6 的消息获取不到
LEO: Log End Offset,标识当前日志文件在下一条待写入消息的 offset。在上面的例子中,LEO = 9,即下一条消息写入的 offset 应该是 9
分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR集合中最小的 LEO 就是分区的 HW。
每一个分区只能被一个消费组中的一个消费者所消费
消息中间件两种消息投递方式:点对点、发布订阅。当所有消费者都都属于同一个消费组,则每条消息都会被每一个消费者处理,相当于前者;当所有消费者属于不同消费组,相当于后者
消费者消费 topic 三种订阅方式:集合订阅的方式
subscribe(Collection)
、正则表达式订阅的方式subscribe(Pattern)
和指定分区的订阅方式assign(Collection)
。在一个消费者中只能使用其中一种subscribe
订阅具有消费者自动均衡功能,即当消费组内的消费者增加或减少时,会自动调整分区分配关系,是因为subscribe
具有ConsumerRebalanceListener
类型的参数,而assign
没有消费到的 ConsumerRecord 包含的信息更丰富
timestampType 有两种类型:CreateTime 和 LogAppendTime,分别表示消息创建的时间戳和消息追加到日志的时间戳
对于消息在分区中的位置,称 offset 为 偏移量;对于消费者消费到的位置,称 offset 为 消费位移
offset 在之前是保存在 zk 中,现在是保存在kafka 内部的主题 `__consumer_offset`` 中
KafkaConsumer
类提供了position(TopicPartition)
和committed(TopicPartition)
两个方法如果当前消费者已经消费了 x 位置的消息,那么提交消费位移的时候是 x + 1,即下一次需要拉取数据的位置
postion = committed offset = lastConsumedOffset + 1,但是 position 和 committed offset 并不会一直相同
在
poll()
方法向服务端发起拉取请求之前都会检查是否可以进行消费位移提交,即位移提交是在下一次拉取的时候操作的offsetsForTimes()
方法返回时间戳大于等于查询时间的第一条消息对应的位置和时间戳KafkaProducer
线程安全;KafkaConsumer
非线程安全,在KafkaConsumer
中定义了一个acquire()
方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作,则会抛出ConcurrentModificationException
,在KafkaConsumer
中除了wakeup()
其他每个公用方法在执行前都会调用acquire()
方法acquire()
不会造成阻塞等待,可以看做一个轻量级锁,仅通过线程操作计数标记的方式来检测线程是否发生了并发操作主题和分区都是逻辑上的概念,分区有一至多个副本,每个副本对应一个日志文件夹,每个日志文件夹下面有一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等。
创建topic 名称会自动检测 . 和 _ 字符,因为 Kafka 内部做埋点时会根据主题名称来命名 metrics 的名称, 并且会将 . 替换为 _
先创建一个
topic.1_2
的主题,会提示 warning;再创建一个topic_1.2
的主题时,会因为 metrics 都是topic_1_2
名称冲突而报错分区重分配:先通过控制器为每个分区添加新副本,新的副本将从分区的 leader 副本复制所有的数据,数据是通过网络复制到新副本,所以会花一些时间。复制完成之后,控制器将旧副本从副本清单里移除。触发时机:某个节点下线或宕机,则该节点上的分区副本都已经处于失效的状态,需要重新分配;增加 broker 节点,只有新创建的主题分区才有可能会被分配到新的节点上,之前的主题分区不会自动分配到新加入的节点,这样新旧节点的负载就会不均衡,此时也需要重新分配。
复制限流:分区重分配本质是数据复制,先增加新副本,再进行数据同步,最后删除旧的副本。分区量大的话会影响业务,所以有一个限流的机制,可以对副本间的复制流量加以限制来保证不影响服务。两种方式:
kafka-config.sh
和kafka-reassign-partiton.sh
脚本,推荐后者简单,配合throttle
参数即可【主题、分区、副本之间的关系】
- Log 在物理上是以文件夹的形式存储,LogSegment 在物理上对应了磁盘上的一个日志文件和两个索引文件,以及可能的其他文件
- 只有最后一个 LogSegment 才能执行写入操作,即顺序追加日志,往.log文件里写入,同时也需要写入偏移量索引文件(.index文件)和时间戳索引文件(.timeindex文件)
日志索引
- 偏移量索引文件用来建立消息偏移量 offset 到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置
- 时间戳索引文件则根据指定的时间戳来查找对应的偏移量信息
- 索引文件以稀疏索引方式构造消息的索引,默认写入 4KB 之后增加新的索引项
- 稀疏索引通过
MappedByteBuffer
将索引文件映射到内存中,加快索引的查询速度
Kafka 速度快的原因
- 顺序写磁盘
- 理论上读写速度 寄存器 >> 缓存 >> 主存 >> 磁盘 >> 磁带。实际上顺序写磁盘的速度要大于随机写内存。原因是操作系统针对顺序写磁盘做了更深层次的优化,比如预读(提前将一个较大的磁盘块写入内存)和后写(批量写)。
- 页缓存
- 零拷贝
- 数据直接从磁盘文件复制到网卡设备,依赖于底层的 sendfile() 方法实现,减少 cpu 在内核态与用户态的切换以及文件的复制粘贴
- 顺序写磁盘
累加器
RecordAccumulator
中的 消息是以<分区, Deque< ProducerBatch>>
的形式进行缓存的时间轮(
TimingWheel
):一个存储定时任务的环形队列,底层采用数组实现,长度固定,数组中的每个元素都对应一个定时任务列表(TimerTaskList
),这个列表是一个环形的双向链表,链表中的每一项 表示的都是定时任务项(TimerTaskEntry
),其中封装了真正的定时任务 (TimerTask
)按照上图所示时间轮只能保存 20ms 内的任务,如果一个任务定时时间在 350ms,只用一个时间轮就不够了,所以有 层级时间轮 的概念,即第一层时间轮
tickMs=1,wheelSize=20,interval=20ms
,第二层tickMs是第一层时间轮的 interval,则第二层的interval=400ms
,第三层的interval=8000ms
会根据定时任务执行剩余时间来自动把定时任务移动到上层或下层时间轮
延时操作比如说延迟读取(即攒批读取)、延迟同步(即由 follower 拉取 leader 数据,acks ≠ -1)
控制器:broker 被选举为控制器,负责管理集群中所有分区和副本的状态。
eg:当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本;当某个分区的 ISR 集合发生变化时,由控制器负责通知所有 broker 更新元数据信息
优雅关闭 Kafka:通过
kill -s TERM $PIDS
或者kill -15 $PIDS
,不要用kill -9
原因是有一个关闭钩子,会在关系 Kafka 的时候做一些操作,比如让信息完全同步到磁盘RangeAssignor 分配策略按照[分区总数%消费者总数]作为跨度来进行平均分配;RoundRobinAssignor 分配策略通过轮询方式将同一个主题的分区依次分配个每个消费者;StickyAssignor 分配策略在分区重分配时尽可能让前后两次分配相同,即减少不必要的分区移动。
通过配置生产者客服端参数
enable.idempotence=true
可以实现单个生产者会话中单分区的幂等。这里的幂等不针对消息内容的幂等,即 send 两条相同的消息,对 kafka 而言是两条不同的消息,会分配不同的序列号幂等针对单个分区,事务可以保证多个分区写入操作的原子性。通过客户端参数
transactional.id
显示设置一般有两种情况导致副本失效,即 ISR 集合减少:
- follower 副本进行卡主,在一段时间内没有向 leader 副本发起同步请求,比如频繁 Full GC
- follower 副本进程同步过慢,在一段时间内都无法追赶上 leader 副本,比如 IO 开销过大
当 follower 副本的 LEO 不小于 leader 副本的 HW,就可以添加进 ISR 集合
Kafka 为什么不支持读写分离?
- 数据一致性问题。数据从主节点转到从节点会有一个延迟的时间窗口
- 延时问题。数据主从同步要比 Redis 的主从同步更加耗时,因为走磁盘
读写分离可以分摊一定的负载但不能做到完全的负载均衡。Kafka 通过 broker 也可以实现负载均衡,即 N个分区分布在 N个 broker 上,可以保证每个 broker 的读写是均衡的。也存在情况是不均衡的:
- broker 端的分区分配不均匀
- 生产者写入消息不均匀
- 消防者消费消息不均匀
- leader 副本的切换不均匀,比如 broker 宕机导致的主从副本切换或者分区副本重分配,导致各个 broker 中的 leader 副本分配不均
数据可靠性
- 理论上副本为 3 可以应对绝大多数场景,设置为 5 则可靠性会更高,但是副本越多,对网络、磁盘的浪费以及性能也会有影响
- acks 参数设置为-1,可以最大程度保证消息的可靠性,但是如果 leader 副本的消息流入很快,follower 副本的同步速度很慢,在某个时间点所有的 follower 副本都被剔除 ISR 集合,此时集合里只有 leader 副本,acks=-1 就变成 acks=1 的情况,也会加大消息丢失的风险
- 基于这一点kafka 提供了
min.insync.replicas
参数,指定了 ISR 集合中最小的副本数。通常副本为 3,参数设置为 2。这个参数会提高可靠性的同时侧面影响可用性。如果 ISR 只有一个 leader 副本,集群还可用,但是参数设置为 2,则会导致消息无法写入
- 基于这一点kafka 提供了
- 可以重新回溯消费某个时间点开始的数据,这一点可以提高可靠性
类比于 RabbitMQ的一些拓展功能,需自行开发:
- 延时队列:队列是存储消息的载体,延时队列存储的对象是延时消息,达到目标延时时间后才能消费。对于延时队列的封装实现,如果要求延时精度不是那么高,则建议使用延时等级的实现方案,毕竟实现起来简单明了。反之,如果要求高精度或自定义延时时间,那么可以选择单层文件时间轮的方案。
- 死信队列:由于某些原因消息无法被正确投递,为了确保消息不会丢失,将其放到一个特殊的队列,后续分析程序可以通过消费这个队列的数据来分析当时的异常情况,对系统进行优化。
- 死信:可以看作是消费者不能处理收到的消息,也可以看作消费者不想处理收到的消息,还可以看作是不符合处理要求的消息。
- 重试队列:由消费端触发,如果消费失败,又不想消息丢失,可以重新写入到kafka 中,比如第一次消费失败,设置 5s 后写入到kafka;第二次消费失败,社会 10s 后写入到 kafka;以此类推。当超过一定投递次数后就写入死信队列。
消息轨迹、消息审计