返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Kafka的集成与使用详解10(消费者5:消息过滤器)

作者:hangge | 2020-06-26 08:10
    消息过滤器可以让消息在抵达监听容器前被拦截,过滤器根据系统业务逻辑去筛选出需要的数据交由 KafkaListener 处理,不需要的消息则会过滤掉。

十、消费者5:消息过滤器

1,配置消息过滤器

    配置消息过滤器十分简单,只需要为监听容器工厂配置一个 RecordFilterStrategy(消息过滤策略),返回 true 的时候消息将会被抛弃,返回 false 时,消息则能正常抵达监听容器。
@Configuration
public class KafkaInitialConfiguration {

    // 监听器工厂
    @Autowired
    private ConsumerFactory consumerFactory;

    // 配置一个消息过滤策略
    @Bean
    public ConcurrentKafkaListenerContainerFactory myFilterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略(将消息转换为long类型,判断是奇数还是偶数,把所有奇数过滤,监听器只接收偶数)
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                long data = Long.parseLong((String) consumerRecord.value());
                if (data % 2 == 0) {
                    return false;
                }
                //返回true将会被丢弃
                return true;
            }
        });
        return factory;
    }
}

2,使用消息过滤器

    带有消息过滤策略的容器工厂注册好之后,将这个容器工厂的 BeanName 放到 @KafkaListener 注解的 containerFactory 属性里面。这样消息在抵达监听器之前,会先进行过滤。
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, containerFactory = "myFilterContainerFactory")
    public void listen1(String data) {
        System.out.println(data);
    }
}

3,开始测试

(1)编写测试方法,连续发送 5 条消息(消息内容分别是 0 4 这个 5 个数字):
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        for (int i = 0; i < 5; i++) {
            kafkaTemplate.send("topic3", String.valueOf(i));
        }
    }
}

(2)看一下监听器的消费情况,可以发现监听器只消费了偶数(奇数已经被过滤器过滤掉了):
评论

全部评论(0)

回到顶部