返回 导航

大数据

hangge.com

Kafka - 延迟消息的实现方案(附:RabbitMQ延迟消息)

作者:hangge | 2024-06-04 08:18

一、基本介绍

1,延迟队列

延迟队列是一种特殊的队列。它里面的每个元素都有一个过期时间:
  • 当元素还没到过期时间的时候,如果我们试图从队列里面获取一个元素,我们会被阻塞。
  • 当有元素过期的时候,我们就会拿到这个过期的元素。也可以这样想,我们拿到的永远是最先过期的那个元素。
提示:很多语言本身就提供了延迟队列的实现,比如说在 Java 里面的 DelayQueue

2,延迟消息

    基于消息队列的延迟队列,也叫做延迟消息。具体来说,延迟消息是指消息不是立刻被消费的,而是在经过一段时间之后,才会被消费。在到时间之前,这个消息一直都被存储在消息队列的服务器上。
注意RabbitMQ 有插件支持延迟消息功能,而 RocketMQ Kafka 则只能自己研发。

二、RabbitMQ 实现延迟消息

1,使用插件实现

(1)RabbitMQ 有一个延迟消息的插件 rabbitmq_delayed_message_exchange,只需要启用这个插件就可以使用延迟消息。这个插件的基本原理也比较简单,就是实现了一个 exchange。这个 exchange 控制住了消息什么时候会被真的投递到队列里。
  • 如下图所示,消息会先暂时存储在 exchange 里面。它使用的是 Mnesia 来存储。如果你不知道 Mnesia 是什么,就直观地把它看作一个基于文件的数据库。
  • 当延迟的时间满足条件之后,这些存储的数据就会被投递到真正的消息队列里面。紧接着消费者就可以消费到这个消息了。
提示:我们可以从这里得到一个启发,就是如果要实现一个延迟队列,是可以借助数据库的。

(2)那么这个插件本身也是有很多限制的,在它的官网主页里面就有说明,其中有两个最突出的限制:
  • 消息在真的被投递到目标消息队列之前,是存放在接收到了这个消息的服务端本地的 Mnesia 里面。也就是说,如果这个时候还没有刷新磁盘,那么消息就会丢失;如果这个节点不可用了,那么消息也同样会丢失。
  • 不支持高并发、大数据量。显然,现实中很多场景都是要在高并发大数据量场景下使用延迟消息的,比如说订单超时取消。因此这个缺点也限制了这个插件被广泛使用。

2,使用 ttl 和死信队列实现

(1)除了使用插件外,开发者也可以自己手动实现延迟消息。这就要利用到 RabbitMQ ttlTime To Live,也就是过期时间)功能和所谓的死信队列了。死信队列是一种逻辑上的概念,也就是说它本身只是一个普通的队列。而死信的意思是指过期的无法被消费的消息,这些消息会被投送到这个死信队列。
  1. 简单来说,就是开发者准备一个队列 delay_queue,为这个 delay_queue 设置过期时间,这个 delay_queue 不需要消费者。然后把你真实的业务 biz_queue 绑定到这个 delay_queue,作为它的死信队列。
  2. 生产者发送消息到 delay_queue,因为没有消费者,所以消息会过期。过期之后的消息被转发到死信队列,也就是 biz_queue 里面。这时候消费者就能拿到消息了。

(2)这种方案并没有插件的那两个缺点。但是 ttl 的设置是在队列级别上,也就是说一个 delay_queue 的延时是固定的,不能做到随机。
  • 比如说我这一条消息延后三分钟,另外一条消息延后五分钟,这是不可能的。因此,我们可能需要创建很多不同 ttl delay_queue 才能满足业务需要。

三、Kafka 实现延迟消息方式1:利用定时任务调度

1,方案介绍

(1)利用定时任务来实现延迟消息是最好、最简单的办法。
  • 对于一个延迟消息来说,一个延迟到 30 分钟后才可以被消费的消息,也可以认为是 30 分钟后才可以发送。也就是说,我们可以设定一个定时任务,这个任务会在 30 分钟后把消息发送到消息服务器上。

