返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Kafka的集成与使用详解2(手动创建、修改、查询Topic)

作者:hangge | 2020-06-15 08:10
    在之前的文章样例中,我们发送消息的时候并没有事先创建相应的 Topic。这是因为 KafkaTemplate 在发送的时候就已经帮我们完成了创建的操作。
    但这样也会存在一些问题,比如这种情况创建出来的 Topic Partition(分区))数永远只有 1 个,也不会有副本,这就导致了我们在后期不能顺利扩展。所以有时我们还是有必要使用代码手动去创建 Topic

二、手动创建、修改、查询 Topic

1,创建 Topic

(1)使用 @Bean 注解创建 Topic 十分简单,我们可以在项目中新建一个配置类专门用来初始化 topic,代码如下:
@Configuration
public class KafkaInitialConfiguration {

    //创建TopicName为topic.hangge.initial的Topic并设置分区数为8以及副本数为1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic.hangge.initial",8, (short) 1);
    }
}

(2)项目启动后,使用工具可以看到 Topic 创建成功:

2,修改 Topic 分区数

(1)如果要修改分区数,只需修改配置值重启项目即可:
注意:修改分区数并不会导致数据的丢失,但是分区数只能增大不能减。
@Configuration
public class KafkaInitialConfiguration {
    
    //创建TopicName为topic.hangge.initial的Topic并设置分区数为10以及副本数为1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic.hangge.initial",10, (short) 1 );
    }
}

(2)重启项目,可以看到分区数已经成功变成了 10

3,查询 Topic 信息

(1)首先我们在配置类中注册 AdminClient 这个 Bean
@Configuration
public class KafkaInitialConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaServers;


    @Bean
    public AdminClient adminClient() {
        Map<String, Object> props = new HashMap<>();
        //配置Kafka实例的连接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        return AdminClient.create(props);
    }
}

(2)然后在需要的地方注入 AdminClient 即可以查询 Topic 信息,这里我们使用 lambda 表达式遍历输出:
AdminClient 除了查询 Topic 外,还有如下其他功能:
  • 创建 TopiccreateTopics(Collection<NewTopic> newTopics)
  • 删除 TopicdeleteTopics(Collection<String> topics)
  • 罗列所有 TopiclistTopics()
  • 查询 TopicdescribeTopics(Collection<String> topicNames)
  • 查询集群信息describeCluster()
  • 查询 ACL 信息describeAcls(AclBindingFilter filter)
  • 创建 ACL 信息createAcls(Collection<AclBinding> acls)
  • 删除 ACL 信息deleteAcls(Collection<AclBindingFilter> filters)
  • 查询配置信息describeConfigs(Collection<ConfigResource> resources)
  • 修改配置信息alterConfigs(Map<ConfigResource, Config> configs)
  • 修改副本的日志目录alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  • 查询节点的日志目录信息describeLogDirs(Collection<Integer> brokers)
  • 查询副本的日志目录信息describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  • 增加分区createPartitions(Map<String, NewPartitions> newPartitions)
@RestController
public class HelloController {

    @Autowired
    private AdminClient adminClient;

    @GetMapping("/hello")
    public void hello() throws ExecutionException, InterruptedException{
        DescribeTopicsResult result = adminClient.describeTopics(
                Arrays.asList("topic.hangge.initial"));
        result.all().get().forEach((k,v)->System.out.println("k: "+k+" ,v: "+v.toString()+"\n"));
    }
}

(3)项目启动后访问 /hello 接口,可以看到控制台输出如下内容(里面包含了各个分区的信息等等):
评论

全部评论(0)

回到顶部