Maxwell - MySQL实时数据采集工具使用详解(附:采集实时数据至Kafka)
作者:hangge | 2024-04-19 09:02
一、基本介绍
1,什么是 Maxwell?
(1)Maxwell 是由 Zendesk 开源的一个基于 MySQL 数据库的增量日志(Binary Log)解析工具,它可以实时读取 MySQL 增量日志(Binary Log),并生成 JSON 格式的数据,作为生产者将数据发送给 Kafka、Kinesis、RabbitMQ、Redis 或其他平台的应用程序。
(2)Maxwell 主要提供了以下功能:
- 支持以“SELECT * FROM Table”的方式实现全量表数据初始化。
- 支持在主库发生故障后自动恢复增量日志(Binary Log)的位置(GTID)。
- 支持对数据进行分区,可以解决数据倾斜问题,发送到 Kafka 的数据支持 Database、Table、Column 等级别的数据分区。
2,Maxwell 的工作原理
Maxwell 的工作原理也是将自己伪装为 MySQL Slave 节点,接收增量日志(Binary Log)数据并生成 JSON 格式的数据。
3,Maxwell 的架构设计
Maxwell 的架构不复杂,属于一个轻量级的组件,共有 3 个模块:
- MySQL:监控指定 MySQL 中的增量日志(Binary Log)数据。
- Filter:支持对库和表的过滤。
- Producer:一个生产者,负责将读取的数据发送到指定的目的地。
二、安装配置
1,JDK 安装
Maxwell 是基于 Java 语言开发的,所以需要依赖 JDK 环境。我们根据 Maxwell 版本选择安装对应的 JDK:
- Maxwell 从 1.30 版本开始支持 JDK 11,不支持 JDK 8。
- 1.29.2 版本是最后一个支持 JDK 1.8 的版本。
2,下载安装包
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
(2)接着将 Maxwell 安装包解压:
tar -zxvf maxwell-1.29.2.tar.gz
(3)最后将解压出来的文件夹拷贝到指定目录(这个可以根据习惯调整目录的位置):
mv maxwell-1.29.2 /usr/local/maxwell
3,修改 Maxwell 的配置文件(可选)
Maxwell 运行期间需要的参数信息,可以在 Maxwell 安装目录下的 config.properties 文件中配置,也可以在运行期间动态指定。动态指定的方式比较灵活,所以这里暂时不修改配置文件。
三、使用样例
1,样例说明
(1)下面我们使用 Maxwell 将 MySQL 数据库的实时数据采集到 Kafka 中:
(2)在 MySQL 的实时数据(Binary Log)进入 Kafka 后,可以在 Kafka 后对接一个消费者程序,判断收到的数据类型是 INSERT、DELETE,还是 UPDATE,这样就可以在第三方应用中实时维护 MySQL 数据了。
2,准备工作
(1)首先我们需要开放 MySQL 的远程访问权限,这样 Maxwell 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:
(2)接着我们还要开启 MySQL 的 Binlog 功能,执行以下命令查看目前 Binlog 的状态:
show global variables like 'log_bin';
- 如果返回的是 ON,则说明 Binlog 已开启。
- 如果返回的是 OFF,则说明 Binlog 没有开启。我们需要修改 MySQL 的“/etc/my.cnf”文件:
vi /etc/my.cnf
- 在 [mysqld] 参数下添加以下配置,然后重启 MySQL数据库:
[mysqld] server_id=1 log-bin=master binlog_format=row log_bin=/var/lib/mysql/bin-log log_bin_index=/var/lib/mysql/mysql-bin.index
(3)由于 Maxwell 需要权限来伪装自己为 MySQL 的 Slave 节点,并且还需要向 Maxwell 数据库中写入数据。所以我们执行如下命令为其单独在 MySQL 中创建一个用户,并配置一定的权限。
提示:Maxwell 程序在启动时会自动在 MySQL 中创建一个数据库,数据库的名称是 Maxwell。
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'admin'; GRANT ALL ON maxwell.* TO 'maxwell'@'%'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
(4)最后我们在 MySQL 中创建一张 user 表用于测试:
CREATE TABLE `user` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) );
3,启动 Maxwell
(1)进入 Maxwell 目录:
cd /usr/local/maxwell
(2)执行如下命令启动 Maxwell,注意 MySQL 和 Kafka 的地址信息根据情况进行修改:
bin/maxwell \ --user='maxwell' \ --password='admin' \ --host='192.168.60.1' \ --producer=kafka \ --kafka.bootstrap.servers=192.168.60.9:9092 \ --kafka_topic=test
4,开始测试
(1)我们首先对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据。
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890'); INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365'); UPDATE `user` SET `name` = '航歌' WHERE `id` = 1; DELETE FROM `user` WHERE `id` = 2;
(2)查看 Kafka 中 test 主题的数据变化,说明 Maxwell 正常工作。
Kakfa 中接收到的数据库变更数据是 JSON 格式的,其中的字段说明如下:
- database:数据库名称。
- table:表名称。
- type:数据变更类型。
- ts:数据变更时间。
- xid:事务 ID。
- commit:是否提交。
- data:变更的数据内容。
全部评论(0)