返回 导航

大数据

hangge.com

DataX - 数据库离线数据采集工具使用详解3(样例2:MySQL与HBase之间数据同步)

作者:hangge | 2024-04-17 09:43

三、实现 MySQL 与 HBase 之间数据同步

1,准备工作

(1)首先我们需要开放 MySQL 的远程访问权限,这样 DataX 可以连接远程机器上的 MySQL 服务。具体操作步骤可以参考我之前写的文章:

(2)接着我们在 MySQL 中创建 useruser2 这两张表,它们的表结构是一样的:
CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `phone` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
);


CREATE TABLE `user2` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `phone` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

(3)为了能够自动生成一些模拟数据,我们定义一个存储过程:
DELIMITER $$

CREATE PROCEDURE test()
BEGIN
    DECLARE A INT DEFAULT 1;
    
    WHILE (A < 100000) DO
        INSERT INTO `user` VALUES (A, CONCAT("hangge", A), CONCAT("8888", A));
        SET A = A + 1;
    END WHILE;
    
END $$

DELIMITER ;

(4)然后调用这个存储过程:
call test();

(5)可以看到 user 表中已经生成了 99999 条数据:

(6)而在进行数据同步之间,HBase 这边也要创建相应的表。首先执行如下命令启动 HBaseshell 命令行工具:
./bin/hbase shell

(7)接着使用 create 命令创建一个新表(这里表名称为 user,列簇名为 cf
create 'user', 'cf'

2,将 MySQL 数据同步至 HBase 样例

(1)首先,我们创建一个任务 json 文件:
vi /usr/local/datax/job/myjob.json

(2)文件内容如下:
{
    "job": {
       "content": [
          {
             "reader": {
                "name": "mysqlreader",
                "parameter": {
                   "username": "root",
                   "password": "hangge1234",
                   "splitPk": "id",
                   "column": [
                      "*"
                   ],
                   "where": "id < 1001",
                   "connection": [
                      {
                         "jdbcUrl": [
                            "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8"
                         ],
                         "table": [
                            "user"
                         ]
                      }
                   ]
                }
             },
             "writer": {
                "name": "hbase11xwriter",
                "parameter": {
                   "hbaseConfig": {
                     "hbase.zookeeper.quorum":"192.168.60.9:2181"
                   },
                   "table":"user",
                   "mode":"normal",
                   "rowkeyColumn":[ { "index":0,"type":"string"}   ],
                   "column": [
                       { "index":1, "name":"cf:name", "type":"string"},  
                       { "index":2, "name":"cf:phone", "type":"string"}
                   ],
                   "encoding":"utf-8"
                }
             }
          }
       ],
       "setting": {
          "speed": {
             "channel": "5"
          }
       }
    }
 }

(3)接着进入 DataXbin 目录:
cd /usr/local/datax/bin/

(4)执行如下命令启动任务:
python datax.py ../job/myjob.json

(5)任务执行完毕后控制台会显示如下信息:

(6)执行如下命令启动 HBaseshell 命令行工具:
./bin/hbase shell

(7)最后执行如下命令查看 user 表数据,可以看到 1000 条数据都已经同步过来了:
scan 'user'

3,将 HBase 数据同步至 MySQL 样例

(1)接下来我们要将 HBase 新增的这 1000 条数据同步至 MySQLuser2 表中,首先我们将任务 json 文件内容改成如下:
{
    "job": {
       "content": [
          {
             "reader": {
                "name": "hbase11xreader",                                                                                          
                "parameter": {                                                                                                      
                    "hbaseConfig": {                                                                                                
                        "hbase.zookeeper.quorum": "192.168.60.9:2181"                                                             
                    },                                                                                                             
                    "table": "user",                                                                                                  
                    "encoding": "utf-8",                                                                                           
                    "mode": "normal",                                                                                              
                    "column": [                                                                                                    
                        {                                                                                                           
                            "name": "rowkey",                                                                                      
                            "type": "string"                                                                                       
                        },                                                                                                         
                        {                                                                                                           
                            "name": "cf: name",                                                                                     
                            "type": "string"                                                                                       
                        },                                                                                                         
                            {                                                                                                          
                            "name": "cf: phone",                                                                                      
                            "type": "string"                                                                                       
                        }                                                                                                          
                    ],                                                                                                             
                    "range": {                                                                                                      
                        "startRowkey": "",                                                                                         
                        "endRowkey": "",                                                                                           
                        "isBinaryRowkey": true                                                                                     
                    }                                                                                                              
                }        
             },
             "writer": {
                "name": "mysqlwriter",
                "parameter": {
                   "username": "root",
                   "password": "hangge1234",
                   "column": [
                      "*"
                   ],
                   "connection": [
                      {
                         "jdbcUrl": "jdbc:mysql://192.168.60.1:3306/hangge?useUnicode=true&characterEncoding=utf8",
                         "table": [
                            "user2"
                         ]
                      }
                   ],
                   "preSql": [
                      "truncate user2"
                   ],
                   "session": [
                      "set session sql_mode='ANSI'"
                   ],                   
                   "writeMode": "insert"
                }
            }
          }
       ],
       "setting": {
          "speed": {
             "channel": "5"
          }
       }
    }
 }

(2)接着进入 DataXbin 目录:
cd /usr/local/datax/bin/

(3)执行如下命令启动任务:
python datax.py ../job/myjob.json

(4)任务执行完毕后控制台会显示如下信息:

(5)查看数据库 user2 表数据,可以看到这 1000 条数据已经全部从 HBase 同步过来了。
评论

全部评论(0)

回到顶部