返回 导航

大数据

hangge.com

Azkaban - 快速入门教程3(数据仓库应用实践案例)

作者:hangge | 2025-06-12 09:01

四、Azkaban 在数据仓库中的应用

1,需求说明

(1)在企业中构建离线数据仓库时,由于任务数量较多,并且很多任务之间都有依赖关系,所以需要深度使用分布式任务调度系统。

(2)这里以 Azkaban 为例,针对离线数据仓库中的电商 GMV 指标统计进行演示。电商 GMV 指标的整个计算流程比较复杂,涉及多个子任务,不同子任务使用的计算工具也不一样,大致流程如下图所示:
  • MySQLHDFS:需要使用 Sqoop 命令。
  • HDFS → 数据仓库 ODS 层:需要使用 HiveAlter 命令。
  • 数据仓库 ODS 层 → 数据仓库 DWD 层:需要使用 Hive SQL
  • 数据仓库 DWD 层 → 数据仓库 APP 层:需要使用 Hive SQL

(3)下面针对这个需求来开发对应的 Azkaban Job 文件。多个 Job 的流程如下图所示:

2,准备工作

(1)首先在 MySQL 中创建一张存储订单原始数据的表 user_order
CREATE TABLE user_order (
    order_id INT,
    order_date DATETIME,
    user_id INT,
    order_money DECIMAL(10, 2),
    order_type VARCHAR(255),
    order_status VARCHAR(50),
    pay_id INT,
    update_time TIMESTAMP
);

(2)然后在该表中插入一些测试数据,其中有 5 条记录 order_id 为空:
INSERT INTO user_order (order_id, order_date, user_id, order_money, order_type, order_status, pay_id, update_time)
VALUES
(null, '2024-02-13 08:30:00', 101, 50.00, 'Online', 'Pending', 201, '2024-02-13 08:35:00'),
(2, '2024-02-13 09:15:00', 102, 75.50, 'In-store', 'Processing', 202, '2024-02-13 09:20:00'),
(null, '2024-02-13 10:00:00', 103, 120.75, 'Online', 'Completed', 203, '2024-02-13 10:05:00'),
(4, '2024-02-13 11:45:00', 104, 90.25, 'Online', 'Shipped', 204, '2024-02-13 11:50:00'),
(5, '2024-02-13 12:30:00', 105, 35.50, 'In-store', 'Pending', 205, '2024-02-13 12:35:00'),
(null, '2024-02-13 13:15:00', 106, 200.00, 'Online', 'Processing', 206, '2024-02-13 13:20:00'),
(7, '2024-02-13 14:00:00', 107, 150.75, 'In-store', 'Completed', 207, '2024-02-13 14:05:00'),
(null, '2024-02-13 15:45:00', 108, 80.50, 'Online', 'Shipped', 208, '2024-02-13 15:50:00'),
(9, '2024-02-13 16:30:00', 109, 45.25, 'Online', 'Pending', 209, '2024-02-13 16:35:00'),
(10, '2024-02-13 17:15:00', 110, 60.00, 'In-store', 'Processing', 210, '2024-02-13 17:20:00'),
(11, '2024-02-13 18:00:00', 111, 95.75, 'Online', 'Completed', 211, '2024-02-13 18:05:00'),
(12, '2024-02-13 19:45:00', 112, 120.50, 'In-store', 'Shipped', 212, '2024-02-13 19:50:00'),
(null, '2024-02-13 20:30:00', 113, 70.25, 'Online', 'Pending', 213, '2024-02-13 20:35:00'),
(14, '2024-02-13 21:15:00', 114, 55.00, 'Online', 'Processing', 214, '2024-02-13 21:20:00'),
(15, '2024-02-13 22:00:00', 115, 130.75, 'In-store', 'Completed', 215, '2024-02-13 22:05:00'),
(16, '2024-02-13 22:45:00', 116, 75.50, 'Online', 'Shipped', 216, '2024-02-13 22:50:00'),
(17, '2024-02-13 23:30:00', 117, 40.25, 'In-store', 'Pending', 217, '2024-02-13 23:35:00'),
(18, '2024-02-13 23:59:00', 118, 85.00, 'Online', 'Processing', 218, '2024-02-14 00:04:00'),
(19, '2024-02-13 14:30:00', 119, 110.75, 'Online', 'Completed', 219, '2024-02-13 14:35:00'),
(20, '2024-02-13 15:15:00', 120, 65.50, 'In-store', 'Shipped', 220, '2024-02-13 15:20:00');

