返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Kafka的集成与使用详解8(消费者3:并发、批量消费)

作者:hangge | 2020-06-23 08:10
    由于 Kafka 的写性能非常高,因此项目经常会碰到 Kafka 消息队列拥堵的情况。遇到这种情况,我们可以通过并发消费、批量消费的方法进行解决。

八、消费者3:并发、批量消费

1,批量消费

(1)首先我们在项目 application.properties 文件中添加如下配置,一个设置启用批量消费,一个设置批量消费每次最多消费多少条消息记录。
注意:这里设置 max-poll-records 5,并不是说如果没有达到 5 条消息,我们就一直等待。而是说一次 poll 最多返回的记录数为 5
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=5

(2)接着对消费者监听这边代码稍作修改,改成使用 List 来接收:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<ConsumerRecord<String, Object>> records) {
        System.out.println("收到"+ records.size() + "条消息:");
        System.out.println(records);
    }
}
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload List<String> data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
        System.out.println(topics);
        System.out.println(partitions);
        System.out.println(keys);
        System.out.println(tss);
    }
}

(3)我们一次性发送的 23 条数据测试一下:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 23; i++) {
            kafkaTemplate.send("topic3", "message-" + i);
        }
    }
}

(4)控制台输出内容如下:

2,并发消费

(1)为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3
注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。
# 并发数设为3
spring.kafka.listener.concurrency=3

(2)配置完毕后,消费者监听这边不需要修改:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(String data) {
        System.out.println(data);
    }
}
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(List<String> data) {
        System.out.println("收到"+ data.size() + "条消息:");
        System.out.println(data);
    }
}

(3)上面我们设置 concurrency 3,也就是将会启动 3 条线程进行监听。而由于我们创建的 topic 4partition(分区),意味着将有 2 条线程都是分配到 1 partition,还有 1 条线程分配到 2partition。我们可以通过日志看到每条线程分配到的 partition
评论

全部评论(0)

回到顶部