返回 导航

大数据

hangge.com

Maxwell - MySQL实时数据采集工具使用详解(附:采集实时数据至Kafka)

作者:hangge | 2024-04-19 09:02

一、基本介绍

1,什么是 Maxwell?

(1)Maxwell 是由 Zendesk 开源的一个基于 MySQL 数据库的增量日志(Binary Log)解析工具,它可以实时读取 MySQL 增量日志(Binary Log),并生成 JSON 格式的数据,作为生产者将数据发送给 KafkaKinesisRabbitMQRedis 或其他平台的应用程序。

(2)Maxwell 主要提供了以下功能:
  • 支持以“SELECT * FROM Table”的方式实现全量表数据初始化。
  • 支持在主库发生故障后自动恢复增量日志(Binary Log)的位置(GTID)。
  • 支持对数据进行分区,可以解决数据倾斜问题,发送到 Kafka 的数据支持 DatabaseTableColumn 等级别的数据分区。

2,Maxwell 的工作原理

Maxwell 的工作原理也是将自己伪装为 MySQL Slave 节点,接收增量日志(Binary Log)数据并生成 JSON 格式的数据。

3,Maxwell 的架构设计

Maxwell 的架构不复杂,属于一个轻量级的组件,共有 3 个模块:
  • MySQL:监控指定 MySQL 中的增量日志(Binary Log)数据。
  • Filter:支持对库和表的过滤。
  • Producer:一个生产者,负责将读取的数据发送到指定的目的地。

二、安装配置

1,JDK 安装

Maxwell 是基于 Java 语言开发的,所以需要依赖 JDK 环境。我们根据 Maxwell 版本选择安装对应的 JDK
  • Maxwell 1.30 版本开始支持 JDK 11,不支持 JDK 8
  • 1.29.2 版本是最后一个支持 JDK 1.8 的版本。

2,下载安装包

(1)访问 MaxwellGitHub 主页(点击访问),获取下载地址并下载,这里我使用的是 1.29.2 版本:
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

(2)接着将 Maxwell 安装包解压:
tar -zxvf maxwell-1.29.2.tar.gz

(3)最后将解压出来的文件夹拷贝到指定目录(这个可以根据习惯调整目录的位置):
mv maxwell-1.29.2 /usr/local/maxwell

3,修改 Maxwell 的配置文件(可选)

    Maxwell 运行期间需要的参数信息,可以在 Maxwell 安装目录下的 config.properties 文件中配置,也可以在运行期间动态指定。动态指定的方式比较灵活,所以这里暂时不修改配置文件。

三、使用样例

1,样例说明

(1)下面我们使用 MaxwellMySQL 数据库的实时数据采集到 Kafka 中:

(2)在 MySQL 的实时数据(Binary Log)进入 Kafka 后,可以在 Kafka 后对接一个消费者程序,判断收到的数据类型是 INSERTDELETE,还是 UPDATE,这样就可以在第三方应用中实时维护 MySQL 数据了。

2,准备工作

(1)首先我们需要开放 MySQL 的远程访问权限,这样 Maxwell 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:

(2)接着我们还要开启 MySQLBinlog 功能,执行以下命令查看目前 Binlog 的状态:
show global variables like 'log_bin';
  • 如果返回的是 ON,则说明 Binlog 已开启。

vi /etc/my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row
log_bin=/var/lib/mysql/bin-log
log_bin_index=/var/lib/mysql/mysql-bin.index

(3)由于 Maxwell 需要权限来伪装自己为 MySQLSlave 节点,并且还需要向 Maxwell 数据库中写入数据。所以我们执行如下命令为其单独在 MySQL 中创建一个用户,并配置一定的权限。
提示Maxwell 程序在启动时会自动在 MySQL 中创建一个数据库,数据库的名称是 Maxwell
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'admin';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

(4)最后我们在 MySQL 中创建一张 user 表用于测试:
CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `phone` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

3,启动 Maxwell

(1)进入 Maxwell 目录:
cd /usr/local/maxwell

(2)执行如下命令启动 Maxwell,注意 MySQLKafka 的地址信息根据情况进行修改:
bin/maxwell \
  --user='maxwell' \
  --password='admin' \
  --host='192.168.60.1' \
  --producer=kafka \
  --kafka.bootstrap.servers=192.168.60.9:9092 \
  --kafka_topic=test

4,开始测试

(1)我们首先对 user 表数据进行操作,首先插入两条数据、接着更新一条数据、最后删除一条数据。
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (1, 'hangge', '1234567890');
INSERT INTO `user` (`id`, `name`, `phone`) VALUES (2, 'baidu', '13362623365');
UPDATE `user` SET `name` = '航歌' WHERE `id` = 1;
DELETE FROM `user` WHERE `id` = 2;

(2)查看 Kafkatest 主题的数据变化,说明 Maxwell 正常工作。
Kakfa 中接收到的数据库变更数据是 JSON 格式的,其中的字段说明如下:
  • database:数据库名称。
  • table:表名称。
  • type:数据变更类型。
  • ts:数据变更时间。
  • xid:事务 ID
  • commit:是否提交。
  • data:变更的数据内容。
评论

全部评论(0)

回到顶部