0%

Kafka进阶

Kafka

1. 知识点

1.1 Kafka不能保证消息的全局有序,只能保证消息在partition内有序

1.2 每个partition对应于一个log文件,该log文件中存储的就是生产者生成的数据,生产者生成的数据会不断的追加到该log的文件末端,且每条数据都有自己的offset

1.3 kafka中的分片+索引

由于生产者生产的消息会不断追加到log文件的末尾,为防止log文件过大导致数据定位效率低下,Kafka采用分片和索引的机制,将每个partition分为多个segment,每个segment对应2个文件—-index文件和log文件,这2个文件位于一个相同的文件夹下,文件夹的命名规则为topic名称+分区序号。也就是partition下有多个segment文件夹,里面都有两个文件–index和log文件。Index文件中存储的数据的索引信息,第一列是offset,第二列是这个数据所对应的log文件中的偏移量。如果要去消费offset为3的数据,首先通过二分法找到数据在哪个index文件中,然后在通过index中offset找到数据在log文件中的offset;这样就可以快速的定位到数据并消费。所以kakfa虽然把数据存储在磁盘中,但是他的读取速度还是非常快的(类似于hbase对于行键来划分region,然后做的索引一样)

1.4 kafka如何保证数据可靠性呢?通过ack来保证。


为保证生产者发送的数据,能可靠的发送到指定的topic,topic的每个partition收到生产者发送的数据后,都需要向生产者发送ack(确认收到),如果生产者收到ack,就会进行下一轮的发送,否则重新发送数据。


那么kafka什么时候向生产者发送ack?
确保follower(就相当于是partition的leader的副本,避免partition的leader挂了,导致无法获取数据)和leader同步完成,leader再发送ack给生产者,这样才能确保leader挂掉之后,能在follower中选举出新的leader后,数据不会丢失。kafka采用的是全部follower都返回ack后由leader发送给生产者。为了避免其中一个follower因为某种故障一直无法同步,leader会维护一个动态的节点列表,对于那些超过一定时间(时间可以自定义)还未返回ack的follower就移除列表。leader故障后,也会从这个列表中来选举新的leader。节点的选择主要是满足两点,一是与leader的网络通信时间应该低时延,二是与leader数据差距,消息条数默认是10000条

1.5 Kafka如何保证消费数据的一致性?通过HW来保证。

LEO:指每个follower的最大的offset。HW(高水位):指消费者能见到的最大的offset,LSR队列中最小的LEO,也就是说消费者只能看到1~6的数据,后面的数据看不到,也消费不了。避免leader挂掉后,比如当前消费者消费8这条数据后,leader挂了,此时比如f2成为leader,f2根本就没有9这条数据,那么消费者就会报错,所以设计了HW这个参数,只暴露最少的数据给消费者,避免上面的问题。

1.6 zookeeper在kafka中的作用。

Kafka集群中有一个broker会被选举为controller,负责管理集群broker的上下线,所有的topic的分区副本分配和leader选举等工作。

1.7 增加分区

./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic 主题名称 --partitions 分区数量 --replication-factor 2

1.8 kafka分布式的情况下,如何保证消息的顺序?

Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton 和 key 是可选的。如果你指定了 partition,那就是所有消息发往同1个 partition,就是有序的。并且在消费端,Kafka 保证,1个 partition 只能被1个 consumer 消费。或者你指定 key(比如 order id),具有同1个 key 的所有消息,会发往同1个 partition。也是有序的。

1.9 kafka 增加分区后,spark 怎么办

总的解决方案如下:

  • MTDirectKafkaInputDStream继承DirectKafkaInputDStream,override compute方法,在每次生成KafkaRDD时,更新currentOffsets中的分区信息。
  • 在org.apache.spark.streaming.kafka路径下,新建一个KafkaUtils.scala文件,里面的代码直接将spark源码中的KafkaUtils源码复制过来。 修改新建的KafkaUtils.scala,将createDirectStream中new DirectKafkaInputDStream,替换为 new MTDirectKafkaInputDStream.

2. 进阶

  1. controller和follower
    controller会主动去监听zk中的元数据变化,然后同步给follower。broker会尝试去zk中创建目录,谁创建了目录,谁就是controller,follower会监听目录,如果controller挂了,follower会去创建目录,谁创建成功谁就是新的controller => 即所有broker中都存储了kafka集群元数据的信息

  2. 每个分区都有3个副本,副本分为leader和follower。leader会负责所有的读写,而follower只是用来同步leader的数据 => 这里的follower可能叫replica更合适

  3. controller kafka的主节点,只是用来同步元数据,副本的leader可以在任何一台broker上

  4. 每一个分区都有自己的log日志和index文件,111.log和111.index文件都是成对出现的,其中111指的是当前文件的第一条数据的offset。仿照的是跳表的设计,当查找offset=x的数据时,可以快速定位到index文件,index文件内部通过稀松索引,其实也类似于跳表的方式,去定位到offset=x的数据的偏移量位置,然后按照偏移量位置去log文件中去遍历去查找

  5. producer端的设计1:消息会封装为 ProducerRecord => 序列化 => 确定partition => 获取集群元数据 => 把数据写到缓冲区,有线程监听缓冲区,然后把数据打包batch上传到broker(16K大小或每100ms)

  6. 每一个batch发送完之后都没用了,都在等待gc,如果上传的数据特别多特别快,会导致频繁的gc,最终可能引发full gc。full gc会导致所有的读写都不可用,只会进行gc,执行完gc之后才能继续读写,影响性能

  7. producer端的设计2:将消息的batch仿照线程池的设计,设计一个内存池,将recordbatch每次上传成功后还给内存池。另外有好多batch要发送,如果有需要发送到同一个broker上面,会把这些batch再次封装成一个请求发送

  8. p2p模型,即同一条消息只能被一个消费者消费;发布订阅模型,即允许消息被多个consumer消费。同一个消费组是p2p模型,一个消息只能被一个消费组里的一个消费者消费。不同消费组是订阅模型。一个分区同一时间只能被一个消费组里的一个消费者消费。