SpringBoot - Kafka的集成与使用详解4(生产者2:消息回调、同步异步发送)
作者:hangge | 2020-06-17 08:10
当我们发送消息到 Kafka 后,有时我们需要确认消息是否发送成功,如果消息发送失败,就要重新发送或者执行对应的业务逻辑。下面分别演示如何在异步或者同步发送消息时,获取发送结果。
四、生产者2:消息回调、同步异步发送消息
1,获取异步发送消息的结果
(1)默认情况下 KafkaTemplate 发送消息是采取异步方式,并且 kafkaTemplate 提供了一个回调方法 addCallback,我们可以在回调方法中监控消息是否发送成功或在失败时做补偿处理:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic1", "消息回调测试").addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
}
(2)消息发送成功后,控制台显示内容如下:

2,同步发送消息的结果
(1)默认情况下 KafkaTemplate 发送消息是采取异步方式发送的,如果希望同步发送消息只需要在 send 方法后面调用 get 方法即可,get 方法返回的即为结果(如果发送失败则抛出异常)
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
try {
// 同步发送消息
SendResult<String, Object> sendResult =
kafkaTemplate.send("topic1", "消息回调测试").get();
// 消息发送到的topic
String topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = sendResult.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
} catch (InterruptedException e) {
System.out.println("发送消息失败:" + e.getMessage());
} catch (ExecutionException e) {
System.out.println("发送消息失败:" + e.getMessage());
}
}
}
(2)get 方法还有一个重载方法 get(long timeout, TimeUnit unit),当 send 方法耗时大于 get 方法所设定的参数时会抛出一个超时异常。比如下面我们设置了超时时长为 1 微秒(肯定超时):
注意:虽然超时了,但仅仅是抛出异常,消息还是会发送成功的。
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
try {
// 同步发送消息(并且耗时限制在1ms)
SendResult<String, Object> sendResult =
kafkaTemplate.send("topic1", "消息回调测试").get(1, TimeUnit.MICROSECONDS);
// 消息发送到的topic
String topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = sendResult.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
} catch (TimeoutException e) {
System.out.println("发送消息超时");
} catch (InterruptedException e) {
System.out.println("发送消息失败:" + e.getMessage());
} catch (ExecutionException e) {
System.out.println("发送消息失败:" + e.getMessage());
}
}
}
全部评论(0)