(2)定时任务调度最好是通过解决了持久化的分布式任务平台。那么业务发送者就相当于注册一个任务,这个任务就是在 30 分钟之后发送一条消息到 Kafka 上。之后业务消费者就能够消费了。

2,方案的缺点

    这个方案的昀大缺点是支撑不住高并发。这是因为绝大多数定时任务中间件都没办法支撑住高并发、大数据的定时任务调度,所以只有应用规模小,延迟消息也不多的话,才可以考虑使用这个方案。如果想要支持高并发、大数据的延迟方案,还是要考虑利用消息队列。

四、Kafka 实现延迟消息方式2:分区设置不同延迟时间

1,方案介绍

(1)该方案的基本架构图如下,这里关键的角色是 delay_topic 和延迟消费组:
  • 创建了一个 delay_topic,这个topicN 个分区,每个分区设定了不同的延迟时间,用来接收不同延迟时间的消息。比如说在下图中分成了 p0p1p2 三个分区,分别用于接收延迟时间为 1min3min 10min 的消息。
  • 然后我们创建了一个消费组去消费这个 delay_topic,延迟消费组会创建出和分区数量一样的消费者,每一个消费者消费一个分区。每个消费者在读取到一条消息之后,就会根据消息里面的延迟时间来等待一段时间。等待完之后,再把消息发送到业务方真正的 topic 上(如下图的 biz_topic

(2)其中分区 N 是根据业务来设计,比如:
  • 5 个分区:延迟时间分别是 1min3min5min10min30min
  • 10 个分区:延迟时间分别是 1min3min5min10min15min30min60min90min120min180min

2,rebalance 问题

(1)在这个方案中,因为消费者睡眠了,睡眠期间不会消费消息,所以 Kafka 就会判定这个消费者已经崩溃了,从而触发 rebalance。等睡眠结束之后,重新消费,就不一定还是消费原本的那个分区了。

(2)为了避免这个问题,就需要确保在睡眠期间不会触发 rebalance。因此需要利用 Kafka 的暂停(Pause)功能,在睡眠结束之后,再恢复(Resume)。
提示Kafka 的暂停消费,并不是不再发起 Poll 请求,而是 Poll 了但是不会真的拉消息,或者说相当于拉取 0 条数据。这样可以让 Kafka 始终认为消费者还活着。

(3)所以整体逻辑如下:
  • 拉取一条消息,假如说 offset = N,查看剩余的延迟时间 t
  • 暂停消费,睡眠一段时间 t
  • 睡眠结束之后,恢复消费,继续从 offset = N 开始消费。

3,一致性问题

(1)从服务端拉取到消息,然后转发到 biz_topic 里面这个流程中,是先提交消息,还是先转发?
  • 如果是先提交,那么就会出现消息提交了,但是还来不及转发 biz_topic 就宕机的情况,这显然不能容忍。
  • 但是如果先转发 biz_topic,然后提交。那如果提交之前宕机了,后面恢复过来,又会转发一次。

(2)要解决一致性问题,必须先转发 biz_topic,然后再提交。同时还需要业务方的配合。
  • 也就是说,如果在转发 biz_topic 之后,提交失败了,下一次就还可以重试,那么 biz_topic 就可能收到两条同样的消息。在这种场景下,就只能要求消费者做到幂等。
  • 当然,即便不用延迟消息,消费者最好也要做到幂等的。因为发送方为了确保发送成功,本身就可能重试。

4,方案的优缺点

(1)优点:这个方案最大的优点就是足够简单,对业务方的影响很小,业务方只需要根据自己的延迟时间选择正确的分区就可以了。

(2)缺点:这个方案也有两个突出的缺点,就是延迟时间必须预先设定好、分区间负载不均匀。
  • 第一个是延迟时间必须预先设定好,比如只能允许延迟 1min3min 或者 10min 的消息,不支持随机延迟时间。在一些业务场景下,需要根据具体业务数据来计算延迟时间,那么这个就不适用了。
  • 第二个是分区之间负载不均匀。比如很多业务可能只需要延迟 3min,那么 1min10min 分区的数据就很少。这会进一步导致一个问题,就是负载高的分区会出现消息积压的问题。
提示:在这里,很多解决消息积压的手段都无法使用,所以只能考虑多设置几个延迟时间相近的分区,比如说在 3min 附近设置 2min30s3min30s 这种分区来分摊压力。

五、Kafka 实现延迟消息方式3:基于MySQL转储方案

1,方案介绍

(1)该方案的实现逻辑如下:
  • 创建一个 delay_topic,业务发送者把消息发送到这个 topic 里面,消息里面带上了需要延迟的时间。
  • 有一个延迟消费者,它会消费 delay_topic 里面的消息,转储到数据库中。
  • 还有一个延迟发送者,它会轮询数据库里的消息,把已经可以转发出去的消息转发到真正的 biz_topic 上。发送完之后,延迟发送者把数据库的状态更新成已发送。
  • 最后业务消费者消费 biz_topic

(2)如何支撑住高并发是该方案要解决的一个痛点。由于此方案最明显的性能瓶颈就是 MySQL,因为这个场景是一个写密集的场景。所以要想撑住高并发就要想办法提高 MySQL 的性能。
  • 当然最佳的策略还是换一个存储结构,比如说换 TiDB 或者 Elasticsearch
  • 如果一定要使用 MySQL 的话,可以采用分区表、表交替、分库分表、批量操作几个优化方案。

2,分区表

(1)最简单的优化方案就是使用分区表,因为延迟消息是一个时效性很强的数据,也就是说,我们完全可以按照发送时间,也就是延迟之后具体的发送时间点来分区。

(2)分区表根据并发量选择按月分、按周分、按天分(例如在并发不是很高的时候,可以按照周来分区。在并发很高的时候,可以按照天来分区)。历史分区就可以直接清理掉。

3,表交替

表交替方案的意思是准备两个表,然后交替写、交替查询。
  • 比如准备两个表 tab_0tab_1,今天用 tab_0,明天用 tab_1。当用 tab_1 的时候就可以直接清空(TRUNCATEtab_0 的数据,反过来也是这样。TRUNCATE 本身很快,所以没什么性能问题。
  • 这种按天交替的方案对延迟时间是有限制的,延迟时间不能超过一天。


4,分库分表

(1)如果并发确实非常高,那么就只能考虑采用分库分表的方案了。这里分库分表也很简单,只需要按照 biz_topic 的名字来分库分表就可以了。而且每一张表可以叠加前面的分区表和交替表的方案,进一步提高性能。

(2)不过这个方案也有一定的隐患,第一个问题就是不同 topic 的并发度不一样,比如说 biz_topic_1 的并发只有 100,而 biz_topic_2 的并发有 10000,那么按照 biz_topic 来分,就会出现不同库不同表的压力差异很大的问题。
  • 要解决这个问题,如果不考虑消息有序性的问题,那么也可以考虑轮询。比如说分库分表是 4 * 8 = 32 张表,那么就可以要求每一个延迟消费者,轮流往这些表里插入数据。因为延迟消息有一个很显著的特点,就是查找的时候只会按照发送时间来找,所以随机插入都没问题。
  • 比如说我有一个消息发送给 biz_topic_1,要求是一分钟后发出去。那么不管这个消息被存在哪个表,延迟发送者都可以找出来,然后转发到biz_topic_1

5,批量操作

(1)我们还可以利用批量操作来减轻 MySQL 的压力。对于延迟消费者来说,它可以消费了一批数据之后再批量插入到数据库里面,然后再提交这一批消息。对于延迟发送者来说,当发送了一批数据之后,再批量把这些消息更新为已发送。

(2)批量操作方案类似于分区设置不同延时时间方案,会使数据一致性问题更加严重,不过只需要消费者做到幂等就可以了。因为本质上,这里的一致性问题要么是因为延迟消费者重复消费,要么就是因为延迟发送者重复发送。但不管是哪个原因,消费者幂等都可以解决问题。
评论

全部评论(0)

回到顶部