返回 导航

其他

hangge.com

Kafka Connect的部署和使用详解2(添加connector)

作者:hangge | 2021-05-19 08:29
    前文通过 Kafka 自带的 FileStreamSinkConnectorFileStreamSourceConnector 这两个 Connector 演示了 Kafka Connect 的一些基本用法(点击查看)。市面上的 Kafka Connector 很多,包括开源和商业版本的,大家可以根据情况自行选择。本文通过 MQTT Connector 样例演示如何添加并使用一个新的 Connector

三、添加并使用 Connector

1,安装配置

(1)本次我们仍然使用 standalone 模式启动 Kafka Connect,首先编辑 conf/connect-standalone.properties 文件:
vi conf/connect-standalone.properties

(2)将最后 plugin.path 注释取消,并设置 /opt/connectors 为插件目录(后面 Connector 都会放到该目录下): 

(3)由于本文演示的是 MQTT -> Kafka,因此只集成 MQTT Connect 即可。这里我使用的是 Confluent 提供的 MQTT Connector,首先访问其官网(点击访问)进行下载。

(4)将压缩包解压后,libs 文件夹里就是我们需要的 jar 包:

(5)首先在服务器上创建一个 /opt/connectors 目录:
mkdir /opt/connectors

(6)接着进入该目录再创建一个 kafka-connect-mqtt 目录:
cd /opt/connectors
mkdir kafka-connect-mqtt

(7)将前面解压出来的 libs 文件下的所有 jar 包都上传到 kafka-connect-mqtt 目录下:

(8)在Kafkaconf 目录下创建一个 connect-mqtt-source.properties 文件,内容如下:
提示:大家根据自己实际情况主要改动高亮部分内容即可(来源 MQTTtopic、目标 Kafkatopic、来源 MQTT 地址、目标 Kafka 地址)。下面配置表示将所有满足 /mqtt/# 主题的消息都写入到 Kafkamy_mqtt 主题中。
name=mqtt-source
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
tasks.max=1
mqtt.topics=/mqtt/#
kafka.topic=my_mqtt
mqtt.server.uri=tcp://192.168.60.2:1883
confluent.topic.bootstrap.servers=192.168.60.2:32410
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

2,启动服务

(1)执行如下命令启动 Kafka Connect,注意命令后面要带上相关的配置文件:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-mqtt-source.properties
./bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-mqtt-source.properties

(2)启动后可以通过 jps 命令检查是否启动成功,如出现下图内容则表示启动成功:
如果提示 jps 找不到命令,可以执行如下命令进行安装:
  • yum install java-1.8.0-openjdk-devel.x86_64

(3)Kafka Connect 提供了 REST API 方便我们去管理(默认端口 8083),我们通过 /connector-plugins 接口可以看到里面确实已经包含了 MqttSinkConnectorMqttSourceConnector

(4)通过 /connectors 接口可以看到 mqtt source 的配置也已经添加成功:

3,开始测试

(1)首先我们使用 MQTTBox 这个客户端工具往MQTT服务器发送一条消息:

(2)接着在 Kafka 这边可以看到 my_mqtt 这个主题里已经出现了这条数据,说明刚添加的 Connect 已经生效。
评论

全部评论(0)

回到顶部