SpringBoot - Kafka的集成与使用详解6(消费者1:指定topic、partition、offset)
作者:hangge | 2020-06-19 08:10
六、消费者1:指定 topic、partition、offset
1,使用 topics 指定 topic
(1)监听器主要是使用 @KafkaListenter 注解即可,而通过 topics 参数设置监听的 topic(可监听多个,用逗号隔开):
其他参数介绍:id(消费者 ID)、 groupId(消费组 ID)
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(id = "consumer1",groupId = "my-group1", topics = {"topic1","topic2"})
public void listen1(String data) {
System.out.println(data);
}
}
(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边都能够收到:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic1", 0, "key1", "message1");
kafkaTemplate.send("topic1", 1, "key2", "message2");
kafkaTemplate.send("topic2", 0, "key3", "message3");
kafkaTemplate.send("topic2", 1, "key4", "message4");
}
}
2,使用 topicPartitions 指定 topic、parition、offset
(1)topicPartitions 可配置更加详细的监听信息,比如下面代码同样是同时监听 topic1 和 topic2,不同在于这次:- 监听 topic1 的 0 号分区
- 监听 topic2 的 0 号和 1 号分区(其中 1 号分区的初始偏移量为 100)
注意:topics 和 topicPartitions 不能同时使用。
@Component
public class KafkaConsumer {
// 消费监听
@KafkaListener(id = "consumer1",groupId = "my-group1",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen1(String data) {
System.out.println(data);
}
}
(2)下面分别往这两个 topic 的两个分区发送消息,可以看到消费者这边只会收到指定的消息:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic1", 0, "key1", "message1");
kafkaTemplate.send("topic1", 1, "key2", "message2");
kafkaTemplate.send("topic2", 0, "key3", "message3");
kafkaTemplate.send("topic2", 1, "key4", "message4");
}
}
全部评论(0)