消息中间件 Kafka 介绍与安装教程(使用CentOS环境)
作者:hangge | 2020-02-20 08:10
一、基本概念介绍
1,Kafka 简介
(1)Kafka 是一个由 LinkedIn 开发的分布式消息系统,它于 2011 年年初开源,现在由著名的 Apache 基金会维护与开发。
(2)Kafka 使用 Scala 实现,被用作 KinkedIn 的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作数据流管道和消息系统。
(3)Kafka 是基于消息发布-订阅模式实现的消息系统,其主要设计目标如下所述:
- 消息持久化:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上的数据也能保证常数时间复杂度的访问性能。
- 高吞吐:在廉价的商用机器上也能支持单机每秒 10 万条以上的吞吐量。
- 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序。
- 跨平台:支持不同技术平台的客户端(如 Java、PHP、Python 等)。
- 实时性:支持实时数据处理和离线数据处理。
- 伸缩性:支持水平扩展。
Kafka 的数据时存储是磁盘中的,为什么可以满足每秒百万级别消息的生产和消费?
- Kafka 的数据是放在磁盘中的,而不是存放在内存中,但是 Kafka 利用了磁盘顺序读写速度超过内存随机读写速度这个特性实现了高吞吐量。
2,Kafka 基本概念
- Broker:Kafka 集群包含一个或多个服务器,这些服务器被称为 Broker。
- Topic:逻辑上同 RabbitMQ 的 Queue 队列相似,每条发布到 Kafka 集群的消息都必须有一个 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。)
- Partition:Partition 是物理概念上的分区,为了提供系统吞吐率,在物理上每个 Topic 会分成一个或多个 Partition,每个 Partition 对应一个文件夹(存储对应分区的消息内容和索引文件)。
- Producer:消息生产者,负责生产消息并发送到 Kafka Broker。
- Consumer:消息消费者,向 Kafka Broker 读取消息并处理的客户端。
- Consumer Group:每个 Consumer 属于一个特定的组(可为每个 Consumer 指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。
二、安装步骤
1,安装 Zookeeper
由于 Kafka 需要依赖 ZooKeeper,因此搭建部署 Kafka 之前首先需要准备 ZooKeeper 环境,具体步骤可以参考我之前写的文章:
2,安装 Kafka
(1)首先我们从官网上下载安装包,地址如下:
(2)假设我这里下载的版本为 2.4.0:
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
(3)下载执行如下命令进行解压:
tar zxvf kafka_2.12-2.4.0.tgz -C ./
(4)解压 Kafka 的安装包后,可以看到其目录结构如下:
由于 Kafka 的设计中依赖了 ZooKeeper,所以我们在 bin 和 config 目录中除了看到 Kafka 相关的内容之外,还有 ZooKeeper 相关的内容:
- bin 目录中存放了 Kafka 和 ZooKeeper 的命令行工具,bin 根目录下存放的是适用于 Linux/UNIX 的 shell,而 bin/windows 下存放的则是适用于 Windows 下的 bat。
- config 目录,则用来存放关于 Kafka 与 ZooKeeper 的配置信息。
(5)编辑 config 文件夹下的配置文件 server.properties,修改如下两行默认配置,使得可通过外网连接服务器 Kafka(如果不需要从外部连接,只是通过 localhost 本地连接,则不用修改):
我们还可以在该文件中通过 zookeeper.connect 参数来设置 ZooKeeper 的地址和端口:
- 如果配置文件不特别设置该参数的话,它默认会连接本地 2181 端口的 ZooKeeper。
- 如果需要设置多个 ZooKeeper 节点,可以为这个参数配置多个 ZooKeeper 地址,并用逗号分隔。比如 zookeeper.connect=192.168.1.91:2181,192.168.1.92:2181,192.168.1.93:2181
# 允许外部端口连接 listeners=PLAINTEXT://0.0.0.0:9092 # 外部代理地址(即Kafka主机地址) advertised.listeners=PLAINTEXT://192.168.60.133:9092
(6)最后进入 bin 目录执行如下命令启动 Kafka(启动命令需要指定 Karfka 配置文件的位置):
./kafka-server-start.sh -daemon ../config/server.properties
(7)启动后可以通过 jps 命令检查是否启动成功,如出现图表示启动成功:
如果提示 jps 找不到命令,可以执行如下命令进行安装:
- yum install java-1.8.0-openjdk-devel.x86_64
三、执行命令
1,创建查看 Topic
(1)我们执行如下命令可以创建一个名为 test 的 Topic,该 Topic 包含一个分区和一个 Replica。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(2)创建后可以执行如下命令查看当前的 Topics:
./kafka-topics.sh --list --zookeeper localhost:2181
2,创建消息
注意:如果事先没有使用 kafka-topics 命令来手工创建 Topic,直接使用下面的内容进行消息创建时也会自动创建 Topics。
注意:我们可以尝试输入几行消息,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为 test 的 Topics 中,直到有消费者将其消费掉。
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
(2)接着创建消息消费者。我们打开另一个命令窗口执行如下执行命令启动 Kafka 基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到之前我们在消息生产客户端中发送的消息。
提示:因为 kafka 的消费者默认是消费最新生产的数据,如果想消费之前生产的数据需要添加一个参数 -from-beginning,表示从头消费的意思。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
(3)我们可以再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验 Kafka 对消息的基础处理。
全部评论(0)