返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Kafka的集成与使用详解7(消费者2:获取消息头和消息体)

作者:hangge | 2020-06-22 08:10
    之前的样例中消费者这边都直接获取消息内容并使用,如果我们还想要获取分区信息、消息头等其他内容的话,有如下两种方式。

七、消费者2:获取消息头和消息体

1,使用 ConsumerRecord 类方式

(1)使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen1(ConsumerRecord<String, Object> record) {
        //把ConsumerRecord里面所包含的内容打印到控制台中
        System.out.println(record);
    }
}

(2)这里我们简单的发送一条消息:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

(3)可以看到控制台输出的内容如下:


2,使用注解方式获取

(1)如果我们监听方法需要获取该消息非常多的字段时,也可以通过如下这种注解的方式:
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic3"})
    public void listen2(@Payload String data,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        System.out.println(data);
        System.out.println(topic);
        System.out.println(partition);
        System.out.println(key);
        System.out.println(ts);
    }
}

(2)这里我们简单的发送一条消息:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.send("topic3", 0, System.currentTimeMillis(), "key1", "hangge.com");
    }
}

(3)可以看到控制台输出的内容如下:
评论

全部评论(0)

回到顶部