Azkaban - 快速入门教程3(数据仓库应用实践案例)
作者:hangge | 2025-06-12 09:01
四、Azkaban 在数据仓库中的应用
1,需求说明
(1)在企业中构建离线数据仓库时,由于任务数量较多,并且很多任务之间都有依赖关系,所以需要深度使用分布式任务调度系统。
(2)这里以 Azkaban 为例,针对离线数据仓库中的电商 GMV 指标统计进行演示。电商 GMV 指标的整个计算流程比较复杂,涉及多个子任务,不同子任务使用的计算工具也不一样,大致流程如下图所示:
- MySQL → HDFS:需要使用 Sqoop 命令。
- HDFS → 数据仓库 ODS 层:需要使用 Hive 的 Alter 命令。
- 数据仓库 ODS 层 → 数据仓库 DWD 层:需要使用 Hive SQL。
- 数据仓库 DWD 层 → 数据仓库 APP 层:需要使用 Hive SQL。

(3)下面针对这个需求来开发对应的 Azkaban Job 文件。多个 Job 的流程如下图所示:

2,准备工作
(1)首先在 MySQL 中创建一张存储订单原始数据的表 user_order:
CREATE TABLE user_order (
order_id INT,
order_date DATETIME,
user_id INT,
order_money DECIMAL(10, 2),
order_type VARCHAR(255),
order_status VARCHAR(50),
pay_id INT,
update_time TIMESTAMP
);
(2)然后在该表中插入一些测试数据,其中有 5 条记录 order_id 为空:
INSERT INTO user_order (order_id, order_date, user_id, order_money, order_type, order_status, pay_id, update_time) VALUES (null, '2024-02-13 08:30:00', 101, 50.00, 'Online', 'Pending', 201, '2024-02-13 08:35:00'), (2, '2024-02-13 09:15:00', 102, 75.50, 'In-store', 'Processing', 202, '2024-02-13 09:20:00'), (null, '2024-02-13 10:00:00', 103, 120.75, 'Online', 'Completed', 203, '2024-02-13 10:05:00'), (4, '2024-02-13 11:45:00', 104, 90.25, 'Online', 'Shipped', 204, '2024-02-13 11:50:00'), (5, '2024-02-13 12:30:00', 105, 35.50, 'In-store', 'Pending', 205, '2024-02-13 12:35:00'), (null, '2024-02-13 13:15:00', 106, 200.00, 'Online', 'Processing', 206, '2024-02-13 13:20:00'), (7, '2024-02-13 14:00:00', 107, 150.75, 'In-store', 'Completed', 207, '2024-02-13 14:05:00'), (null, '2024-02-13 15:45:00', 108, 80.50, 'Online', 'Shipped', 208, '2024-02-13 15:50:00'), (9, '2024-02-13 16:30:00', 109, 45.25, 'Online', 'Pending', 209, '2024-02-13 16:35:00'), (10, '2024-02-13 17:15:00', 110, 60.00, 'In-store', 'Processing', 210, '2024-02-13 17:20:00'), (11, '2024-02-13 18:00:00', 111, 95.75, 'Online', 'Completed', 211, '2024-02-13 18:05:00'), (12, '2024-02-13 19:45:00', 112, 120.50, 'In-store', 'Shipped', 212, '2024-02-13 19:50:00'), (null, '2024-02-13 20:30:00', 113, 70.25, 'Online', 'Pending', 213, '2024-02-13 20:35:00'), (14, '2024-02-13 21:15:00', 114, 55.00, 'Online', 'Processing', 214, '2024-02-13 21:20:00'), (15, '2024-02-13 22:00:00', 115, 130.75, 'In-store', 'Completed', 215, '2024-02-13 22:05:00'), (16, '2024-02-13 22:45:00', 116, 75.50, 'Online', 'Shipped', 216, '2024-02-13 22:50:00'), (17, '2024-02-13 23:30:00', 117, 40.25, 'In-store', 'Pending', 217, '2024-02-13 23:35:00'), (18, '2024-02-13 23:59:00', 118, 85.00, 'Online', 'Processing', 218, '2024-02-14 00:04:00'), (19, '2024-02-13 14:30:00', 119, 110.75, 'Online', 'Completed', 219, '2024-02-13 14:35:00'), (20, '2024-02-13 15:15:00', 120, 65.50, 'In-store', 'Shipped', 220, '2024-02-13 15:20:00');
(3)接着使用 Hive 的 bin 目录下的 hive 脚本即可启动 hive 客户端:
bin/hive
(4)然后创建 hive 数据库,具体包括 ODS 层、DWD 层、APP 层这三个层级数据库:
(5)然后创建各个层级数据库所用到的 hive 表:
(2)在 collect.job 中需要使用到 collect_mysql.sh 这个脚本。这个脚本里面是 Sqoop 的采集数据命令,脚本内容如下:
(3)在 collect_mysql.sh 脚本中,还需要用到 sqoop_collect_data_util.sh 脚本,脚本内容如下:
(2)在 ods.job 中,需要使用到 ods_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表中添加分区的命令,脚本内容如下:
(3)在 ods_mall_add_partition.sh 脚本中还需要用到 add_partition.sh 脚本。add_partition.sh 脚本的内容如下:
(2)在 dwd.job 文件中需要使用到 dwd_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:
(2)在 app.job 中,需要使用到 app_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:

