返回 导航

大数据

hangge.com

Kafka - 消息积压问题解决方案

作者:hangge | 2024-06-05 08:15

一、基本介绍

1,什么是消息积压?

    消息积压是指消息生产速率大于消费速率,所以消息会在 broker 上存放着。消息积压可能会导致消息要等很久才会被消费,这对于一些业务来说损害很大。特别是一些对消息消费时效性有要求的业务,几乎不能容忍任何程度的消息积压。

2,消息积压的原因

(1)消息积压也可以看作是分区数量不足引发的问题。在 Kafka 里面一个分区只能有一个消费者,但是一个消费者可以同时消费多个分区。
  • 比如我们有 N 个分区,那么最多只有 N 个消费者,这个时候再增加消费者已经不能提高消费速率了。如果不足 N 个消费者,那么就会有一些消费者同时从多个分区里面拉取数据。
(2)这种设计导致我们不能无限制地增加消费者来解决消息积压问题。反过来说,但凡没这种限制,也就没有消息积压这回事了。

3,临时性积压与永久性积压

(1)消息积压首先要看是临时性积压还是永久性积压。临时性积压是指突如其来的流量,导致消费者一时半会跟不上。而永久性积压则是指消费者的消费速率本身就跟不上生产速率。

(2)如果是临时性积压,并且评估最终消费者处理完积压消息的时间是自己能够接受的,那么就不需要解决。比如说偶发性的消息积压,需要半个小时才能处理完毕,而我完全等得起半小时,就不需要处理。

(3)但要是接受不了,又或者是永久性积压。就要尝试使用下面的方案进行解决。

二、消息积压的解决方案

1,增加消费者

(1)解决消息积压问题最简单的办法就是增加消费者,增加到和分区数量一样。
(2)不过大部分人在遇到消息积压问题的时候,消费者数量都已经和分区数量一样了。这时我们就要采用下面的一些方法了。

2,增加分区

(1)解决消息积压问题另外一种做法就是增加分区,比如说直接增加好几个分区。

(2)要确定新的分区数量理论上的最佳做法肯定是通过压测来确定的。不过实践中还是很少用的,因为在测试环境测了也不一定准,而且我们也不敢去生产环境测试。因此最简单的做法就是用平均生产者速率除以单一消费者的消费速率。
  • 比如说所有的生产者合并在一起,QPS3000。而一个消费者处理的 QPS200,那么 3000 / 200 = 15。也就是说我们需要 15 个分区。进一步考虑业务增长或者突发流量,可以使用 18 个或者 20 个。
为什么用平均生产者速率而不是峰值速率?
  • 这是因为本身消息队列就承担着削峰的功能,所以在业务巅峰可能会有少量的消息积压,但是这是正常现象,所以可以不考虑。当然,如果有钱有资源,那么就可以按照生产峰值速率来算。

3,创建新 topic

(1)如果我们不能增加分区的话,我们也可以准备一个新的 topic,这个 topic 会有更多的分区。前期消费老的 topic,同时也消费新的 topic。等老的 topic 上的数据都消费完毕之后,就可以完全切换到新的 topic了。

(2)这种做法还有一个变种,就是当我们创建了新 topic 之后,启动几个消费者,这些消费者会把老的 topic 上的消息转发到新的 topic 里面。同时启动消费者消费新的 topic

4,优化消费者性能

(1)在不能增加消费者数量也不能增加分区数量的时候,还可以考虑提高消费者的消费速率,也就是优化消费者性能。
(2)优化的思路大体上有两种:
  • 把消费者部署在更好的实例上,这属于花钱买性能。
  • 优化消费者的消费逻辑,这跟业务密切相关,本质上是一个性能优化的问题。
优化消费者的性能的一个案例:
  • 早期我在公司的时候就优化过消费者的性能。我们的业务逻辑也不是特别复杂,但是因为考虑同一个业务的消息可能被不同的消费者消费,所以在消费消息的时候引入了一个分布式锁。
  • 但是,实际上我们可以通过主动选择目标分区使相同的业务总是把消息发到同一个分区上,确保同一时间只有一个消费者处理一个业务的消息,这样就可以把分布式锁去掉。它带来的好处就是,当没有分布式锁的时候,也不会有消费者因为等待分布式锁而导致消费速率下降了。

5,消费者降级

