Flume日志采集工具使用详解7(样例5:数据过滤、数据替换)
作者:hangge | 2024-03-14 08:45
当使用 Flume 进行日志采集时,有时候我们需要对采集的数据进行过滤和替换,以便满足特定的需求。本文我将详细介绍如何在 Flume 中使用拦截器进行数据过滤和数据替换功能。
七、数据过滤、数据替换
1,需求说明
- 数据过滤:只分发类型为 A 或者 B 的数据
- 数据替换:将数据中所有日期,无论是"2023-12-15"格式还是"2023/12/15"格式,都统一替换为的"YYYY-MM-DD"格式。
2,配置 Agent
(1)启动 Flume 任务其实就是启动一个 Agent。首先我们进入 flume 程序包中的 conf 文件夹,创建一个 Agent 配置文件:
cd conf vi example.conf
(2)接着在配置文件中添加如下内容。我们配置 Flume 的源、拦截器和目的地,以实现对 Kafka 数据的过滤、替换、分发。
# 定义 Agent 内部 3 大组件的名称 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置 Source 组件源 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers = 192.168.60.9:9092 a1.sources.r1.topic = main_topic a1.sources.r1.groupId = flume a1.sources.r1.batchSize = 100 # 配置 Source 组件拦截器 a1.sources.r1.interceptors = i1 i2 i3 # 配置 regex_filter 拦截器只允许type为A或B的Event # excludeEvents 为true表示只留匹配的,为false则表示排除匹配的 a1.sources.r1.interceptors.i1.type = regex_filter a1.sources.r1.interceptors.i1.regex = "type":"[AB]" a1.sources.r1.interceptors.i1.excludeEvents = false # 配置 search_replace 拦截器将日期替换为统一的"YYYY-MM-DD"格式 a1.sources.r1.interceptors.i2.type = search_replace a1.sources.r1.interceptors.i2.searchPattern = (\\d{4})[-/](\\d{2})[-/](\\d{2}) a1.sources.r1.interceptors.i2.replaceString = $1-$2-$3 # 配置 regex_extractor 拦截器解析type字段作为topic a1.sources.r1.interceptors.i3.type = regex_extractor a1.sources.r1.interceptors.i3.regex = "type":"(\\w+)" a1.sources.r1.interceptors.i3.serializers = s1 a1.sources.r1.interceptors.i3.serializers.s1.name = topic # 配置 Sink 组件 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = 192.168.60.9:9092 # 配置 Channel 组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 将 Source 组件、Sink 组件和 Channel 组件绑定到一起 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
3,启动 Agent
(1)配置好 Agent 后就可以使用如下命令启动此 Agent:
参数说明:
- agent:启动一个 Flume 的 Agent 代理。
- --name:指定 Agent 的名字。
- --conf:指定 Flume 配置文件的根目录。
- --conf-file:指定 Agent 配置文件的位置。
bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf
(2)使用上面命令启动成功之后,命令行窗口会被一直占用,因为 Agent 属于一个前台进程。我们也可以通过 nohup 方式启动(注意最后的 &),将 Agent 放到后台执行:
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(3)启动之后,通过“jps -m”命令可以查看到一个 Application 进程,它就是启动的 Agent。
jps -m
(4)或者使用 ps 命令也可以查看到这个进程:
ps -ef|grep flume
4,测试 Agent
(1)我们启动一个 Kafka 生产者,并向主 topic 发送一些数据数据:
kafka-console-producer.sh --broker-list localhost:9092 --topic main_topic
{"type":"A","data":"example data1 for type A","mid":"1001","time":"2023-12-21"} {"type":"A","data":"example data2 for type A","mid":"1002","time":"2023/12/23"} {"type":"B","data":"example data3 for type B","mid":"1003","time":"2023-12-24"} {"type":"B","data":"example data4 for type B","mid":"1004","time":"2023/12/25"} {"type":"C","data":"example data5 for type C","mid":"1005","time":"2023-12-26"} {"type":"D","data":"example data5 for type D","mid":"1005","time":"2023/12/27"}
(2)查看 Kafka 数据可以看到,只有 type 为 A 或者 B 的数据被分发到对应类别的 topic 上,同时时间格式也被替换为统一的格式:
全部评论(0)