返回 导航

Docker / K8s

hangge.com

K8s - 使用Strimzi快速搭建Kafka全家桶教程3(Kafka Connect的安装使用)

作者:hangge | 2021-07-05 08:10
    本文通过一个 MQTT Source 的样例,即订阅 MQTT 指定主题消息,并将消息写入到 Kafka 指定主题中,演示如何在 K8s 集群下安装使用 Kafka Connnect

五、Kafka Connect 的安装使用

1,制作自定义的 Kafka Connect 镜像

由于不同项目的业务需求不同,需要的 Connect 也有差异。因此首先我们要根据实际情况制作合适的 Kafka Connect 镜像。

(1)我们可以制作一个包含所有常用 Connect 的镜像。但由于本文演示的是 MQTT -> Kafka,因此只集成 MQTT Connect 即可。这里我使用的是 Confluent 提供的 MQTT Connector,首先访问其官网(点击访问)进行下载。

(2)将压缩包解压后,libs 文件夹里就是我们需要的 jar 包:

(3)在服务器上创建一个 plugins 目录:
mkdir plugins

(4)接着进入 plugins 文件夹创建一个 kafka-connect-mqtt 目录:
cd plugins
mkdir kafka-connect-mqtt

(5)前面解压出来的 libs 文件下的所有 jar 包都上传到 kafka-connect-mqtt 目录下:

(6)退回到与 plugins 文件夹同一级目录位置,执行如下命令创建一个 Dockerfile 文件:
vi Dockerfile

(7)在文件中添加如下内容,然后保存退出。
FROM strimzi/kafka:0.15.0-kafka-2.3.1
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001

(8)最后执行如下命令制作镜像,镜像名为 kafka-connect-mqtt
docker build -t kafka-connect-mqtt .

2,部署 Kafka Connect

(1)首先在服务器上创建一个 my-kafka-connect.yaml 文件,内容如下:
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 2.5.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: kafka-connect-mqtt
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status

(2)然后执行如下命令开始创建:
kubectl apply -f my-kafka-connect.yaml -n kafka

(3)执行如下命令可以查看是否创建成功:
kubectl get pods -n kafka
kubectl get service -n kafka

(4)调用 kafka connect 服务的相关接口,查看已安装的 Connectors,可以发现里面确实已经包含了 MqttSinkConnectorMqttSourceConnector,当然本次样例我们只用到了后者:
提示:由于本样例 kafka connect 服务并未暴露端口供 k8s 集群外部访问,所以这里我们随便进入一个 Kafka 服务实例来访问 kafka connect 服务接口。
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connector-plugins

3,添加 Connect

(1)首先我面在服务器上创建一个 mqtt-source.json 文件,内容如下:
提示: 大家根据自己实际情况主要改动高亮部分内容即可(来源 MQTTtopic、目标 Kafkatopic、来源 MQTT 地址、目标 Kafka 地址)。下面配置表示将所有满足 /mqtt/# 主题的消息都写入到 Kafkamy_mqtt 主题中。
{
  "name": "mqtt-source",
  "config": {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max" : "1",
    "mqtt.topics" : "/mqtt/#",
    "kafka.topic" : "my_mqtt",
    "mqtt.server.uri" : "tcp://192.168.60.2:1883",
    "confluent.topic.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter.schemas.enable" : "false",
    "value.converter.schemas.enable" : "false"
  }
}

(2)然后执行如下命令将这个配置文件提交到 Kafka Connect
cat mqtt-source.json | kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://my-connect-cluster-connect-api:8083/connectors -d @-

(3)最后执行如下命令可以看到这个配置已经添加成功:
kubectl exec -i -n kafka my-cluster-kafka-0 -- curl -X GET http://my-connect-cluster-connect-api:8083/connectors

4,开始测试

(1)首先我们使用 MQTTBox 这个客户端工具往 MQTT 服务器发送一条消息:

(2)接着在 Kafka 这边可以看到 my_mqtt 这个主题里已经出现了这条数据,说明刚添加的 Connect 已经生效。
评论

全部评论(0)

回到顶部