DataX - 数据库离线数据采集工具使用详解3(样例2:MySQL与HBase之间数据同步)
作者:hangge | 2024-04-17 09:43
三、实现 MySQL 与 HBase 之间数据同步
1,准备工作
(1)首先我们需要开放 MySQL 的远程访问权限,这样 DataX 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:
(2)接着我们在 MySQL 中创建 user 和 user2 这两张表,它们的表结构是一样的:
(3)为了能够自动生成一些模拟数据,我们定义一个存储过程:
CREATE TABLE `user` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) ); CREATE TABLE `user2` ( `id` int(10) NOT NULL AUTO_INCREMENT, `name` varchar(64) DEFAULT NULL, `phone` varchar(64) DEFAULT NULL, PRIMARY KEY (`id`) );
(3)为了能够自动生成一些模拟数据,我们定义一个存储过程:
DELIMITER $$
CREATE PROCEDURE test()
BEGIN
DECLARE A INT DEFAULT 1;
WHILE (A < 100000) DO
INSERT INTO `user` VALUES (A, CONCAT("hangge", A), CONCAT("8888", A));
SET A = A + 1;
END WHILE;
END $$
DELIMITER ;
(4)然后调用这个存储过程:
call test();
(5)可以看到 user 表中已经生成了 99999 条数据:

(6)而在进行数据同步之间,HBase 这边也要创建相应的表。首先执行如下命令启动 HBase 的 shell 命令行工具:
./bin/hbase shell
(7)接着使用 create 命令创建一个新表(这里表名称为 user,列簇名为 cf)
create 'user', 'cf'
2,将 MySQL 数据同步至 HBase 样例
vi /usr/local/datax/job/myjob.json
(2)文件内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "hangge1234",
"splitPk": "id",
"column": [
"*"
],
"where": "id < 1001",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8"
],
"table": [
"user"
]
}
]
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum":"192.168.60.9:2181"
},
"table":"user",
"mode":"normal",
"rowkeyColumn":[ { "index":0,"type":"string"} ],
"column": [
{ "index":1, "name":"cf:name", "type":"string"},
{ "index":2, "name":"cf:phone", "type":"string"}
],
"encoding":"utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
(3)接着进入 DataX 的 bin 目录:
cd /usr/local/datax/bin/
(4)执行如下命令启动任务:
python datax.py ../job/myjob.json
(5)任务执行完毕后控制台会显示如下信息:

(6)执行如下命令启动 HBase 的 shell 命令行工具:
./bin/hbase shell
(7)最后执行如下命令查看 user 表数据,可以看到 1000 条数据都已经同步过来了:
scan 'user'
3,将 HBase 数据同步至 MySQL 样例
(1)接下来我们要将 HBase 新增的这 1000 条数据同步至 MySQL 的 user2 表中,首先我们将任务 json 文件内容改成如下:{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "192.168.60.9:2181"
},
"table": "user",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "cf: name",
"type": "string"
},
{
"name": "cf: phone",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "hangge1234",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8",
"table": [
"user2"
]
}
],
"preSql": [
"truncate user2"
],
"session": [
"set session sql_mode='ANSI'"
],
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
(2)接着进入 DataX 的 bin 目录:
cd /usr/local/datax/bin/
(3)执行如下命令启动任务:
python datax.py ../job/myjob.json
(4)任务执行完毕后控制台会显示如下信息:

全部评论(0)