(1)我们也可以利用微服务中的降级思路来解决消息积压的问题。但是这个降级本身并不是为了保护系统,而是为了加快消费速率。
  • 比如说如果我们的业务有快慢路径之分,那么可以考虑在消息积压的情况下只执行快路径。

(2)下面是一个具体的案例:
  • 在之前出现消息积压的场景里面,消费者的处理逻辑总体上可以认为就是调用几个接口,计算一个值,然后放到缓存里面,缓存过期时间就是 15 分钟。
  • 这些提前计算出来的结果就是给查询接口使用的,查询接口如果都自己算的话,性能会比较差。
  • 在不触发降级的时候,也就是没有消息积压的时候,就正常算。
  • 但是在消息积压的时候,如果缓存里面有对应的数据,那就不算,否则就重新计算一下。
  • 这种降级逻辑是基于这样一个底层逻辑,就是如果这个数据本身过期时间是十五分钟,那么我即便不更新,用户拿到的无非就是十五分钟以内的一个不准确的数据。这在我们业务上是可以接受的。
  • 而如果缓存不存在了,那么就确实需要重新计算一遍,避免查询接口自己实时计算。
  • 在引入了这种降级策略之后,大概有 1/3 的消息处理逻辑是被降级的。

6,聚合消息与批量操作

(1)这种方式是发送消息的时候在一条消息里面直接带上批量数据,消费者这边也可以借助批量接口一次性处理完。
  • 假如说我们现在有一个业务,是要从数据库里筛选出一批数据,然后针对每一条数据进行处理。处理之后,会发送一条消息到消息队列,例如 msg1(biz_id = 1),消费者再一条条取出来处理。
  • 改造后发送者发送的数据是 msg2(biz_ids=1,2,3,4),消费者可以一次性处理完毕。

(2)这种调整带来了两方面的好处:
  • 一方面自然是消费者借助批量接口处理,速度有数量级的提升;
  • 另外一个则是消息所需的存储空间大大降低,broker 的负载也会降低不少。

(3)这种方式一般适用于消费者可以改造成批量接口的场景,而且可以考虑不改造生产者,只改造消费者。把消费者改造成批量消费、批量提交偏移量。
  • 比如说消费者一次性拉取 100 条消息,构造批量处理请求。在处理成功之后,再提交偏移量。这种批量消费,再批量提交的做法也可以用于异步消费中。

7,异步消费

(1)所谓的异步消费就是指在消费者这边有一个消费者线程,负责从消息队列里面拉取消息。同时拉到的消息会立刻转发给一个线程池,线程池里面会有一些工作线程负责处理消息。

(2)这个方案对生产者毫无影响,但是消费者这边要小心消息丢失的问题。
  • 所谓的消息丢失是指我们的消费者线程取出消息之后,要想继续消费下一条就得先提交当前这一条。这种情况下,就可能会出现一个问题:消费者线程提交了,但是工作者线程还没处理就宕机了。这个时候,因为我们已经提交了,所以就算重启,我们也是从下一条开始消费。

(3)要解决消息丢失,那么就可以考虑使用批量提交的方法。也就是说,消费者线程一次拉取一批消息,比如说 10 条。然后,并不是说立刻提交这 10 条消息,而是直接开启十个线程,并行处理这 10 条消息。等到 10 条消息都处理完毕,再批量提交。

(4)批量提交很容易出现重复消费的问题。在消费者线程拉取了一批消息之后,如果过了一段时间还没提交就宕机了。而当消费者从宕机中恢复过来的时候,就会拉取同一批继续消费。
  • 对于重复消费来说,解决方案也很简单,就是让消费逻辑保证是幂等的。这样,即便宕机导致消息被消费了但是来不及提交,也可以保证在下一次恢复过来的时候,重复处理不会引起什么业务问题。

(5)在批量提交里面,还有一个非常棘手的问题就是一批消息里面部分消息处理失败了怎么办?
  • 在部分失败的情况下,第一种做法是要求工作线程立刻重试,比如说重试三次。

  • 当然我们也可以考虑异步重试。

  • 我们还可以考虑把消费失败的消息丢回消息队列里,后面再轮到它的时候又会被处理,这就相当于重试了。
提示:我们还可以在在消息里面记录一下已经处理过几次了。比如说我们限制只能重试三次,那么三次重试都失败了,就不要再丢回去了。
评论

全部评论(0)

回到顶部