(3)接着使用 Hivebin 目录下的 hive 脚本即可启动 hive 客户端:
bin/hive

(4)然后创建 hive 数据库,具体包括 ODS 层、DWD 层、APP 层这三个层级数据库:
create database ods_mall;
create database dwd_mall;
create database app_mall;

(5)然后创建各个层级数据库所用到的 hive 表: 
  • ods_user_order 表存放从 MySQLHDFS 的原始订单数据:
create external table ods_mall.ods_user_order(
    order_id int,
    order_date string,
    user_id int,
    order_money decimal(10, 2),
    order_type string,
    order_status string,
    pay_id int,
    update_time timestamp
)
partitioned by(dt string)
row format delimited
fields terminated by '\t'
location '/data/ods/user_order';
  • dwd_user_order 表存放的是 ods_user_order 经过清洗后的订单数据:
create external table dwd_mall.dwd_user_order(
    order_id int,
    order_date string,
    user_id int,
    order_money decimal(10, 2),
    order_type string,
    order_status string,
    pay_id int,
    update_time timestamp
)
partitioned by(dt string)
row format delimited
fields terminated by '\t'
location '/data/dwd/user_order';
  • app_gmv 表存放的是通过对 dwd_user_order 表数据进行汇总计算得到的最终总成交额数据:
create external table app_mall.app_gmv (
  gmv decimal(10, 2)
)
partitioned by(dt string)
location '/data/app/app_gmv';

3,创建 collect.job 文件

(1)在 collect.job 中使用 Sqoop 命令将 MySQL 中的数据导入 HDFS,文件内容如下:
# collect.job
# 将MySQL中的数据导入HDFS
type=command
command=sh /data/soft/warehouse_job/collect_mysql.sh

(2)在 collect.job 中需要使用到 collect_mysql.sh 这个脚本。这个脚本里面是 Sqoop 的采集数据命令,脚本内容如下:
# 每天凌展执行一次
# 默认获取昨天的日期,也支持传参指定一个自期
if [ "z$1" = "z" ]
then
  dt=`date +%Y%m%d --date="1 days ago"`
else
  dt=$1
fi
 
# 转换日期格式,将 20240201 转换为 2024-02-01
dt_new=`date +%Y-%m-%d --date="${dt}"`
 
# Hive SQL语句
user_order_sql="select order_id,order_date,user_id,order_money,order_type,order_status,pay_id,update_time from user_order where order_date > '${dt_new} 00:00:00' and order_date <= '${dt_new} 23:59:59'"
 
# 路径前缀
path_prefix="hdfs://node1:9000/data/ods"
 
# 输出路径
user_order_path="${path_prefix}/user_order/${dt}"
 
# 采集数据
echo ”开始采集...”
echo "采集表:user_order"
sh /data/soft/warehouse_job/sqoop_collect_data_util.sh "${user_order_sql}" "${user_order_path}"
echo "结束采集..."

