SpringBoot - Kafka的集成与使用详解9(消费者4:异常处理器)
作者:hangge | 2020-06-24 08:10
通常来说 KafkaListener 要做的事只是监听 Topic 中的数据并消费,如果在 KafkaListener 中还需要对异常进行 try catch 捕获并处理的话,则会显得代码块非常臃肿不利于维护。
(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:
(2)可以看到异常处理器已经能正常使用了:
好在 spring-kafka 为我们提供了专门的异常处理器(ConsumerAwareListenerErrorHandler),通过异常处理器,我们可以处理 consumer 在消费时发生的异常。
九、消费者4:异常处理器
1,注册异常处理器
(1)注册一个异常处理器就是新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,使用 @Bean 注入(BeanName 默认就是方法名):
@Configuration
public class KafkaInitialConfiguration {
//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer) {
System.out.println("--- 发生消费异常 ---");
System.out.println(message.getPayload());
System.out.println(exception);
return null;
}
};
}
}
(2)当然我们也可以使用下面这种 lambda 表达式写法,简化代码:
@Configuration
public class KafkaInitialConfiguration {
//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler myConsumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("--- 发生消费异常 ---");
System.out.println(message.getPayload());
System.out.println(exception);
return null;
};
}
}
2,使用异常处理器
异常处理器注册好之后,将这个异常处理器的 BeanName 放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器。@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic3"}, errorHandler = "myConsumerAwareErrorHandler")
public void listen1(String data) throws Exception {
throw new Exception("模拟一个异常");
}
}
3,开始测试
(1)编写测试方法,发送一条消息:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic3", "hangge.com");
}
}
(2)可以看到异常处理器已经能正常使用了:

附:批量消费异常处理器
批量消费异常处理器和之前单消息消费异常处理器差不多,上面异常处理器代码可以完全不用改动直接使用,只不过传递过来的数据都是 List 集合方式:
全部评论(0)