Kafka

消息队列

为什么要用消息队列?

解耦 异步 削峰

解耦:

在分布式的电商平台里,订单业务和库存业务作为不同的模块,部署在不同的服务器上。那么用户在下订单以后,需要远程调用库存模块去处理库存。 那么有了消息队列以后。订单创建完后,发送到队列中,然后库存系统从队列中慢慢取出去执行就行了。这样,上游模块火爆不会影响下游模块的处理。

异步:

比如用户在注册的时候,试想一下,如果没有用消息队列来做异步,那么用户在注册时,申请了验证码后,后台接口要等待用户收到验证码,然后再填写完,才能继续做处理。这样,服务器的接口就会一直处于等待状态,十分影响性能。 现在,把注册接口和发送验证码的接口分开,用户申请验证码后,后端发送验证码的接口,只需要把验证码发送到消息队列中,然后就不用再管后续的操作,用户邮箱从消息队列中接受消息,用户短信也可以从消息队列里获得验证码,然后用户再填写完。

削峰

秒杀系统,用于限制流量,把秒杀成功的用户的购物请求放到队列里,然后后续创建订单和付费从队列里慢慢取出。

队列模型了解吗?

什么是事务型消息?

事务型消息区别于普通消息。它由 producer 发送给消息中间件后,中间件会判断发送的整个链路是否正常,是否有异常,如果发生异常,会回滚掉消息。

Kafka 是什么?

Kafka 是一种分布式流式处理平台。 三大功能

可以作为消息队列:支持生产者生产消息,消费者消费消息。 可以持久化存储消息流:Kafka 会把消息持久化在硬盘上,有效地避免消息丢失的风险。 可以作为流式处理平台:Kafka 提供完整的流式处理类库。 两大使用场景

消息队列:建立实时流处理管道,以可靠地在系统或应用程序之间获取数据。 数据处理:构建实时的流数据处理程序来转换或处理数据流。

Kafka 对比其他的消息队列的优势是什么?

性能更好:Kafka 是基于 Scala 和 Java 开发的,设计中使用了大量异步和批量处理的思想,最高可以每秒处理千万级别的消息。 生态系统更好:尤其在大数据和流计算领域,有更好的生态兼容性。

为什么 Kafka 能承载这么大的 QPS?为什么 Kafka 这么快

Kafka 最核心的功能是:【存储】数据,需要的时候【读取】出来。 Kafka 能这么快的原因就是实现了 partition 并行处理、存储和读取消息时充分利用操作系统 cache、顺序写和零拷贝技术

零拷贝技术:「一次读写」需要 2 次 DMA 拷贝,2 次 CPU 拷贝。而使用 mmap 可以减少一次 CPU 的拷贝。使用 sendfile+DMA Scatter/Gather 技术,可以实现 CPU 0 拷贝。

Producer -> Broker:用 mmap 技术,减少一次 CPU 拷贝。 Broker -> Consumer:用 sendFile 技术,实现 0 拷贝。

零拷贝技术

当磁盘中的文件要发送到远程服务器上时,需要经历四个步骤(四次拷贝)

拷贝一:从磁盘中读取文件,加载到内核缓冲区。 拷贝二:内核缓冲区将文件拷贝到用户空间缓冲区。 拷贝三:用户空间缓冲区的数据拷贝到内核的 Socket Buffer 中。 拷贝四:内核的 Socket Buffer 把数据拷贝到网卡缓冲区。

零拷贝技术:就是为了减少不必要的拷贝。

DMA:Direct Memory Access,只需要两次拷贝

拷贝一:文件从磁盘拷贝到内核的 Read Buffer。 文件描述符(包含数据位置和长度信息)加载在 Socket Buffer 拷贝二:数据从内核中拷贝到网卡中。 MMAP:文件映射机制。文件直接从磁盘映射到内存中,用户修改内存就能修改磁盘中的文件。

Kafka 组成机制

基本组成

组成:

producer:生产者,生产消息到 broker consumer:消费者,主要从 broker 里消费数据 broker:kafka 实例,多个 broker 可以组成 kafka 集群

概念:

topic:主题,生产者向 topic 发送消息,消费者通过订阅 topic 来消费消息。 partition:分区,一个主题可以有多个分区,同一主题的不同分区可以分布在不同的 broker 上。

consumer group:消费者组。

多个从同一个主题的消费者可以假如一个消费者组,消费者组里的消费者公用一个 group id。 消费者组均衡地给消费组里的消费者分配分区,一个分区只能由消费者组里的一个消费者消费。