(3)在 collect_mysql.sh 脚本中,还需要用到 sqoop_collect_data_util.sh 脚本,脚本内容如下:
#!/bin/bash
# 将MySQL中的数据导入HDFS
if [ $# !=2 ]
then
  echo "参数异常:sqoop_collect_data_util.sh <sql><hdfs_path>"
  exit 100
fi
 
# 查询指定日期范围订单的SQL语句
# 例如:select id,name from user where id >1
sql=$1
 
# 导入HDFS的路径
hdfs_path=$2
 
sqoop import \
--connect jdbc:mysql://192.168.60.1:3306/hangge?serverTimezone=Asia/Shanghai \
--username root \
--password hangge1234 \
--target-dir "${hdfs_path}" \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query "${sql}"' and $CONDITIONS' \
--null-string '\\N' \
--null-non-string '\\N'

4,创建 ods.job 文件

(1)在 ods.job 中,使用 HiveAlter 命令将 HDFS 中的数据关联到数据仓库 ODS 层的 Hive 外部表中,文件内容如下:
# ods.job
# 关联ODS层的数据
type=command
dependencies=collect
command=sh /data/soft/warehouse_job/ods_mall_add_partition.sh

(2)在 ods.job 中,需要使用到 ods_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表中添加分区的命令,脚本内容如下:
#!/bin/bash
# 给ODS层的表添加分区,这个脚本每天执行一次
# 每天凌晨添加昨天的分区,添加完分区之后再执行后面的计算脚本

# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
  dt=`date +%Y%m%d --date="1 days ago"`
else
  dt=$1
fi

sh /data/soft/warehouse_job/add_partition.sh ods_mall.ods_user_order ${dt} ${dt}

(3)在 ods_mall_add_partition.sh 脚本中还需要用到 add_partition.sh 脚本。add_partition.sh 脚本的内容如下:
#!/bin/bash
# 给外部分区表添加分区
# 接收3个参数
#1: 表名
#2: 分区字段dt的值:格式20240101
#3: 分区路径(相对路径或者绝对路径都可以)

if [ $# != 3 ]
then
  echo "参数异常:add_partition,sh <tabkle_name><dt><path>"
  exit 100
fi

table_name=$1
dt=$2
path=$3

hive -e "
alter table ${table_name} add if not exists partition(dt='${dt}') location '${path}';
"

5,创建 dwd.job 文件

(1)在 dwd.job 文件中使用 Hive SQL 将数据仓库 ODS 层中清洗之后(去除 order_id 为空的数据)的结果数据添加到 DWD 层的 Hive 外部表中。文件内容如下:
# dwd.job
# 生成DWD层的数据
type=command
dependencies=ods
command=sh /data/soft/warehouse_job/dwd_mall_add_partition.sh

(2)在 dwd.job 文件中需要使用到 dwd_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:
#!/bin/bash
# 基于ODS层的表进行清洗,将清洗之后的数据添加到DWD层对应表的对应分区中
#每天凌展执行一次

# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
  dt=`date +%Y%m%d --date="1 days ago"`
else
  dt=$1
fi

hive -e "
insert overwrite table dwd_mall.dwd_user_order partition(dt='${dt}') select
  order_id,
  order_date,
  user_id,
  order_money,
  order_type,
  order_status,
  pay_id,
  update_time
from ods_mall.ods_user_order
where dt = '${dt}' and order_id is not null;
"

6,创建 app.job 文件

(1)在 app.job 文件中,使用 Hive SQL 对数据仓库 DWD 层中的数据进行统计,并将最终结果添加到 APP 层的 Hive 外部表中。文件内容如下:
# app.job
# 生成APP层的数据
type=command
dependencies=dwd
command=sh /data/soft/warehouse_job/app_mall_add_partition.sh

(2)在 app.job 中,需要使用到 app_mall_add_partition.sh 这个脚本。这个脚本中是向 Hive 外部表的指定分区中添加数据的命令。脚本内容如下:
#!/bin/bash
# 需求:电商GMV
# 每天凌晨执行一次

# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
  dt=`date +%Y%m%d --date="1 days ago"`
else
  dt=$1
fi

hive -e "
insert overwrite table app_mall.app_gmv partition(dt='${dt}') select
sum(order_money) as gmv
from dwd_mall.dwd_user_order
where dt = '${dt}';
"

7,创建项目并上传任务

(1)我们首先在 Azkaban 中创建一个名为 gmv_calc 的项目:

(2)然后我们将前面生成的 4Job 文件打包成一个压缩包 gmv_calc.zip,并上传这个压缩包。

8,执行测试

(1)上传成功之后可以看到任务之间的依赖关系,然后点击“Execute Flow”按钮执行任务:
提示:我们可以选择立刻执行或者指定时间定时执行,具体可以参考我上一篇文章(点击查看

(2)执行完毕后我们在“Job List”中可以看到该项目中 4 个子任务的执行顺序和执行状态等信息:

(3)我们检查下各个任务的数据是否都正常生成,使用 Hive bin 目录下的 hive 脚本即可启动 hive 客户端:
bin/hive

(4)首先查看 ods_mall.ods_user_order 数据,可以看到 MySQL 中的 20 条原始订单数据全部都采集过来了:
select * from ods_mall.ods_user_order;

(5)接着查看 dwd_mall.dwd_user_order 数据,可以看到将原始订单数据清洗后得到 15 条数据:
select * from dwd_mall.dwd_user_order;

(6)最后查看 app_mall.app_gmv 数据,可以看到最终的日成交金额汇总:
select * from app_mall.app_gmv;
评论

全部评论(0)

回到顶部