返回 导航

其他

hangge.com

消息中间件 Kafka 介绍与安装教程(使用CentOS环境)

作者:hangge | 2020-02-20 08:10

一、基本概念介绍

1,Kafka 简介

(1)Kafka 是一个由 LinkedIn 开发的分布式消息系统,它于 2011 年年初开源,现在由著名的 Apache 基金会维护与开发。
(2)Kafka 使用 Scala 实现,被用作 KinkedIn 的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作数据流管道和消息系统。
(3)Kafka 是基于消息发布-订阅模式实现的消息系统,其主要设计目标如下所述:
  • 消息持久化:以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上的数据也能保证常数时间复杂度的访问性能。
  • 高吞吐:在廉价的商用机器上也能支持单机每秒 10 万条以上的吞吐量。
  • 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序。
  • 跨平台:支持不同技术平台的客户端(如 JavaPHPPython 等)。
  • 实时性:支持实时数据处理和离线数据处理。
  • 伸缩性:支持水平扩展。

2,Kafka 基本概念

  • BrokerKafka 集群包含一个或多个服务器,这些服务器被称为 Broker
  • Topic:逻辑上同 RabbitMQ Queue 队列相似,每条发布到 Kafka 集群的消息都必须有一个 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处。)
  • PartitionPartition 是物理概念上的分区,为了提供系统吞吐率,在物理上每个 Topic 会分成一个或多个 Partition,每个 Partition 对应一个文件夹(存储对应分区的消息内容和索引文件)。
  • Producer:消息生产者,负责生产消息并发送到 Kafka Broker
  • Consumer:消息消费者,向 Kafka Broker 读取消息并处理的客户端。
  • Consumer Group:每个 Consumer 属于一个特定的组(可为每个 Consumer 指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。

二、安装步骤

1,安装 Zookeeper

由于 Kafka 需要依赖 ZooKeeper,因此我首先需要准备 ZooKeeper 环境,具体步骤可以参考我之前写的文章:

2,安装 Kafka

(1)首先我们从官网上下载安装包,地址如下:

(2)假设我这里下载的版本为 2.4.0
wget http://mirror.bit.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz

(3)下载执行如下命令进行解压:
tar zxvf kafka_2.12-2.4.0.tgz -C ./

(4)解压 Kafka 的安装包后,可以看到其目录结构如下:
    由于 Kafka 的设计中依赖了 ZooKeeper,所以我们在 bin config 目录中除了看到 Kafka 相关的内容之外,还有 ZooKeeper 相关的内容:
  • bin 目录中存放了 Kafka ZooKeeper 的命令行工具,bin 根目录下存放的是适用于 Linux/UNIX 的 shell,而 bin/windows 下存放的则是适用于 Windows 下的 bat
  • config 目录,则用来存放关于 Kafka ZooKeeper 的配置信息。

(5)编辑 config 文件夹下的配置文件 server.properties,修改如下两行默认配置,使得可通过外网连接服务器 Kafka(如果不需要从外部连接,只是通过 localhost 本地连接,则不用修改):
我们还可以在该文件中通过 zookeeper.connect 参数来设置 ZooKeeper 的地址和端口:
  • 如果配置文件不特别设置该参数的话,它默认会连接本地 2181 端口的 ZooKeeper
  • 如果需要设置多个 ZooKeeper 节点,可以为这个参数配置多个 ZooKeeper 地址,并用逗号分隔。比如 zookeeper.connect=192.168.1.91:2181,192.168.1.92:2181,192.168.1.93:2181
# 允许外部端口连接                                            
listeners=PLAINTEXT://0.0.0.0:9092  
# 外部代理地址(即Kafka主机地址)                                              
advertised.listeners=PLAINTEXT://192.168.60.133:9092

(6)最后进入 bin 目录执行如下命令启动 Kafka(启动命令需要指定 Karfka 配置文件的位置):
./kafka-server-start.sh -daemon ../config/server.properties

(7)启动后可以通过 jps 命令检查是否启动成功,如出现图表示启动成功:
如果提示 jps 找不到命令,可以执行如下命令进行安装:
  • yum install java-1.8.0-openjdk-devel.x86_64

三、执行命令

1,创建查看 Topic

(1)我们执行如下命令可以创建一个名为 test Topic,该 Topic 包含一个分区和一个 Replica
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

(2)创建后可以执行如下命令查看当前的 Topics
./kafka-topics.sh --list --zookeeper localhost:2181

2,创建消息

注意:如果事先没有使用 kafka-topics 命令来手工创建 Topic,直接使用下面的内容进行消息创建时也会自动创建 Topics

(1)首先创建消息生产者。执行如下命令启动 Kafka 基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。
注意:我们可以尝试输入几行消息,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为 test Topics 中,直到有消费者将其消费掉。
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

(2)接着创建消息消费者。我们打开另一个命令窗口执行如下执行命令启动 Kafka 基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到之前我们在消息生产客户端中发送的消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

(3)我们可以再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验 Kafka 对消息的基础处理。
评论

全部评论(0)

回到顶部