Docker - 使用kafka-lenses一键搭建Kafka环境套件2(Kafka Connect的使用)
作者:hangge | 2021-05-26 08:10
前文提到 Lenses 包含了 30 多种 Kafka Connect 方便我们使用。本文通过一个样例(从 MQTT 订阅消息,并将读取到数据并导入到 Kafka 中)演示如何使用 Kafka Connect。
二、Kafka Connect
1,准备工作
由于本次演示样例的 Sources 是 MQTT,所有首先我们需要准备一个 MQTT 服务器,具体搭建方法参考我之前写的文章:
2,创建 Kafka Connect
(1)使用浏览器访问 http://ip:3030,点击 CONNECTORS 面板的 ENTER 按钮。
(2)在进入的页面里我们可以直接配置 Kafka Connector,点击 NEW 按钮后,点击右侧的 Sources 栏下的 MQTT 开始创建一个 MQTT Connect:
(3)在 PROPERTIES 文本框中输入如下内容,表示将所有满足 /+ 主题的消息都写入到 Kafka 的 my_mqtt 主题中,然后点击右下角 CREATE 创建。
name=MqttSourceConnector connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector tasks.max=1 connect.mqtt.kcql=INSERT INTO my_mqtt SELECT * FROM /+ connect.mqtt.service.quality=1 connect.mqtt.hosts=tcp://192.168.60.165:1883
(4)可以看到这个 Connector 已经创建成功。
3,开始测试
(1)首先我们使用 MQTTBox 这个客户端工具往 MQTT 服务器发送一条消息:
(2)接着在 Kafka 这边可以看到 my_mqtt 这个主题里已经出现了这条数据,说明刚才创建的 Kafka Connect 已经生效。
全部评论(0)