K8s - 使用Strimzi快速搭建Kafka全家桶教程3(Kafka Connect的安装使用)
作者:hangge | 2021-07-05 08:10
本文通过一个 MQTT Source 的样例,即订阅 MQTT 指定主题消息,并将消息写入到 Kafka 指定主题中,演示如何在 K8s 集群下安装使用 Kafka Connnect。
五、Kafka Connect 的安装使用
1,制作自定义的 Kafka Connect 镜像
由于不同项目的业务需求不同,需要的 Connect 也有差异。因此首先我们要根据实际情况制作合适的 Kafka Connect 镜像。
(1)我们可以制作一个包含所有常用 Connect 的镜像。但由于本文演示的是 MQTT -> Kafka,因此只集成 MQTT Connect 即可。这里我使用的是 Confluent 提供的 MQTT Connector,首先访问其官网(点击访问)进行下载。
(2)将压缩包解压后,libs 文件夹里就是我们需要的 jar 包:
(3)在服务器上创建一个 plugins 目录:
mkdir plugins
(4)接着进入 plugins 文件夹创建一个 kafka-connect-mqtt 目录:
cd plugins mkdir kafka-connect-mqtt
(5)前面解压出来的 libs 文件下的所有 jar 包都上传到 kafka-connect-mqtt 目录下:
(6)退回到与 plugins 文件夹同一级目录位置,执行如下命令创建一个 Dockerfile 文件:
vi Dockerfile
(7)在文件中添加如下内容,然后保存退出。
FROM strimzi/kafka:0.15.0-kafka-2.3.1 USER root:root COPY ./plugins/ /opt/kafka/plugins/ USER 1001
(8)最后执行如下命令制作镜像,镜像名为 kafka-connect-mqtt:
docker build -t kafka-connect-mqtt .
2,部署 Kafka Connect
(1)首先在服务器上创建一个 my-kafka-connect.yaml 文件,内容如下:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaConnect metadata: name: my-connect-cluster spec: version: 2.5.0 replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9093 image: kafka-connect-mqtt tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert certificate: ca.crt config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status
(2)然后执行如下命令开始创建:
kubectl apply -f my-kafka-connect.yaml -n kafka
(3)执行如下命令可以查看是否创建成功:
kubectl get pods -n kafka kubectl get service -n kafka
(4)调用 kafka connect 服务的相关接口,查看已安装的 Connectors,可以发现里面确实已经包含了 MqttSinkConnector 和 MqttSourceConnector,当然本次样例我们只用到了后者:
提示:由于本样例 kafka connect 服务并未暴露端口供 k8s 集群外部访问,所以这里我们随便进入一个 Kafka 服务实例来访问 kafka connect 服务接口。
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connector-plugins
3,添加 Connect
(1)首先我面在服务器上创建一个 mqtt-source.json 文件,内容如下:
提示:
大家根据自己实际情况主要改动高亮部分内容即可(来源 MQTT 的 topic、目标 Kafka 的 topic、来源 MQTT 地址、目标 Kafka 地址)。下面配置表示将所有满足 /mqtt/# 主题的消息都写入到 Kafka 的 my_mqtt 主题中。
{ "name": "mqtt-source", "config": { "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max" : "1", "mqtt.topics" : "/mqtt/#", "kafka.topic" : "my_mqtt", "mqtt.server.uri" : "tcp://192.168.60.2:1883", "confluent.topic.bootstrap.servers": "my-cluster-kafka-bootstrap:9092", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "key.converter.schemas.enable" : "false", "value.converter.schemas.enable" : "false" } }
(2)然后执行如下命令将这个配置文件提交到 Kafka Connect:
cat mqtt-source.json | kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-
(3)最后执行如下命令可以看到这个配置已经添加成功:
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connectors
4,开始测试
(1)首先我们使用 MQTTBox 这个客户端工具往 MQTT 服务器发送一条消息:
(2)接着在 Kafka 这边可以看到 my_mqtt 这个主题里已经出现了这条数据,说明刚添加的 Connect 已经生效。
全部评论(0)