Flume日志采集工具使用详解10(自定义组件:将数据写入到MySQL中)
作者:hangge | 2024-03-20 08:40
十、自定义组件的实现
1,为什么需要自定义组件?
(1)在实际工作中,95% 以上的数据采集需求都是可以直接使用 Flume 内置的组件来实现,但是谁也不敢保证 100% 都能满足,因为什么奇葩的需求都会有:
- 例如:我们想把 flume 采集到的数据输出到 mysql 中,那这个时候就需要有针对 mysql 的 sink 组件了,但是 Flume 中并没有,因为这种需求不常见,往 mysql 中写的都是结构化数据,数据的格式是固定的,但是 flume 采集的一般都是日志数据,这种属于非结构化数据,不支持也是正常的。
(2)为了实现一些特殊的需求,我们可以自己写一个自定义的组件。
2,实现自定义组件的参考资料
(1)我们可以查看 Flume 官方的开发者文档(点击访问):

(2)只不过开发者文档里面目前还不算太完善,但是基本 source、sink 组件的自定义过程在这里都是有的:
注意:自定义 channel 的内容目前还没完善,如果我们确实想自定义这个组件,就需要到 Flume 源码中找到目前支持的那些 channel 的代码,参考着实现我们自定义的 channel 组件。

(3)例如下面是自定义 Sink 的说明文档:

附:通过自定义组件将数据写入到 MySQL 表中
1,创建 mysql 数据库表
首先我们创建一张表 flume2mysql,flume 采集到数据后会将其插入到该表中:
CREATE TABLE flume2mysql ( id INT(11) NOT NULL AUTO_INCREMENT, createTime VARCHAR(64) NOT NULL, content VARCHAR(255) NOT NULL, PRIMARY KEY (id) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
2,创建自定义组件
(1)我们创建一个 Maven 项目,然后编辑项目的 pom.xml 文件,添加 flume 依赖:
(2)编写自定义的 MysqlSink 类:
(3)然后将项目代码打成 jar 包上传到 flume 的 lib 目录下:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
(2)编写自定义的 MysqlSink 类:
package com.hangge;
import org.apache.flume.conf.Configurable;
import org.apache.flume.*;
import org.apache.flume.sink.AbstractSink;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 自定义MysqlSink
*/
public class MysqlSink extends AbstractSink implements Configurable {
private String mysqlurl = "";
private String username = "";
private String password = "";
private String tableName = "";
Connection con = null;
@Override
public Status process(){
Status status = null;
// Start transaction 获得Channel对象
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try
{
Event event = ch.take();
if (event != null)
{
//获取body中的数据
String body = new String(event.getBody(), "UTF-8");
//如果日志中有以下关键字的不需要保存,过滤掉
if(body.contains("delete") || body.contains("drop") || body.contains("alert")){
status = Status.BACKOFF;
}else {
//存入Mysql
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createtime = df.format(new Date());
PreparedStatement stmt = con.prepareStatement("insert into " + tableName
+ " (createtime, content) values (?, ?)");
stmt.setString(1, createtime);
stmt.setString(2, body);
stmt.execute();
stmt.close();
status = Status.READY;
}
}else {
status = Status.BACKOFF;
}
txn.commit();
} catch (Throwable t){
txn.rollback();
t.getCause().printStackTrace();
status = Status.BACKOFF;
} finally{
txn.close();
}
return status;
}
/**
* 获取配置文件中指定的参数
* @param context
*/
@Override
public void configure(Context context) {
mysqlurl = context.getString("mysqlurl");
username = context.getString("username");
password = context.getString("password");
tableName = context.getString("tablename");
}
@Override
public synchronized void start() {
try{
//初始化数据库连接
con = DriverManager.getConnection(mysqlurl, username, password);
super.start();
System.out.println("finish start");
}catch (Exception ex){
ex.printStackTrace();
}
}
@Override
public synchronized void stop(){
try{
con.close();
}catch(SQLException e) {
e.printStackTrace();
}
super.stop();
}
}
(3)然后将项目代码打成 jar 包上传到 flume 的 lib 目录下:

(4)同时还要把 MySQL 的驱动程序包上传到 flume 的 lib 目录下:

3,自定义组件的使用
(1)首先我们执行如下命令启动 Agent:
(2)然后我们创建文件并写入两条数据:
(3)查看 MySQL 的 flume2mysql 表可以看到 Flume 已经成功采集到数据,并写入到表中:
nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/example.conf &
(2)然后我们创建文件并写入两条数据:
mkdir -p /data/flumeData echo "hello hangge" >> /data/flumeData/data.log echo "欢迎访问hangge.com" >> /data/flumeData/data.log
(3)查看 MySQL 的 flume2mysql 表可以看到 Flume 已经成功采集到数据,并写入到表中:
全部评论(0)