SpringBoot - Kafka的集成与使用详解2(手动创建、修改、查询Topic)
作者:hangge | 2020-06-15 08:10
在之前的文章样例中,我们发送消息的时候并没有事先创建相应的 Topic。这是因为 KafkaTemplate 在发送的时候就已经帮我们完成了创建的操作。
(2)项目启动后,使用工具可以看到 Topic 创建成功:![](http://hangge.com/blog_uploads/202006/2020061216532517254.jpg)
(2)重启项目,可以看到分区数已经成功变成了 10:![](http://hangge.com/blog_uploads/202006/2020061216590287834.jpg)
(2)然后在需要的地方注入 AdminClient 即可以查询 Topic 信息,这里我们使用 lambda 表达式遍历输出:
(3)项目启动后访问 /hello 接口,可以看到控制台输出如下内容(里面包含了各个分区的信息等等):
但这样也会存在一些问题,比如这种情况创建出来的 Topic 的 Partition(分区))数永远只有 1 个,也不会有副本,这就导致了我们在后期不能顺利扩展。所以有时我们还是有必要使用代码手动去创建 Topic。
二、手动创建、修改、查询 Topic
1,创建 Topic
(1)使用 @Bean 注解创建 Topic 十分简单,我们可以在项目中新建一个配置类专门用来初始化 topic,代码如下:
@Configuration public class KafkaInitialConfiguration { //创建TopicName为topic.hangge.initial的Topic并设置分区数为8以及副本数为1 @Bean public NewTopic initialTopic() { return new NewTopic("topic.hangge.initial",8, (short) 1); } }
(2)项目启动后,使用工具可以看到 Topic 创建成功:
![](http://hangge.com/blog_uploads/202006/2020061216532517254.jpg)
2,修改 Topic 分区数
(1)如果要修改分区数,只需修改配置值重启项目即可:
注意:修改分区数并不会导致数据的丢失,但是分区数只能增大不能减。
@Configuration public class KafkaInitialConfiguration { //创建TopicName为topic.hangge.initial的Topic并设置分区数为10以及副本数为1 @Bean public NewTopic initialTopic() { return new NewTopic("topic.hangge.initial",10, (short) 1 ); } }
(2)重启项目,可以看到分区数已经成功变成了 10:
![](http://hangge.com/blog_uploads/202006/2020061216590287834.jpg)
3,查询 Topic 信息
(1)首先我们在配置类中注册 AdminClient 这个 Bean:
@Configuration public class KafkaInitialConfiguration { @Value("${spring.kafka.bootstrap-servers}") private String kafkaServers; @Bean public AdminClient adminClient() { Map<String, Object> props = new HashMap<>(); //配置Kafka实例的连接地址 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); return AdminClient.create(props); } }
(2)然后在需要的地方注入 AdminClient 即可以查询 Topic 信息,这里我们使用 lambda 表达式遍历输出:
AdminClient 除了查询 Topic 外,还有如下其他功能:
- 创建 Topic:createTopics(Collection<NewTopic> newTopics)
- 删除 Topic:deleteTopics(Collection<String> topics)
- 罗列所有 Topic:listTopics()
- 查询 Topic:describeTopics(Collection<String> topicNames)
- 查询集群信息:describeCluster()
- 查询 ACL 信息:describeAcls(AclBindingFilter filter)
- 创建 ACL 信息:createAcls(Collection<AclBinding> acls)
- 删除 ACL 信息:deleteAcls(Collection<AclBindingFilter> filters)
- 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
- 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
- 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
- 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
- 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
- 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)
@RestController public class HelloController { @Autowired private AdminClient adminClient; @GetMapping("/hello") public void hello() throws ExecutionException, InterruptedException{ DescribeTopicsResult result = adminClient.describeTopics( Arrays.asList("topic.hangge.initial")); result.all().get().forEach((k,v)->System.out.println("k: "+k+" ,v: "+v.toString()+"\n")); } }
(3)项目启动后访问 /hello 接口,可以看到控制台输出如下内容(里面包含了各个分区的信息等等):
![](http://hangge.com/blog_uploads/202006/2020061217163786139.png)
全部评论(0)