返回 导航

大数据

hangge.com

Kafka - 有序消息的实现方案(消息顺序消费)

作者:hangge | 2024-06-03 09:35
    在消息队列里面,有序消息是指消费者消费某个 topic 消息的顺序,和生产者生产消息的顺序一模一样,它也叫做顺序消息。我在之前的文章(点击查看)中提到 Kafka 并不能保证不同分区之间的顺序。也就是说,如果业务上有先后顺序的消息被发送到不同的分区上,那么我们难以确定哪一个消息会先被消费。
  本文将分别从单分区、和多分区这两种情况讲解如何保证消息有序。
注意:本文方案保障的是业务内有序,而不是全局有序。
  • 比如说在下单的场景下,会产生创建订单消息和完成支付消息。业务上只会要求同一个订单的创建订单消息应该优先于完成支付消息,但是不会要求订单 A 的创建消息需要先于订单 B 的支付消息。
  • 如果我们要求的是全局有序,那除了换更加强大的机器就没别的办法了。

一、单分区实现有序消息

1,方案说明

(1)要保证消息有序,最简单的做法就是让特定的 topic 只有一个分区。这样所有的消息都发到同一个分区上,那么自然就是有序的。

(2)这种只有一个分区的方案性能差,没办法支撑高并发。
  • 对于生产端来说,所有的消息都在一个分区上,也同时意味着所有的消息都发送到了同一个 broker 上,这个服务器很可能撑不住压力;
  • 对于消费端来说,只有一个分区,那么就只能有一个消费者消费,很容易出现消息积压的问题。

2,使用异步消费提升性能

(1)在单分区方案里,最容易遇到的问题就是消息积压,因为你只有一个消费者。在遇到消息积压的情况下,我们可以考虑异步消费。具体逻辑如下:
  • 消费者线程从 Kafka 里获取消息,然后转发到内存队列里面。
    • 在转发的时候,要把同一个业务的消息转发到同一个队列里面。一般来说可以根据业务特征字段计算一个哈希值,比如说直接使用业务 id 作为哈希值。
    • 利用这个哈希值除以工作线程数量,然后取余数,得到对应的内存队列。
  • 然后工作线程从队列里面拿取消息,真正执行业务逻辑。

(2)但这种做法的缺陷就是存在消息未消费的问题。也就是消费线程取出来了,转发到队列之后,工作线程还没来得及处理,消费者整体就宕机了,那么这些消息就存在丢失的可能。

二、多分区实现有序消息

1,方案说明

(1)该方案就是直接扩展为使用多个分区,只需要确保同一个业务的消息发送到同一个分区就可以保证同一个业务的消息是有序的。

(2)要想确保同一个业务的消息都发送到同一个分区,那么只需要发送者自己根据业务特征,直接计算出来一个目标分区。比如说最简单的策略就是根据业务 ID 对分区数量取余,余数就是目标分区。

(3)该方案的优点是足够简单,业务方需要做的改动很小。但是缺点有两个,一个是数据不均匀,另一个是增加分区可能导致消息失序。

2,数据不均匀问题解决

(1)数据不均匀一般是业务造成的。在我们的方案里面,分区是根据业务特征来选择的,那么自然有一些分区有很多数据,有一些分区数据很少。
  • 比如说万一我们不小心把热点用户的消息都发到了同一个分区里面,那么这个分区的 QPS 就会很高,消费者也不一定来得及消费,就可能引起消息积压。

(2)要想解决这个问题,可以通过改进计算目标分区的方式来解决,比如说采用类似于 Redis 中槽和槽分配的机制,又或者说一致性哈希算法。
  • 第一种思路是借鉴 Redis 的槽与槽分配方案。不过 Redis 使用了 16384 个槽,一般的业务用不上那么多槽,所以可以考虑用 1024 个槽。根据业务的特征来计算一个哈希值,再除以 1024 取余就可以得到所在的槽。再根据不同槽的数据多少,合理地把槽分配到不同的分区。最好把槽和分区的绑定关系做成动态的,也就是说我可以随时调整槽到分区的映射关系,保证所有的分区负载都是均匀的。
提示:动态调整槽与分区的绑定关系,可以借助于配置中心来完成。比如说最开始你把槽 1 绑定到分区 2 上,后面在运行的时候你发现分区 2 数据太多,就把槽 1 重新绑定到了分区 3 上。
  • 另外一种思路是使用一致性哈希算法来筛选分区。首先要根据数据分布的整体情况,把分区分布在哈希环上,确保每一个分区上的数据分布大体上是均匀的。如果一部分哈希值上数据较多,就多插入几个分区节点。然后根据业务特征计算一个哈希值,从哈希环上找到对应的分区。

(3)这种槽分配和一致性哈希算法非常适合解决数据或者流量分布不均匀的问题,因为我们总是可以通过手工调整槽的映射关系或者哈希环上节点的分布来保证数据或者流量在每一个节点上的分布大体是均匀的。

3,增加分区引起消息失序问题解决

(1)该方案如果中间有增加新的分区,那么就可能引起消息失序。
  • 比如说最开始 id 3 的订单消息 msg1 发到分区 0 上,但是这时候很不幸分区 0 上积攒了很多消息,所以 msg1 迟迟得不到消费。
  • 紧接着我们扩容,增加了一个新的分区。如果这时候来了一个消息 msg2,那么它会被转发到分区 3 上。分区 3 上面没有积攒什么数据,所以消费者 3 直接就消费了这个消息。
  • 这时候我们发现,本来 msg1 应该先于 msg2 被消费。而增加分区之后 msg2 反而被先消费了。这就是一个典型的消息失序场景。

(2)要解决这个问题也很容易。对于新加入的分区,可以暂停消费一段时间。
  • 比如说在前面的例子中,如果我们估算 msg1 会在一分钟内被消费,那么新加入的分区的消费者可以在三分钟后再开始消费。那么大概率 msg1 就会先于 msg2 消费。
注意:这里的停顿三分钟的前提是你要先把积压的消息消费掉。如果积压的消息还需要三十分钟,那你这里就至少要停顿三十分钟。
  • 不过这种等待的解决方式并不能解决根本问题,只能说是很大程度上缓解了问题。但是本身增加分区也是一个很不常见的操作,再叠加消息失序的概率也很低,所以我们也可以通过监控发现这种失序场景,然后再手工修复一下就可以了。

附:RabbitMQ 和 RocketMQ 有序消息的实现

1,RabbitMQ

RabbitMQ 里面使用的是 queue,因为 RabbitMQ 没有分区的概念。

2,RocketMQ

RocketMQ 里面内置了有序消息的功能,底层原理也基本相似。
评论

全部评论(0)

回到顶部