返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Kafka的集成与使用详解9(消费者4:异常处理器)

作者:hangge | 2020-06-24 08:10
    通常来说 KafkaListener 要做的事只是监听 Topic 中的数据并消费,如果在 KafkaListener 中还需要对异常进行 try catch 捕获并处理的话,则会显得代码块非常臃肿不利于维护。
    好在 spring-kafka 为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理 consumer 在消费时发生的异常。

九、消费者4:异常处理器

1,注册异常处理器

(1)注册一个异常处理器就是新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,使用 @Bean 注入(BeanName 默认就是方法名):
@Configuration
public class KafkaInitialConfiguration {

    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                                      Consumer<?, ?> consumer) {
                System.out.println("--- 发生消费异常 ---");
                System.out.println(message.getPayload());
                System.out.println(exception);
                return null;
            }
        };
    }
}

(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:
@Configuration
public class KafkaInitialConfiguration {

    //异常处理器
    @Bean
    public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("--- 发生消费异常 ---");
            System.out.println(message.getPayload());
            System.out.println(exception);
            return null;
        };
    }
}

2,使用异常处理器

    异常处理器注册好之后,将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"}, errorHandler = "myConsumerAwareErrorHandler")
    public void listen1(String data) throws Exception {
        throw new Exception("模拟一个异常");
    }
}

3,开始测试

(1)编写测试方法,发送一条消息:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", "hangge.com");
    }
}

(2)可以看到异常处理器已经能正常使用了:

附:批量消费异常处理器

    批量消费异常处理器和之前单消息消费异常处理器差不多,上面异常处理器代码可以完全不用改动直接使用,只不过传递过来的数据都是 List 集合方式:
评论

全部评论(0)

回到顶部