SpringBoot - Kafka的集成与使用详解7(消费者2:获取消息头和消息体)
作者:hangge | 2020-06-22 08:10
之前的样例中消费者这边都直接获取消息内容并使用,如果我们还想要获取分区信息、消息头等其他内容的话,有如下两种方式。
七、消费者2:获取消息头和消息体
1,使用 ConsumerRecord 类方式
(1)使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen1(ConsumerRecord<String, Object> record) { //把ConsumerRecord里面所包含的内容打印到控制台中 System.out.println(record); } }
(2)这里我们简单的发送一条消息:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com"); } }
(3)可以看到控制台输出的内容如下:
2,使用注解方式获取
(1)如果我们监听方法需要获取该消息非常多的字段时,也可以通过如下这种注解的方式:@Component public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic3"}) public void listen2(@Payload String data, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { System.out.println(data); System.out.println(topic); System.out.println(partition); System.out.println(key); System.out.println(ts); } }
(2)这里我们简单的发送一条消息:
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com"); } }
(3)可以看到控制台输出的内容如下:
全部评论(0)