基本参数

数据存储设计

(1) 数据文件

partition 的每条 message 的属性

offset:数据在partition里的偏移量 messageSize:消息内容data的大小 data:message的实际内容

(2) 数据文件分段

数据文件分段 segment

partition 物理上由多个 segment 组成,每个 segment 大小相等,且按照顺序读写。 每个 segment 数据文件以该段中最小的 offset 命名,文件扩展名为 .log。 使用二分查找查找指定 offset 的 message。

(3) 数据文件索引

kafka为每个数据文件分段都建立了索引文件,(数据文件名.index)

采用稀疏存储的方式,每隔一定字节的数据建立一条索引。

生产者

1.生产者设计

负载均衡

producer 可以通过轮询或者 hash 的方法,将消息发送到不同的 partition 上,以实现负载均衡。

批量发送

生产者可以在内存中合并多条消息后,再通过一次请求将消息批量发送给 broker,以减少 IO 的次数,提高吞吐量。

压缩

生产者可以把消息通过 GZIP 或 Snappy 格式进行压缩,然后消费者端进行解压减轻网络传输的压力。

2.Kafka如何保证发送消息不丢失、不重复和有序性

不丢失

使用带有回调方法的api 设置好参数

acks:发送成功后,是否等待 broker 返回 ack retires:重试次数 retry.backoff.ms:重试间隔 1)在同步模式的时候,确认机制设置为 -1,也就是让消息成功写入leader分区和所有的副本分区。 2)在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了:可以在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,等待缓存区。

不重复

可以让消费端建立去重表。

有序性

生产者发送完一条数据后,要等待 ack 确认后,才能接着发下一条数据。

消费者

1.消费者设计

负载均衡

RangeAssignor 按照topic的维度平均分配到不同的消费组里 RoundRobinAssignor 轮询。按顺序一个一个分发给消费组。 StickyAssignor

主题分区的分配要尽可能的均匀; 当Rebalance 发生时,尽可能保持上一次的分配方案。

2.Kafka如何保证消费消息不丢失、不重复和有序性

不丢失

对于多线程情况,关闭offset自动更新,让消费者自己去维护offset。(Kafka Client)

先消费消息,再更新offset。

不重复

做数据幂等性处理。

PG 的主键和 MySQL 的主键一一对应。 建立去重表,比如 Redis 的 set,每次消费判断是否消费过。

有序性

offset

深入解析Kafka的offset管理尚硅谷铁粉的博客-CSDN博客kafka的offset

数据一致性

(1) Kafka 怎么保证消费顺序?

为什么会消费无序?

Kafka 的一个 topic 会有很多个 partition,而 producer 发送的消息会经过 hash 计算,分配到不同的 partition 里。而一个消费者组里的每一个消费者只能订阅一个 partition,所以,消费者组数据可能会没有按照发送的顺序进行消费。

怎么解决无序的问题?

方法一:自定义数据分区算法,把指定的 key 的数据发送到指定的 partition(把同一批数据路由到指定一个 partition 里),并且设置专门的消费者消费这一个 partition。 方法二:给每个数据添加时间戳,消费完根据时间戳重新排序。

(2) Kafka 怎么保证不重复消费?

Consumer 端建立去重表,重复的消息不处理。

比如在 redis 中保存消费过的消息的 id。

(3) Kafka 怎么保证消费不丢失?

producer 生产丢失的情况

生产者发送消息后,因为网络问题没有发送出去。

consumer 消费丢失的情况

offset 被自动提交,但消费者还没消费消息。

broker 丢失的情况

leader 突然挂掉,而 leader 中的一些数据还没有同步到 follower。

解决办法

producer 生产丢失的解决办法

使用带有回调方法的 api 设置好参数

request.required.acks:发送成功后,是否等待 broker 返回 ack

0:不会等待 ack 确认,直接发送下一条消息。 1:会等待 leader 的 ack 确认,然后发送下一条消息。 -1:会等待 leader 和所有备份的 partition 都确认收到消息后,再发送下一条消息。 retires:重试次数 retry.backoff.ms:重试间隔 1)在同步模式的时候,确认机制设置为 -1,也就是让消息成功写入 leader 分区和所有的副本分区。 2)在异步模式下,如果消息发出去了,但还没有收到确认的时候,缓冲池满了:可以在配置文件中设置成不限制阻塞超时的时间,也就说让生产端一直阻塞,等待缓存区。

发送模式由 producer.type 指定

