Flume Log Collection Tool Usage in Detail 5 (Example 3: Distributing Log Output to HDFS, Using Interceptors)

Author: hangge | 2024-03-12 09:00
Sometimes a system's log file contains several different types of log entries. To make subsequent processing and analysis easier, we can use interceptors to distribute the data by type, for example writing data of the same type to the same HDFS directory. The example below demonstrates how to do this.

5. Using Interceptors to Distribute Log File Data

1. About Interceptors

(1) Flume ships with many built-in Source Interceptors. The common ones are:
  • Timestamp Interceptor: adds a timestamp entry to the event's header
  • Host Interceptor: adds a host entry to the event's header; its value is the hostname or IP of the current machine
  • Search and Replace Interceptor: searches the event body according to the specified rule and replaces the matches; this interceptor modifies the event body, i.e. it changes the originally collected data content
  • Static Interceptor: adds a fixed key-value pair to the event's header
  • Regex Extractor Interceptor: extracts data from the event body according to the specified rule, builds key-value pairs from it, and adds them to the header
(2) To summarize:
  • Timestamp Interceptor, Host Interceptor, Static Interceptor, and Regex Extractor Interceptor add key-value data to the event's header for later use by the channel and sink components; they do not affect the collected raw data at all.
  • Search and Replace Interceptor modifies the raw data in the event body according to a rule and does not touch the header. Use this interceptor with particular care, because it changes the original data content.
Tip: among these interceptors, Search and Replace Interceptor and Regex Extractor Interceptor are the ones used most often in practice; a minimal configuration sketch of the header-adding kind is shown right after this list.
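For reference, here is a minimal configuration sketch of two header-adding interceptors (a Timestamp Interceptor plus a Static Interceptor). The interceptor names i1/i2 and the datacenter key-value are just placeholders and are not part of the example built later in this article:

# Timestamp Interceptor: adds a timestamp key to each event's header
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
# Static Interceptor: adds a fixed key-value pair to each event's header
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = datacenter
a1.sources.r1.interceptors.i2.value = NEW_YORK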

2. Requirements

(1) The data produced by a live-streaming APP includes user info, live-stream info, and gift records. These 3 kinds of data are written to the same log file, in the formats shown below (all of the data is JSON, and the value of the type field distinguishes the data type):
{
  "uid": "861848974414839801",
  "nickname": "hangge",
  "usign": "",
  "sex": 1,
  "birthday": "",
  "face": "",
  "big_face": "",
  "email": "hangge@hangge.com",
  "mobile": "",
  "reg_type": "102",
  "last_login_time": "1494344580",
  "regtime": "1494344580",
  "last_update_time": "1494344580",
  "status": "5",
  "is_verified": "0",
  "verified_info": "",
  "is_seller": "0",
  "level": 1,
  "exp": 0,
  "anchor_level": 0,
  "anchor_exp": 0,
  "os": "android",
  "timestamp": 1494344580,
  "type": "user_info"
}
{
  "id": "14943445328940974601",
  "uid": "840717325115457536",
  "lat": "53.530598",
  "lnt": "-2.5620373",
  "hots": 0,
  "title": "0",
  "status": "1",
  "topicId": "0",
  "end_time": "1494344570",
  "watch_num": 0,
  "share_num": "1",
  "replay_url": null,
  "replay_num": 0,
  "start_time": "1494344544",
  "timestamp": 1494344571,
  "type": "video_info"
}
{
  "send_id": "834688818270961664",
  "good_id": "223",
  "video_id": "14943443045138661356",
  "gold": "10",
  "timestamp": 1494344574,
  "type": "gift_record"
}
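If you want to quickly confirm which types appear in a log file, a one-liner like the following (run against the file path used later in this example) lists the type field of every line:

grep -oE '"type":"[^"]+"' /usr/local/moreType.log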

(2) Use Flume to save the log data into the corresponding date directories on HDFS, organized first by day and then by type, mainly because most of the later requirements call for statistics and analysis per day and per type.
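In other words, the target layout on HDFS will look roughly like the sketch below (the dates are examples, and the camelCase type names are produced by the interceptor configuration in the next section):

/moreType/20240312/userInfo/
/moreType/20240312/videoInfo/
/moreType/20240312/giftRecord/
/moreType/20240313/userInfo/
...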

3. Configuring the Agent

(1) Starting a Flume job really just means starting an Agent. First, go into the conf folder of the Flume distribution and create an Agent configuration file:
cd conf
vi example.conf

(2) Next, add the following content to the configuration file. We configure Flume's source, interceptors, and destination so that the log data gets distributed. The interceptors are used as follows:
  • First, Search and Replace Interceptors convert the value of type in the raw data (removing the underscore and switching to camelCase).
  • Then a Regex Extractor Interceptor uses a rule to grab the value of the type field and add it to the header, so it can be read in the sink stage.
Tips
  • Here Flume's Source can use an Exec Source, the Channel can use a Memory Channel, and the Sink can use an HDFS Sink.
  • In the interceptor, the regular expression "type":"([^"]+)" matches the literal string "type":" and then uses ([^"]+) to capture the sequence of non-quote characters up to the next quote, which extracts the value of the type field.
# Define the names of the Agent's 3 main components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Configure the source component
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/moreType.log
 
# Configure the interceptors [multiple interceptors are executed in order]
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"

a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"

a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"

a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"([^"]+)"
a1.sources.r1.interceptors.i4.serializers = s1
a1.sources.r1.interceptors.i4.serializers.s1.name = logType
 
# Configure the channel component
a1.channels.c1.type = memory
 
# Configure the sink component
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:9000/moreType/%Y%m%d/%{logType}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Add a file prefix and suffix
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
 
# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
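Before running the Agent you can roughly simulate what interceptors i1-i4 do to a single log line with a quick local check (this sed/grep pipeline is only an illustration and is not needed for the actual Flume job):

echo '{"uid":"1","type":"user_info"}' \
  | sed -e 's/"type":"video_info"/"type":"videoInfo"/' \
        -e 's/"type":"user_info"/"type":"userInfo"/' \
        -e 's/"type":"gift_record"/"type":"giftRecord"/' \
  | grep -oE '"type":"([^"]+)"'

This prints "type":"userInfo"; in Flume, the value captured by ([^"]+) is what the regex_extractor puts into the logType header, which the sink path then references via %{logType}.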

4. Running and Testing

(1) First, start the Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf
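Tip: while testing, it is often convenient to also print the Agent's log to the console. This can be done with one extra option (optional):

bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf -Dflume.root.logger=INFO,console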

(2) Next, create the log file /usr/local/moreType.log and add some test data to it:
{"uid":"861848974414839801","nickname":"hangge","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"hangge@hangge.com","mobile":"","reg_type":"102","last_login_time":"1494344580","regtime":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}
{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}
{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"send_id":"987654321098765432","good_id":"456","video_id":"98765432109876543210","gold":"20","timestamp":1632451202,"type":"gift_record"}
{"uid":"789012345678901234","nickname":"test_user1","usign":"","sex":0,"birthday":"","face":"","big_face":"","email":"test1@example.com","mobile":"","reg_type":"101","last_login_time":"1627367800","regtime":"1627367800","last_update_time":"1627367800","status":"3","is_verified":"1","verified_info":"","is_seller":"1","level":3,"exp":500,"anchor_level":2,"anchor_exp":100,"os":"iOS","timestamp":1627367800,"type":"user_info"}
{"id":"98765432109876543210","uid":"123456789012345678","lat":"40.7128","lnt":"-74.0060","hots":10,"title":"Amazing Sunset","status":"2","topicId":"123","end_time":"1632451200","watch_num":1000,"share_num":"50","replay_url":"https://example.com/replay1","replay_num":500,"start_time":"1632450000","timestamp":1632451201,"type":"video_info"}

(3) Finally, check the result on HDFS. You can see that the different types of log data have all been saved under the directories for the corresponding date and type.
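Besides the HDFS web UI, the directories can also be listed from the command line, for example:

hdfs dfs -ls -R /moreType/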