返回 导航

大数据

hangge.com

Flume日志采集工具使用详解6(样例4:Kafka数据分发、拦截器使用)

作者:hangge | 2024-03-13 08:40

六、使用拦截器实现 Kafka 数据分发

1,需求说明

(1)有时系统中多种类型的数据会被输出到 Kafka 的同一个 Topic 中。如果各种类型的数据混到一块,则会导致在后期处理数据时比较麻烦。为解决这个问题,所有的数据可以全部使用 JSON 格式,并且在 JSON 格式中增加 type 字段以标识数据的类型。这样每一条数据都有自己的类型标识,全部汇聚到 Kafka 的一个 Topic 中之后也是可以区分出来的。
{"type":"A","data":"example data for type A"}
{"type":"B","data":"example data for type B"}
{"type":"A","data":"another example data for type A"}

(2)下面我将使用 FlumeKafka 中的数据进行分发,并通过 Flume 中的拦截器解析数据中的"type"字段,将其作为输出的子 Topic 名称,实现将相同类型的数据分发到同一个 Topic 的目标。

2,配置 Agent

(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf
vi example.conf

(2)接着在配置文件中添加如下内容。我们配置 Flume 的源、拦截器和目的地,以实现对 Kafka 数据的分发。
注意:目的地 Sink 我们没有指定 topic,是因为拦截器通过正则提取出 type 字段内容并添加到 Event Header 中,keytopicSink 会自动使用其值作为目标 topic
# 定义 Agent 内部 3 大组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置 Source 组件源
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.9:9092
a1.sources.r1.topic = main_topic
a1.sources.r1.groupId = flume
a1.sources.r1.batchSize = 100

# 配置 Source 组件拦截器解析type字段作为topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = topic

# 配置 Sink 组件
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.60.9:9092
 
# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 将 Source 组件、Sink 组件和 Channel 组件绑定到一起
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3,启动 Agent

(1)配置好 Agent 后就可以使用如下命令启动此 Agent
参数说明:
  • agent:启动一个 FlumeAgent 代理。
  • --name:指定 Agent 的名字。
  • --conf:指定 Flume 配置文件的根目录。
  • --conf-file:指定 Agent 配置文件的位置。
bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf

(2)使用上面命令启动成功之后,命令行窗口会被一直占用,因为 Agent 属于一个前台进程。我们也可以通过 nohup 方式启动(注意最后的 &),将 Agent 放到后台执行:
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &

(3)启动之后,通过“jps -m”命令可以查看到一个 Application 进程,它就是启动的 Agent
jps -m

(4)或者使用 ps 命令也可以查看到这个进程:
ps -ef|grep flume

4,测试 Agent

(1)我们启动一个 Kafka 生产者,并向主 topic 发送一些数据数据,确保数据中包含"type"字段:
kafka-console-producer.sh --broker-list localhost:9092 --topic main_topic


(2)查看 Kafka 数据可以看到,主 Topic 数据已经被分发到对应类别的 topic 上了:



附:提取内容作为消息 key 值

(1)上面样例中我们从数据 body 中提取 type 值作为消息的 topic,下面配置在此基础上提取 mid 值作为消息的 key,用于分区存储。
# 定义 Agent 内部 3 大组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# 配置 Source 组件源
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.9:9092
a1.sources.r1.topic = main_topic
a1.sources.r1.groupId = flume
a1.sources.r1.batchSize = 100

# 配置 Source 组件拦截器解析type字段作为topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "type":"(\\w+)".*"mid":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = topic
a1.sources.r1.interceptors.i1.serializers.s2.name = key

# 配置 Sink 组件
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.60.9:9092
 
# 配置 Channel 组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# 将 Source 组件、Sink 组件和 Channel 组件绑定到一起
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)我们使用 Kafka 生产者向主 topic 发送一些数据数据:
{"type":"A","data":"example data1 for type A","mid":"1001"}
{"type":"A","data":"example data2 for type A","mid":"1002"}
{"type":"A","data":"example data3 for type A","mid":"1003"}
{"type":"A","data":"example data4 for type A","mid":"1004"}
{"type":"A","data":"example data5 for type A","mid":"1005"}
{"type":"A","data":"example data6 for type A","mid":"1006"}
{"type":"A","data":"example data7 for type A","mid":"1007"}

(3)查看 Kafka 数据可以发现,消息已经包含了指定的 key 了:
评论

全部评论(0)

回到顶部