同步模式发送 sync:逐条发送消息。 异步模式发送 async:以 batch 形式批量发送消息。

consumer 消费丢失的解决办法

手动提交 offset,消费消息完之后,再提交 offset(分为同步提交、异步提交)。 消费端主动维护 offset,每次消费的时候,指定分区和要读取的 offset 和读取的消息数量。    @Override

   public CommonResult> consumeMessage(List brokers, String topic, Integer partition, Boolean autoCommitOffset, String offsetReset, Long offset, Long limit) {

      ...

       Properties props = KafkaUtils.initConsumerProperties(brokers,

                   getGroupId(topic, partition), autoCommitOffset, offsetReset);

       try (KafkaConsumer consumer =  new KafkaConsumer<>(props)) {

           // 指定好要消费的 offset 和分区

           initConnection(consumer, topic, partition, offset);

           ConsumerRecords records = consumer.poll(Duration.ZERO.plusMillis(maxTimeOut));

           // 消费的消息保存在 list 中,并返回

           List ret = new ArrayList<>();

           for (ConsumerRecord record : records) {

               if (limit == 0) {

                   break;

              }

               ret.add(JSON.parse(record.value()));

               limit--;

          }

           return CommonResult.success(ret);

      }

      ...

  }

自动提交:

enable.auto.commit ,是否开启自动提交 offset 功能,默认是 true。 auto.commit.interval.ms,自动提交 offset 的时间间隔,默认是 5s;

手动提交:

commitSync(同步提交):必须等待 offset 提交完毕,再去消费下一批数据;

会阻塞当前线程,但是可以失败重试。 commitAsync(异步提交) :发送完提交 offset 请求后,就开始消费下一批数据了;

不会阻塞线程,但是增加出错的可能性。

broker 丢失的情况解决方法

acks = all:所有分区同步后再确认。

Kafka 高可用

Kafka 通过分区副本来实现高可用

Kafka 的多副本机制和多分区机制了解吗?带来了什么好处?

多副本机制

分区有多个副本,其中,一个是 leader,其他的是 follower 生产者和消费者只与 leader 进行交互。follower 只从 leader 进行数据的备份。 一旦 leader 发生故障,follower 会重新选举出新的 leader。 多分区机制

topic 中有多个 partition,同一个 topic 的不同 partition 会分配在不同的 broker 中。

好处:

多副本机制:提高了容灾能力。leader 崩溃了,其他分区能够选出新 leader。 多分区机制:实现负载均衡,增强了并发能力。

partition 负载均衡

partition 会均衡分不到不同 broker 上。

创建topic的分区分布情况

假设如果一个topic有5个分区,3个副本

随选挑选一个startindex,如图startindex=0,那么 startindex=0 位置的分区就是 leader startindex+1,并按照broker顺序开始生成第一个副本

如何选出Leader?

基于优先副本,也就是挑选出 AR 列表中的第一个 brokerId 上的副本成为leader。

分区自动重平衡

如果开启auto.leader.rebalance.enable=true,Controller会开启定时任务,每隔一段时间去轮询brokers,并计算每个broker的分区不平衡率,如果不平衡率超过阈值,则自动进行分区迁移。

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

ack机制

为保证 producer 发送的数据,能可靠地发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledge 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

调优策略

Kafka 常用命令

1.生产者和消费者的命令行是什么?

bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topic Hello-Kafka

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning

2.如何获取topic主题列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

Kafka 和 Zookeeper

Zookeeper 在 Kafka 中的作用知道吗?

(1) broker 注册

zookeeper 会有一个节点,专门用来存放 broker 服务器列表记录的信息。 每个 broker 启动时,都会到 zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。 每个 broker 的 ip 地址和端口号就会记录到节点上去。

(2) topic 注册

同一个 topic 的多个分区可以注册在不同的 broker 上,那么,分区信息和 broker 的对应关系也由 zookeeper 管理。 /brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1

(3) 负载均衡

在Kafka 0.9 前,Committed Offset 信息保存在 zookeeper 的 consumers/{group}/offsets/{topic}/{partition}目录中(zookeeper其实并不适合进行大批量的读写操作,尤其是写操作)。

而在 0.9 之后,所有的 offset 信息都保存在了 Broker 上的一个名为 __consumer_offsets 的 topic 中。

Kafka 和 RabbitMQ

1.Kafka和RabbitMQ的区别

(1) RabbitMQ

broker 由 Exchange、Binding、Queue 组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。

(2) Kafka

kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式