返回 导航

大数据

hangge.com

Kafka集群安装部署教程2(不依赖ZooKeeper)

作者:hangge | 2024-05-15 08:33
    Kafka 是一个分布式消息系统,我在前文介绍如何结合 ZooKeeper 进行 Kafka 集群的部署(点击查看)。本文接着演示如何在无需依赖 ZooKeeper 的情况下进行 Kafka 集群的部署。

1,基本介绍

(1)我们知道在 Kafka 2.X 版本需要依赖 ZooKeeper,具体来说 ZooKeeper 提供如下作用:
  • 协调与管理ZooKeeper 主要用于协调和管理分布式系统中的各个节点。在旧版 Kafka 中,ZooKeeper 负责管理 Kafka 集群的元数据,包括 Kafka Broker 的信息、Topic 的配置以及分区的分配等。
  • 领导者选举ZooKeeper Kafka 中被用于实现领导者选举。每个分区都有一个领导者(leader),负责处理消息的读写。ZooKeeper 确保当领导者失效时,能够选举新的领导者,保证系统的可用性。
  • 分布式锁Kafka 使用 ZooKeeper 来实现分布式锁,以确保在集群中只有一个 Broker 可以充当 Topic 的分区的领导者。这有助于避免数据一致性问题。

(2)而 Kafka 3.X 版本引入了 KRaftKafka Raft)协议,以取代依赖 ZooKeeper 进行元数据和领导者选举的旧版 Kafka 集群,从而使得新版 Kafka 集群不再需要 ZooKeeper,因此可以独立运行。具体来熟,KRaft 起到了如下作用: 
  • 元数据存储KRaft Kafka 的元数据(例如 Topic 和分区的配置信息)存储在 KRaft 集群中,不再依赖于 ZooKeeper。这简化了 Kafka 集群的架构,使得 Kafka 可以在不需要额外的 ZooKeeper 的情况下运行。
  • 领导者选举KRaft 提供了一种新的领导者选举机制,替代了 ZooKeeper 的角色。KRaft 集群中的每个分区都有多个副本,其中一个是领导者,其余是追随者。KRaft 使用分布式 Raft 协议确保领导者的选举和复制日志的一致性。
  • 强一致性KRaft 在设计上追求强一致性,确保在各种故障情况下仍然能够保持数据的一致性。这使得 Kafka 更适合一些对一致性要求较高的应用场景。

2,准备工作

(1)首先我们准备三台 linux 服务器进行集群搭建,具体信息如下:
主机名 内网IP 外网IP
node1 172.31.4.99 13.211.44.248
node2 172.31.3.72 3.106.170.197
node3 172.31.6.103 3.25.88.196

(2)为方便后续 Kafka 集群的配置,我们在三台服务上均执行如下命令编辑的 host 文件,使得可以通过主机名相互访问:
echo '
172.31.4.99 node1
172.31.3.72 node2
172.31.6.103 node3' >> /etc/hosts

(3)由于 Kafka3.0 不再支持 JDK8,建议安装 JDK11 JDK17,具体安装方法可以参考我之前写的文章:

3,下载 Kafka 安装包

(1)三台服务器均执行如下命令下载 Kafka 安装包,这里我选择是 3.3.1 版本:
wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.12-3.3.1.tgz

(2)接着执行如下命令解压下载下来的压缩:
tar -zxvf kafka_2.12-3.3.1.tgz

(3)最后将解压出来的文件夹移动到合适的位置,这个可以根据个人习惯修改:
mv kafka_2.12-3.3.1 /usr/local/kafka

4,修改 kafka 配置

(1)首先进入 kafka 目录:
cd /usr/local/kafka

(2)编辑 Kraft 协议配置文件:
vi config/kraft/server.properties

(3)node1 服务器的配置文件修改如下:
注意advertised.listeners 指定 kafka 通过代理暴漏的地址,如果都是局域网使用,就配置 PLAINTEXT://:9092 即可。如果需要暴露给外部使用,则需要填写外部代理地址。比如我使用的是云服务器搭建 Kafka 集群,node1 对应的 172.31.4.99 位内部局域网地址,而其外网地址是 13.211.44.248。如果需要从外网访问 Kafka,那么 advertised.listeners 就要配置成 PLAINTEXT://13.211.44.248:9092。其他节点配置同理。
# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093

listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093

advertised.listeners=PLAINTEXT://:9092

(4)node2 服务器的配置文件修改如下:
# The node id associated with this instance's roles
node.id=2

# The connect string for the controller quorum
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093

listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:9093

advertised.listeners=PLAINTEXT://:9092

(5)node3 服务器的配置文件修改如下:
# The node id associated with this instance's roles
node.id=3

# The connect string for the controller quorum
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093

listeners=PLAINTEXT://node3:9092,CONTROLLER://node3:9093
advertised.listeners=PLAINTEXT://:9092

5,格式化存储目录

(1)我们在任意一台 Kafka 服务器上执行一次如下命令即可,用于生成一个唯一的集群 ID
提示:这一个步骤是在安装 kafka 2.0 版本的时候不存在的。
/usr/local/kafka/bin/kafka-storage.sh random-uuid

(2)比如我这里生成的是 I9keF5LZTTOZyMOaD5vh6w

(3)接着三台服务器都要执行如下命令,注意其中的 ID 替换成我们前面生成的:
./bin/kafka-storage.sh format \
-t I9keF5LZTTOZyMOaD5vh6w \
-c ./config/kraft/server.properties

(4)格式化操作完成之后,你会发现在我们定义的 log.dirs 目录下多出一个 meta.properties 文件:

(5)meta.properties 文件中存储了当前的 kafka 节点的 idnode.id),当前节点属于哪个集群:

(6)另外两个节点的 meta.properties 文件内容如下:


6,启动集群

(1)三台服务器均执行如下命令启动 Kafka 集群:
cd /usr/local/kafka
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

(2)启动后可以执行 jps 命令查看是否启动:

 
(3)如果没有启动成功可以查看日志文件查找原因。比如:Kafka 程序默认 jvm 内存为 1G,如果服务器内存太小,就是无法正常启动。我们可以编辑启动脚本:
vi ./bin/kafka-server-start.sh

(4)将 JVM 参数修改为最大 256M 最小 128M,然后保存退出。
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

(5)最后再次启动即可:
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

7,测试 Kafka 集群

(1)比如我们在任意一个节点上创建一个生产者连接 node1 节点的 hangge-test 这个 topic
./bin/kafka-console-producer.sh --broker-list node1:9092 --topic hangge-test

(2)接着再换一个节点创建一个消费者监听 node2 节点的 hangge-test 这个 topic:
./bin/kafka-console-consumer.sh --bootstrap-server node2:9092 --topic hangge-test

(3)生产者这边输入一些数据:

(4)可以看到消费者成功接收到数据,说明集群搭建成功:
评论

全部评论(0)

回到顶部