SpringBoot - Kafka的集成与使用详解7(消费者2:获取消息头和消息体)
作者:hangge | 2020-06-22 08:10
之前的样例中消费者这边都直接获取消息内容并使用,如果我们还想要获取分区信息、消息头等其他内容的话,有如下两种方式。
七、消费者2:获取消息头和消息体
1,使用 ConsumerRecord 类方式
(1)使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic3"})
public void listen1(ConsumerRecord<String, Object> record) {
//把ConsumerRecord里面所包含的内容打印到控制台中
System.out.println(record);
}
}
(2)这里我们简单的发送一条消息:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
}
}
(3)可以看到控制台输出的内容如下:

2,使用注解方式获取
(1)如果我们监听方法需要获取该消息非常多的字段时,也可以通过如下这种注解的方式:@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(topics = {"topic3"})
public void listen2(@Payload String data,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
System.out.println(data);
System.out.println(topic);
System.out.println(partition);
System.out.println(key);
System.out.println(ts);
}
}
(2)这里我们简单的发送一条消息:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
}
}
(3)可以看到控制台输出的内容如下:
全部评论(0)