Kafka - 生产者acks参数介绍与设置教程(附:如何保证数据不丢)
作者:hangge | 2024-05-29 08:34
1,数据通讯方式介绍
针对 producer 的数据通讯方式有同步发送和异步发送两种,它们区别如下:
- 同步发送:生产者发出数据后,等接收方发回响应以后再发送下个数据的通讯方式。
- 异步发送:生产者发出数据后,不等接收方发回响应,接着发送下个数据的通讯方式。
2,acks 参数介绍
(1)具体采用何种数据通讯策略(生产者决定何种写入语义)是由 acks 参数控制的,该参数有如下三种可选值:
- 1(默认值):表示需要 Leader 节点回复收到消息,这样生产者才会发送下一条数据
- -1(即 all):表示需要所有 Leader + 副本节点回复收到消息(acks=-1),这样生产者才会发送下一条数据
- 0:表示不需要任何节点回复,生产者会继续发送下一条数据
(2)以下图为例,我们在向 hello 这个 topic 生产数据的时候,可以在生产者中设置 acks 参数:
- 如果 acks 设置为 1,表示我们在向 hello 这个 topic 的 partition1 这个分区写数据的时候,只需要让 leader 所在的 broker1 这个节点回复确认收到的消息就可以了,这样生产者就可以发送下一条数据了
- 如果 acks 设置为 -1(即 all),则需要 partition1 的这两个副本所在的节点(包含 Leader)都回复收到消息,生产者才会发送下一条数据
- 如果 acks 设置为 0,表示生产者不会等待任何 partition 所在节点的回复,它只管发送数据,不管你有没有收到,所以这种情况丢失数据的概率比较高。
3,acks 参数设置
(1)以 SpringBoot 项目为例,如果使用的是 kafka-clients 组件与 Kafka 进行集成,则在代码中通过如下方式设置 acks 参数:
Properties prop = new Properties(); //指定kafka的broker地址 prop.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); //指定key-value数据的序列化格式 prop.put("key.serializer", StringSerializer.class.getName()); prop.put("value.serializer", StringSerializer.class.getName()); // acks设置 prop.put("acks", "1"); //指定topic String topic = "hello"; //创建kafka生产者 KafkaProducer <String, String> producer = new KafkaProducer<String,String>(prop); //向topic中生产数据 producer.send(new ProducerRecord<String, String>(topic, "hello hangge.com")); //关闭链接 producer.close();
(2)如果我们使用的是 spring-kafka 这个组件,只要在项目 application.properties 文件中添加如下配置进行设置即可:
spring.kafka.producer.acks=1
4,ISR
(1)ISR(In-Sync Replicas)是指和主分区保持了主从同步的所有从分区。
- 比如说,一个主分区本身有 3 个从分区,但是因为网络之类的问题,导致其中一个从分区和主分区失去了联系,没办法同步数据,那么对于这个主分区来说,它的 ISR 就是剩下的 2 个分区。
(2)我们能够通过 min.insync.replicas 这个参数来配置对 ISR 里面的分区数量。
- 比如说当我们设置 min.insync.replicas = 2 的时候,就意味着 ISR 里面至少要有两个从分区。如果分区数量不足,那么生产者在配置 acks = all 的时候,发送消息会失败。
(3)与 ISR 对应的一个概念是 OSR,也就是不在 ISR 里面的分区集合。
附一:消息丢失的各种场景
1,生产者发送
(1)一个消息丢失的场景就是生产者把 acks 设置成 0,然后发送消息。这个时候虽然生产者能够拿到消息客户端返回的成功响应,但是事实上 broker 可能根本没收到,或者收到了但是处理新消息的时候遇到 Bug 了。
(2)如果我们启用了批量发送功能,而且批次比较大的话,那么还可能发生的情况就是,Kafka 客户端连请求都没有发送出去,服务就整个崩溃了,这种情况也会引起消息丢失。
2,主从同步
(1)而在 acks=1 的时候,只要求写入主分区就可以。所以假设在写入主分区之后,主分区所在 broker 立刻就崩溃了。这个时候发起新的主分区选举,不管是哪个从分区被选上,它都缺了这条消息。
(2)如果只要使用 acks=all 也不能保证肯定不会有问题。因为 Kafka 里面还有一种 unclean 选举。在允许 unclean 选举的情况下,如果 ISR 里面没有任何分区,那么 Kafka 就会选择第一个从分区来作为主分区。这种 unclean 选举机制本身主要是为了解决你希望尽可能保证 Kafka 依旧可用,并且等待从分区重新进入 ISR 的问题。类似地,unclean 选出来的新的主分区也可能少了部分数据。
(3)那么如果我设置 acks=all 并且禁用 unclean 选举,还是不能保证万无一失,因为还有下面的刷盘问题。
3,刷盘
(1)为了减少磁盘写入的次数,broker 会将消息暂时缓存起来(存储在 page cache 中),当消息的个数达到一定阈值或者过了一定的时间间隔后,再 flush 到磁盘。当遇到断电或机器故障的情况,PageCache 上的数据可能未来得及刷新到磁盘,会造成消息丢失。
- 当 acks=1 的时候,主分区返回写入成功的消息,但是这个时候消息可能还在操作系统的 page cache 里面。
- 而当 acks=all 的时候,主分区返回写入成功的消息,不管是主分区还是 ISR 中的从分区,这条消息都可能还在 page cache 里面。
(2)而在 Kafka 中,控制刷盘的参数有三个:
- log.flush.interval.messages:控制消息到多少条就要强制刷新磁盘。Kafka 会在写入 page cache 的时候顺便检测一下。
- log.flush.interval.ms:间隔多少毫秒就刷新数据到磁盘上。
- log.flush.scheduler.interval.ms:间隔多少毫秒,就检测数据是否需要刷新到磁盘上。
(3)参数说明:
- 后面两个是要配合在一起的。举个例子,假如说 log.flush.internval.ms 设置为 500,而 log.flush.schduler.interval.ms 设置为 200。也就是说,Kafka 每隔 200 毫秒检查一下,如果上次刷新到现在已经过了 500 毫秒,那么就刷新一次磁盘。
- 而 log.flush.interval.messages 和 log.flush.interval.ms 都设置了的话,那么就是它们俩之间任何一个条件满足了,都会刷新磁盘。
提示:总之,Kafka 要么是定量刷,要么是定时刷。
4,消费者提交
消费者提交是指消费者提交了偏移量,但是最终终却没有消费的情况。
- 比如说线程池形态的异步消费,消费者线程拿到消息就直接提交,然后再转交给工作线程。在转交之前,或者工作线程正在处理的时候,消费者都有可能宕机。于是一个消息本来并没有被消费,但是却被提交了,这也叫做消息丢失。
附二:如何保证消息不丢失
1,确保发送方一定发了消息
(1)在发送方,我们可以采用本地消息表解决方案。简单来说,就是在业务操作的过程中,在本地消息表里面记录一条待发消息,做成一个本地数据库事务。然后尝试立刻发送消息,如果发送成功,那么就把本地消息表里对应的数据删除,或者把状态标记成已发送。
(2)如果这个时候失败了,就可以立刻尝试重试。同时,还要有一个异步的补发机制,扫描本地消息表,找出已经过了一段时间,比如说三分钟,但是还没有发送成功的待发消息,然后补发。
(3)而异步补发机制可以简单理解成有一个线程定时扫描数据库,找到需要发送但是又没有发送的消息发送出去。更直观的来说,就是这个线程会执行一个类似如下的 SQL。
(4)整个过程如下图所示,并且图里标注了三个易出错点:
# 找出三分钟前还没发送出去的消息,然后补发 SELECT * FROM msg_tab WHERE create_time < now() - 3min AND status = '未发送'
(4)整个过程如下图所示,并且图里标注了三个易出错点:
- 1. 如果已经提交事务了,那么即便服务器立刻宕机了也没关系。因为我们的异步补发机制会找出这条消息,进行补发。
- 2. 如果消息发送成功了,但是还没把数据库里的消息状态更新成已发送,也没关系,异步补发机制还是会找出这条消息,再发一次。也就是说,在这种情况下会发送两次。
- 3. 如果在重试的过程中,重发成功了但是还没把消息状态更新成已发送,和第 2 点一样,也是依赖于异步补发机制。
(5)上面三点都依赖于异步补发机制。但是,重试也要控制住重试间隔和重试次数。所以我们可以在本地消息表里面额外增加了一个新的列,用来控制重试的间隔和重试的次数。如果最终补发都失败了就会告警。这个时候就需要人手工介入了。
那么这时候本地消息表至少有两个关键列:一个是消息体列,里面存储了消息的数据;另一个是重试机制列,里面可以只存储重试次数,也可以存储重试间隔、已重试次数、最大重试次数。剩余的列,我们就可以根据自己的需要随便加,不关键。
(6)总之,这种解决方案其实就是把一个分布式事务转变成本地事务 + 补偿机制。
- 在这里案例里面,我们的分布式事务是要求执行业务操作并且发消息,那么就转化成一个本地事务,这个本地事务包含了业务操作,以及下一步做什么。
- 然后,补偿机制会查看本地事务提交的数据,找出需要执行但是又没有执行成功的下一步,执行。这里的下一步,就是发送消息。
2,确保消息队列不丢失
(1)要确保消息发送到了消息队列上,而且不会丢失。就要把 acks 设置成 all 并且禁用 unclean 选举。
(2)进一步考虑刷盘的问题,那就需要调整 log.flush.interval.messages、log.flush.interval.ms 和 log.flush.scheduler.interval.ms 三个会影响刷盘的参数。比如我们可以将其设置为 10000、2000、3000。
提示:理论上来说,这种配置确实还是有一点消息丢失的可能,但是概率已经非常低了。只有一种可能,就是消息队列完成主从同步之后,主分区和 ISR 的从分区都没来得及刷盘就崩溃了,才会丢失消息。这个时候真丢失了消息,就只能人手工补发了。
3,确保消费者肯定消费
要确保消费者肯定消费消息,大多数时候都不需要额外做什么,但是如果业务上有使用异步消费机制就要小心一些。
- 比如说我的 A 业务里面就采用了异步消费来提高消费速率,我利用批量消费、批量提交来保证异步消费的同时,也不会出现未消费的问题。
全部评论(0)