SpringBoot - 动态创建多个Kafka消费分组、多个KafkaTemplate教程
作者:hangge | 2021-06-07 09:47
有时我们程序中的 Kafka 消费者或生产者并不是一开始就定好的,而是需要在程序运行过程中根据情况(比如从数据库读取配置)动态地创建多个消费者、消费者分组(consumer group)进行数据消费,或者动态创建多个生产者(KafkaTemplate)往不同的目标地址生产数据。下面通过样例演示这些功能如何实现。
一、动态创建消费者分组
1,创建监听类
首先我创建一个监听类 MyListener,里面定义一个监听方法并添加 @KafkaListener 注解,方法内容将收到的消息以及当前的 group id 打印出来。
注意:监听类 MyListener 不需要的增加 @Component 注解,因为接下来我们需要动态地进行创建。
public class MyListener {
@KafkaListener(topics = "test_topic")
public void listen(@Payload String data, @Header(KafkaHeaders.GROUP_ID) String groupId) {
System.out.println(groupId + ":" +data);
}
}
2,动态创建消费分组
(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /create 接口时,会动态创建三个消费者分组:
注意:MyListener 这个 Bean 需要添加 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 注解,这样每次请求它的实例,spring 会给返回一个新的实例。
@RestController
@RequestMapping("consumer")
public class ConsumerController {
/**
* 应用程序上行文
*/
@Autowired
ApplicationContext context;
/**
* 监听器容器工厂
*/
@Autowired
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;
/**
* 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中
*/
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* 创建消费者分组
*/
@GetMapping("/create")
public void create() {
//动态创建三个消费者分组
String[] groupIds = {"group-0", "group-1", "group-2"};
for (String groupId : groupIds) {
// 初始化当前消费者分组的配置
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.60.4:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
// 设置监听器容器工厂
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
// 获取监听类实例
context.getBean(MyListener.class);
}
}
/**
* 停止所有消费监听
*/
@GetMapping("/stop")
public void stop() {
registry.getListenerContainers().forEach(container -> {
//System.out.println(container.getGroupId());
//System.out.println(container.getListenerId());
container.stop();
});
}
/**
* 获取监听类实例
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener() {
return new MyListener();
}
}
- 由于本样例中三个消费者配置只有 group id 不一样,其他都是一样的,我们也可以把公共配置写在 application.properties 配置文件中:
spring.kafka.bootstrap-servers=192.168.60.4:9092 spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- 然后加载这个默认配置,再覆盖设置 group id 即可:
@RestController
@RequestMapping("consumer")
public class ConsumerController {
/**
* 应用程序上行文
*/
@Autowired
ApplicationContext context;
/**
* 消费者工厂
*/
@Autowired
ConsumerFactory<Object, Object> cf;
/**
* 监听器容器工厂
*/
@Autowired
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;
/**
* 所有@KafkaListener这个注解所标注的方法都会被注册在这里面中
*/
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* 创建消费者分组
*/
@GetMapping("/create")
public void create() {
//动态创建三个消费者分组
String[] groupIds = {"group-0", "group-1", "group-2"};
for (String groupId : groupIds) {
// 初始化当前消费者分组的配置
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 设置监听器容器工厂
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
// 获取监听类实例
context.getBean(MyListener.class);
}
}
/**
* 停止所有消费监听
*/
@GetMapping("/stop")
public void stop() {
registry.getListenerContainers().forEach(container -> {
//System.out.println(container.getGroupId());
//System.out.println(container.getListenerId());
container.stop();
});
}
/**
* 获取监听类实例
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener() {
return new MyListener();
}
}
3,运行测试
(1)程序启动后,首先访问 /consumer/create 接口初始化三个消费者分组:
(2)接着往 test 这个 topic 发送一条消息,可以看到三个消费者分组实例都收到了数据:
二、动态创建多个 KafkaTemplate
(1)为演示动态创建,这里我定义一个 Controller,当调用其中的 /send 接口时,会动态创建三个 KafkaTemplate,并发送消息:
@RestController
@RequestMapping("producer")
public class ProducerController {
@GetMapping("/send")
public void send() {
// 动态创建三个生产者
List<KafkaTemplate> kafkaTemplates = new ArrayList<>();
String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"};
for (String server : servers) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate kafkaTemplate = new KafkaTemplate(pf, true);
kafkaTemplates.add(kafkaTemplate);
}
//遍历三个生产者发送消息
for (KafkaTemplate kafkaTemplate:kafkaTemplates) {
kafkaTemplate.send("test", "welcome to hangge.com");
}
}
}
- 由于本样例中三个生产者配置只有服务器地址(bootstrap-servers)不一样,其他都是一样的,我们可以把公共配置写在 application.properties 配置文件中:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- 然后加载这个默认配置,再覆盖设置 bootstrap-servers 即可:
@RestController
@RequestMapping("producer")
public class ProducerController {
/**
* 生产者工厂
*/
@Autowired
ProducerFactory<Object, Object> pf;
@GetMapping("/send")
public void send() {
// 动态创建三个生产者
List<KafkaTemplate> kafkaTemplates = new ArrayList<>();
String[] servers = {"192.168.60.4:9092", "192.168.60.5:9092", "192.168.60.6:9092"};
for (String server : servers) {
Map<String, Object> producerProps = new HashMap<>(pf.getConfigurationProperties());
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
DefaultKafkaProducerFactory dpf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate kafkaTemplate = new KafkaTemplate(dpf, true);
kafkaTemplates.add(kafkaTemplate);
}
//遍历三个生产者发送消息
for (KafkaTemplate kafkaTemplate:kafkaTemplates) {
kafkaTemplate.send("test", "welcome to hangge.com");
}
}
}
全部评论(0)