create database ods_mall; create database dwd_mall; create database app_mall;
(5)然后创建各个层级数据库所用到的 hive 表:
- ods_user_order 表存放从 MySQL 到 HDFS 的原始订单数据:
create external table ods_mall.ods_user_order(
order_id int,
order_date string,
user_id int,
order_money decimal(10, 2),
order_type string,
order_status string,
pay_id int,
update_time timestamp
)
partitioned by(dt string)
row format delimited
fields terminated by '\t'
location '/data/ods/user_order';
- dwd_user_order 表存放的是 ods_user_order 经过清洗后的订单数据:
create external table dwd_mall.dwd_user_order(
order_id int,
order_date string,
user_id int,
order_money decimal(10, 2),
order_type string,
order_status string,
pay_id int,
update_time timestamp
)
partitioned by(dt string)
row format delimited
fields terminated by '\t'
location '/data/dwd/user_order';
- app_gmv 表存放的是通过对 dwd_user_order 表数据进行汇总计算得到的最终总成交额数据:
create external table app_mall.app_gmv ( gmv decimal(10, 2) ) partitioned by(dt string) location '/data/app/app_gmv';
3,创建 collect.job 文件
(1)在 collect.job 中使用 Sqoop 命令将 MySQL 中的数据导入 HDFS,文件内容如下:
# collect.job # 将MySQL中的数据导入HDFS type=command command=sh /data/soft/warehouse_job/collect_mysql.sh
(2)在 collect.job 中需要使用到 collect_mysql.sh 这个脚本。这个脚本里面是 Sqoop 的采集数据命令,脚本内容如下:
# 每天凌展执行一次
# 默认获取昨天的日期,也支持传参指定一个自期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
# 转换日期格式,将 20240201 转换为 2024-02-01
dt_new=`date +%Y-%m-%d --date="${dt}"`
# Hive SQL语句
user_order_sql="select order_id,order_date,user_id,order_money,order_type,order_status,pay_id,update_time from user_order where order_date > '${dt_new} 00:00:00' and order_date <= '${dt_new} 23:59:59'"
# 路径前缀
path_prefix="hdfs://node1:9000/data/ods"
# 输出路径
user_order_path="${path_prefix}/user_order/${dt}"
# 采集数据
echo ”开始采集...”
echo "采集表:user_order"
sh /data/soft/warehouse_job/sqoop_collect_data_util.sh "${user_order_sql}" "${user_order_path}"
echo "结束采集..."
(3)在 collect_mysql.sh 脚本中,还需要用到 sqoop_collect_data_util.sh 脚本,脚本内容如下:
#!/bin/bash
# 将MySQL中的数据导入HDFS
if [ $# !=2 ]
then
echo "参数异常:sqoop_collect_data_util.sh <sql><hdfs_path>"
exit 100
fi
# 查询指定日期范围订单的SQL语句
# 例如:select id,name from user where id >1
sql=$1
# 导入HDFS的路径
hdfs_path=$2
sqoop import \
--connect jdbc:mysql://192.168.60.1:3306/hangge?serverTimezone=Asia/Shanghai \
--username root \
--password hangge1234 \
--target-dir "${hdfs_path}" \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query "${sql}"' and $CONDITIONS' \
--null-string '\\N' \
--null-non-string '\\N'
4,创建 ods.job 文件
(1)在 ods.job 中,使用 Hive 的 Alter 命令将 HDFS 中的数据关联到数据仓库 ODS 层的 Hive 外部表中,文件内容如下:
# ods.job # 关联ODS层的数据 type=command dependencies=collect command=sh /data/soft/warehouse_job/ods_mall_add_partition.sh
(2)在 ods.job 中,需要使用到 ods_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表中添加分区的命令,脚本内容如下:
#!/bin/bash
# 给ODS层的表添加分区,这个脚本每天执行一次
# 每天凌晨添加昨天的分区,添加完分区之后再执行后面的计算脚本
# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
sh /data/soft/warehouse_job/add_partition.sh ods_mall.ods_user_order ${dt} ${dt}
(3)在 ods_mall_add_partition.sh 脚本中还需要用到 add_partition.sh 脚本。add_partition.sh 脚本的内容如下:
#!/bin/bash
# 给外部分区表添加分区
# 接收3个参数
#1: 表名
#2: 分区字段dt的值:格式20240101
#3: 分区路径(相对路径或者绝对路径都可以)
if [ $# != 3 ]
then
echo "参数异常:add_partition,sh <tabkle_name><dt><path>"
exit 100
fi
table_name=$1
dt=$2
path=$3
hive -e "
alter table ${table_name} add if not exists partition(dt='${dt}') location '${path}';
"
5,创建 dwd.job 文件
(1)在 dwd.job 文件中使用 Hive SQL 将数据仓库 ODS 层中清洗之后(去除 order_id 为空的数据)的结果数据添加到 DWD 层的 Hive 外部表中。文件内容如下:# dwd.job # 生成DWD层的数据 type=command dependencies=ods command=sh /data/soft/warehouse_job/dwd_mall_add_partition.sh
(2)在 dwd.job 文件中需要使用到 dwd_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:
#!/bin/bash
# 基于ODS层的表进行清洗,将清洗之后的数据添加到DWD层对应表的对应分区中
#每天凌展执行一次
# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
hive -e "
insert overwrite table dwd_mall.dwd_user_order partition(dt='${dt}') select
order_id,
order_date,
user_id,
order_money,
order_type,
order_status,
pay_id,
update_time
from ods_mall.ods_user_order
where dt = '${dt}' and order_id is not null;
"
6,创建 app.job 文件
(1)在 app.job 文件中,使用 Hive SQL 对数据仓库 DWD 层中的数据进行统计,并将最终结果添加到 APP 层的 Hive 外部表中。文件内容如下:
# app.job # 生成APP层的数据 type=command dependencies=dwd command=sh /data/soft/warehouse_job/app_mall_add_partition.sh
(2)在 app.job 中,需要使用到 app_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:
#!/bin/bash
# 需求:电商GMV
# 每天凌晨执行一次
# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
hive -e "
insert overwrite table app_mall.app_gmv partition(dt='${dt}') select
sum(order_money) as gmv
from dwd_mall.dwd_user_order
where dt = '${dt}';
"
7,创建项目并上传任务
(1)我们首先在 Azkaban 中创建一个名为 gmv_calc 的项目:

(2)然后我们将前面生成的 4 个 Job 文件打包成一个压缩包 gmv_calc.zip,并上传这个压缩包。


8,执行测试
(1)上传成功之后可以看到任务之间的依赖关系,然后点击“Execute Flow”按钮执行任务:
提示:我们可以选择立刻执行或者指定时间定时执行,具体可以参考我上一篇文章(点击查看)

(2)执行完毕后我们在“Job List”中可以看到该项目中 4 个子任务的执行顺序和执行状态等信息:

(3)我们检查下各个任务的数据是否都正常生成,使用 Hive 的 bin 目录下的 hive 脚本即可启动 hive 客户端:
bin/hive
(4)首先查看 ods_mall.ods_user_order 数据,可以看到 MySQL 中的 20 条原始订单数据全部都采集过来了:
select * from ods_mall.ods_user_order;
(5)接着查看 dwd_mall.dwd_user_order 数据,可以看到将原始订单数据清洗后得到 15 条数据:
select * from dwd_mall.dwd_user_order;
(6)最后查看 app_mall.app_gmv 数据,可以看到最终的日成交金额汇总:
select * from app_mall.app_gmv;

全部评论(0)