数据仓库从0到1之数仓DWD层

从ODS层到DWD层的数据要经过ETL处理,所以DWD层的工作主要是对数据进行ETL清洗,以及事实表和维度表的建表。

  1. 对用户行为数据解析。

  2. 对核心数据进行判空过滤。

  3. 对业务数据采用维度模型重新建模。

DWD层用户日志行为解析

日志格式回顾

页面埋点日志

启动日志

启动日志表

启动日志解析思路:启动日志表中每行数据对应一个启动记录,一个启动记录应该包含日志中的公共信息和启动信息。先将所有包含start字段的日志过滤出来,然后使用get_json_object函数解析每个字段。
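
解析思路对应的最小SQL示意(只演示过滤和解析两个字段,完整装载语句见后文的导入脚本):

select
    get_json_object(line, '$.common.mid') mid_id,
    get_json_object(line, '$.start.entry') entry
from ods_log
where dt = '2020-06-15'
  and get_json_object(line, '$.start') is not null;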

get_json_object函数解析:

数据:

[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]

取出第一个json对象:

select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0]');

结果是:{"name":"大郎","sex":"男","age":"25"}

取出第一个json的age字段的值:

SELECT get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0].age');

结果是:25
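
取嵌套json对象里的字段同样用点号路径,例如取common对象下的mid字段(示意):

select get_json_object('{"common":{"mid":"mid_36"},"ts":1592188403000}', '$.common.mid');

结果是:mid_36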

这里查询ods_log的日志数据:

select * from ods_log where dt='2020-06-15' limit 1;
注意:不同用户的json日志格式字段不一样,只有当用户浏览了page才有page字段,出现报错才会有err字段。
{
"common":{
"ar":"110000",
"ba":"vivo",
"ch":"wandoujia",
"md":"vivo iqoo3",
"mid":"mid_36",
"os":"Android 11.0",
"uid":"447",
"vc":"v2.1.134"
},
"start":{
"entry":"icon",
"loading_time":12212,
"open_ad_id":3,
"open_ad_ms":2849,
"open_ad_skip_ms":0
},
"ts":1592188403000
}

启动日志表建表语句:

drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`entry` string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
`loading_time` bigint COMMENT '启动加载时间',
`open_ad_id` string COMMENT '广告页ID ',
`open_ad_ms` bigint COMMENT '广告总共播放时间',
`open_ad_skip_ms` bigint COMMENT '用户跳过广告时点',
`ts` bigint COMMENT '时间'
) COMMENT '启动日志表'
PARTITIONED BY (dt string) -- 按照时间创建分区
stored as parquet -- 采用parquet列式存储
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
TBLPROPERTIES('parquet.compression'='lzo') -- 采用LZO压缩
;
-- 该表不需要建立LZO索引:只有纯lzo压缩的text表才需要建索引;这里是parquet存储格式,内部再用lzo对列数据压缩,整体仍是parquet列式存储,本身支持切片。

说明:数据采用parquet存储方式,是可以支持切片的,不需要再对数据创建索引。如果单纯用text方式存储数据,则需要采用支持切片的lzop压缩方式并创建索引。

关于Hive读取索引文件问题

两种方式,分别查询数据有多少行:

hive (gmall)> select * from ods_log;
Time taken: 0.706 seconds, Fetched: 2955 row(s)

hive (gmall)> select count(*) from ods_log;
2959

两次查询结果不一致。

原因是select * from ods_log不执行MR操作,默认采用的是ods_log建表语句中指定的DeprecatedLzoTextInputFormat,能够识别lzo.index为索引文件。

select count(*) from ods_log执行MR操作,默认采用的是CombineHiveInputFormat,不能识别lzo.index为索引文件,将索引文件当做普通文件处理。更严重的是,这会导致LZO文件无法切片。

hive (gmall)> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

解决办法:将hive.input.format由CombineHiveInputFormat修改为HiveInputFormat。

再次测试

hive (gmall)>
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

hive (gmall)> select * from ods_log;
Time taken: 0.706 seconds, Fetched: 2955 row(s)

hive (gmall)> select count(*) from ods_log;
2955

页面日志表

页面日志解析思路:页面日志表中每行数据对应一个页面访问记录,一个页面访问记录应该包含日志中的公共信息和页面信息。先将所有包含page字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

drop table if exists dwd_page_log;
CREATE EXTERNAL TABLE dwd_page_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`during_time` bigint COMMENT '持续时间毫秒',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面ID ',
`source_type` string COMMENT '来源类型',
`ts` bigint
) COMMENT '页面日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');

动作日志表

动作日志解析思路:动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含action字段的日志过滤出来,然后通过UDTF函数,将action数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

建表语句

drop table if exists dwd_action_log;
CREATE EXTERNAL TABLE dwd_action_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`during_time` bigint COMMENT '持续时间毫秒',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面id ',
`source_type` string COMMENT '来源类型',
`action_id` string COMMENT '动作id',
`item` string COMMENT '目标id ',
`item_type` string COMMENT '目标类型',
`ts` bigint COMMENT '时间'
) COMMENT '动作日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_action_log'
TBLPROPERTIES('parquet.compression'='lzo');

UDTF函数定义思路:

补充:hive虚表

常与explode等udtf函数一起使用,将拆分出来的字段与原始表字段进行笛卡尔积得到一张虚表。

https://www.cnblogs.com/qi-yuan-008/p/13584113.html

create database test;
use test;
create table if not exists test.test_tb(id string,content string,comment string) row format delimited fields terminated by '\1' stored as textfile;

insert into test.test_tb values('1','Tom,Bob,Andy','测试1,测试2,测试3');
insert into test.test_tb values('2','Jack,Vicent,Wendy','测试11,测试22,测试33');

select
explode(split(content, ','))
from test_tb;

-- tmp_content.split_content 虚表列字段
-- tmp_content 虚表名
select id, tmp_content.split_content, comment
from test_tb lateral view explode(split(content, ',')) tmp_content as split_content;

所以,我们需要把actions里面的元素也拆分出来,用虚表的形式得到用户的多行action记录。

下面使用Java开发一个UDTF函数:创建maven工程,添加依赖导包。

<dependencies>
<!--添加hive依赖-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
package com.everweekup.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

// 121-DWD
public class ExplodeJSONArray extends GenericUDTF {

@Override
// ObjectInspector里封装了Hive数据类型
// StructObjectInspector是Hive的复杂结构类型,比如数组,map等,含有多个参数
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

// 1 参数合法性检查
// 传入参数结构: [{"":xxx}, {"":xxx}, ...]
if (argOIs.getAllStructFieldRefs().size() != 1){
throw new UDFArgumentException("ExplodeJSONArray 只需要一个参数");
}

// 2 第一个参数必须为string
if(!"string".equals(argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName())){
throw new UDFArgumentException("json_array_to_struct_array的第1个参数应为string类型");
}

// 3 定义返回值名称和类型
List<String> fieldNames = new ArrayList<String>();
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

fieldNames.add("items");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

public void process(Object[] objects) throws HiveException {

// 1 获取传入的数据
// 前面已经校验过不为0,不会发生数组越界
String jsonArray = objects[0].toString();

// 2 将string转换为json数组
JSONArray actions = new JSONArray(jsonArray);

// 3 循环一次,取出数组中的一个json,并写出
for (int i = 0; i < actions.length(); i++) {

String[] result = new String[1];
result[0] = actions.getString(i);
forward(result);
}
}

public void close() throws HiveException {

}
}

创建函数

1.打包

2.将hivefunction-1.0-SNAPSHOT.jar上传到hadoop102的/opt/module,然后再将该jar包上传到HDFS的/user/hive/jars路径下

[dw@hadoop102 module]$ hadoop fs -mkdir -p /user/hive/jars
[dw@hadoop102 module]$ hadoop fs -put hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

3.创建永久函数与开发好的java class关联

hive (gmall)>
create function explode_json_array as 'com.everweekup.hive.udtf.ExplodeJSONArray' using jar 'hdfs://hadoop102:8020/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';

4.注意:如果修改了自定义函数重新生成jar包怎么处理?只需要替换HDFS路径上的旧jar包,然后重启Hive客户端即可。
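
函数注册好后,可以先用一小段json数组简单验证一下(示意,数组内容为假设的测试数据):

hive (gmall)> select explode_json_array('[{"action_id":"cart_add"},{"action_id":"favor_add"}]');
{"action_id":"cart_add"}
{"action_id":"favor_add"}

每个json对象输出为一行,再配合lateral view与原表的公共字段、页面字段组合,即可得到动作明细。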

曝光日志表

曝光日志解析思路:曝光日志表中每行数据对应一个曝光记录,一个曝光记录应当包含公共信息、页面信息以及曝光信息。先将包含display字段的日志过滤出来,然后通过UDTF函数,将display数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`during_time` bigint COMMENT '持续时间毫秒',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面ID ',
`source_type` string COMMENT '来源类型',
`ts` bigint COMMENT '时间',
`display_type` string COMMENT '曝光类型',
`item` string COMMENT '曝光对象id ',
`item_type` string COMMENT '曝光对象类型',
`order` bigint COMMENT '出现顺序'
) COMMENT '曝光日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_display_log'
TBLPROPERTIES('parquet.compression'='lzo');

错误日志表

错误日志解析思路:错误日志表中每行数据对应一个错误记录,为方便定位错误,一个错误记录应当包含与之对应的公共信息、页面信息、曝光信息、动作信息、启动信息以及错误信息。先将包含err字段的日志过滤出来,然后使用get_json_object函数解析所有字段。

drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面ID ',
`source_type` string COMMENT '来源类型',
`entry` string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
-- 一个错误对应一个页面,一个页面对应多个曝光和动作
`loading_time` string COMMENT '启动加载时间',
`open_ad_id` string COMMENT '广告页ID ',
`open_ad_ms` string COMMENT '广告总共播放时间',
`open_ad_skip_ms` string COMMENT '用户跳过广告时点',
`actions` string COMMENT '动作',
`displays` string COMMENT '曝光',
`ts` string COMMENT '时间',
`error_code` string COMMENT '错误码',
`msg` string COMMENT '错误信息'
) COMMENT '错误日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_error_log'
TBLPROPERTIES('parquet.compression'='lzo');
--这里没有对动作和曝光json列表做explode,后续如果想分析错误和动作或者曝光的关系,可以使用explode_json_array函数自行炸开分析。

说明:此处未对动作数组和曝光数组做处理,如需分析错误与单个动作或曝光的关联,可先使用explode_json_array函数将数组“炸开”,再使用get_json_object函数获取具体字段。

-- 测试将display和actions爆开
with ex1 as (
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.actions') actions,
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg'),
get_json_object(display,'$.displayType') displayType,
get_json_object(display,'$.item') item,
get_json_object(display,'$.item_type') item_type,
get_json_object(display,'$.order') `order`
from ods_log lateral view explode_json_array(get_json_object(line, "$.displays")) tmp as display
where dt='2020-06-15'
and get_json_object(line,'$.displays') is not null
and get_json_object(line,'$.actions') is not null
)
select
displayType,
item,
item_type,
`order`,
get_json_object(action,'$.action_id') action_id,
get_json_object(action,'$.item') item,
get_json_object(action,'$.item_type') item_type,
get_json_object(action,'$.ts') ts
from ex1 lateral view explode_json_array(actions) tmp as action;

DWD层用户行为数据加载脚本

编写DWD数据导入脚本ods_to_dwd_log

#!/bin/bash

hive=/opt/module/hive/bin/hive
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi

sql="
SET mapreduce.job.queuename=hive;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;


insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;


insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.displayType'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;

insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;


insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;
"

$hive -e "$sql"

脚本采用的是insert overwrite插入数据,同一分区重复执行只会覆盖、不会重复追加,可以保证程序的幂等性。一般企业在每日凌晨0点30分到1点之间调度执行。

执行脚本:

[dw@hadoop116 ~]$ ods_to_dwd_log.sh 2020-06-15
Hive on Spark Session Web UI URL: http://hadoop117:37777

Query Hive on Spark job[4] stages: [9, 8]
Spark job[4] status = RUNNING
--------------------------------------------------------------------------------------
STAGES ATTEMPT STATUS TOTAL COMPLETED RUNNING PENDING FAILED
--------------------------------------------------------------------------------------
Stage-8 ........ 0 FINISHED 2 2 0 0 0
Stage-9 ........ 0 FINISHED 2 2 0 0 0
--------------------------------------------------------------------------------------
STAGES: 02/02 [==========================>>] 100% ELAPSED TIME: 2.02 s
--------------------------------------------------------------------------------------
Spark job[4] finished successfully in 2.02 second(s)
Loading data to table gmall.dwd_error_log partition (dt=2020-06-15)
OK
_c0 _c1 _c2 _c3 _c4 _c5 _c6 _c7 _c8 _c9 _c10 _c11 _c12 _c13 _c14 _c15 _c16 _c17 _c18 _c19 _c20 _c21 _c22
Time taken: 2.506 seconds

DWD层业务数据

业务数据方面DWD层的搭建主要注意点在于维度建模减少后续大量Join操作。

商品维度表(全量)

商品维度表主要是将商品SKU表、商品一级分类表、商品二级分类表、商品三级分类表、商品品牌表和商品SPU表关联为一张商品维度表。

建表语句:

DROP TABLE IF EXISTS `dwd_dim_sku_info`;
CREATE EXTERNAL TABLE `dwd_dim_sku_info` (
`id` string COMMENT '商品id',
`spu_id` string COMMENT 'spuid',
`price` decimal(16,2) COMMENT '商品价格',
`sku_name` string COMMENT '商品名称',
`sku_desc` string COMMENT '商品描述',
`weight` decimal(16,2) COMMENT '重量',
`tm_id` string COMMENT '品牌id',
`tm_name` string COMMENT '品牌名称',
`category3_id` string COMMENT '三级分类id',
`category2_id` string COMMENT '二级分类id',
`category1_id` string COMMENT '一级分类id',
`category3_name` string COMMENT '三级分类名称',
`category2_name` string COMMENT '二级分类名称',
`category1_name` string COMMENT '一级分类名称',
`spu_name` string COMMENT 'spu名称',
`create_time` string COMMENT '创建时间'
) COMMENT '商品维度表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_sku_info/'
tblproperties ("parquet.compression"="lzo");

数据导入语句

--= 用with子句优化查询
--= https://www.cnblogs.com/gxgd/p/9463129.html 通过查询计划分析sql语句优化程度
--= 两种写法的查询计划相似,优先使用with子句,可读性强
--= 这里用EXPLAIN查看执行计划;实际装载时去掉EXPLAIN,在select前加insert overwrite(完整语句见后文导入脚本)
EXPLAIN
with sku as (
select id, spu_id, price, sku_name, sku_desc, weight, create_time from ods_sku_info where dt='2020-06-14'
),
spu as (
select id, spu_name, category3_id, tm_id, dt from ods_spu_info where dt='2020-06-14'
),
br as (
select tm_id, tm_name from ods_base_trademark where dt='2020-06-14'
),
c3 as (
select id, name, category2_id from ods_base_category3 where dt='2020-06-14'
),
c2 as (
select id, name, category1_id from ods_base_category2 where dt='2020-06-14'
),
c1 as (
select id, name from ods_base_category1 where dt='2020-06-14'
)
select
sku.id
,sku.spu_id
,sku.price
,sku.sku_name
,sku.sku_desc
,sku.weight
,br.tm_id
,br.tm_name
,c3.id
,c2.id
,c1.id
,c3.name
,c2.name
,c1.name
,spu.spu_name
,sku.create_time
from sku
join spu on sku.spu_id = spu.id
join br on spu.tm_id = br.tm_id
join c3 on spu.category3_id = c3.id
join c2 on c3.category2_id = c2.id
join c1 on c2.category1_id = c1.id;

优惠券维度表(全量)

把ODS层ods_coupon_info表数据导入到DWD层优惠券维度表,在导入过程中可以做适当的清洗。

建表语句:

drop table if exists dwd_dim_coupon_info;
create external table dwd_dim_coupon_info(
`id` string COMMENT '购物券编号',
`coupon_name` string COMMENT '购物券名称',
`coupon_type` string COMMENT '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
`condition_amount` decimal(16,2) COMMENT '满额数',
`condition_num` bigint COMMENT '满件数',
`activity_id` string COMMENT '活动编号',
`benefit_amount` decimal(16,2) COMMENT '减金额',
`benefit_discount` decimal(16,2) COMMENT '折扣',
`create_time` string COMMENT '创建时间',
`range_type` string COMMENT '范围类型 1、商品 2、品类 3、品牌',
`spu_id` string COMMENT '商品id',
`tm_id` string COMMENT '品牌id',
`category3_id` string COMMENT '品类id',
`limit_num` bigint COMMENT '最多领用次数',
`operate_time` string COMMENT '修改时间',
`expire_time` string COMMENT '过期时间'
) COMMENT '优惠券维度表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_coupon_info partition(dt='2020-06-15')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ods_coupon_info
where dt='2020-06-15';

活动维度表(全量)

这里要注意:活动维度表的数据是自己造的,没有列出参与活动对应的活动级别(活动级别有1、2、3级),而活动订单表只有活动类型、没有参与活动的级别,所以这里直接用活动信息表建维度表,不再考虑join其他表。

drop table if exists dwd_dim_activity_info;
create external table dwd_dim_activity_info(
`id` string COMMENT '编号',
`activity_name` string COMMENT '活动名称',
`activity_type` string COMMENT '活动类型',
`start_time` string COMMENT '开始时间',
`end_time` string COMMENT '结束时间',
`create_time` string COMMENT '创建时间'
) COMMENT '活动信息表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_activity_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_activity_info partition(dt='2020-06-15')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ods_activity_info
where dt='2020-06-15';

地区维度表(特殊)

DROP TABLE IF EXISTS `dwd_dim_base_province`;
CREATE EXTERNAL TABLE `dwd_dim_base_province` (
`id` string COMMENT 'id',
`province_name` string COMMENT '省市名称',
`area_code` string COMMENT '地区编码',
`iso_code` string COMMENT 'ISO编码',
`region_id` string COMMENT '地区id',
`region_name` string COMMENT '地区名称'
) COMMENT '地区维度表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_base_province/'
tblproperties ("parquet.compression"="lzo");

数据装载:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from
(
select * from ods_base_province
) bp
join
(
select * from ods_base_region
) br
on bp.region_id = br.id;

时间维度表(特殊)

DROP TABLE IF EXISTS `dwd_dim_date_info`;
CREATE EXTERNAL TABLE `dwd_dim_date_info`(
`date_id` string COMMENT '日',
`week_id` string COMMENT '周',
`week_day` string COMMENT '周的第几天',
`day` string COMMENT '每月的第几天',
`month` string COMMENT '第几月',
`quarter` string COMMENT '第几季度',
`year` string COMMENT '年',
`is_workday` string COMMENT '是否是工作日',
`holiday_id` string COMMENT '是否是节假日'
) COMMENT '时间维度表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_date_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

注意:由于dwd_dim_date_info是列式存储+LZO压缩。直接将date_info.txt文件导入到目标表,并不会直接转换为列式存储+LZO压缩。我们需要创建一张普通的临时表dwd_dim_date_info_tmp,将date_info.txt加载到该临时表中。最后通过查询临时表数据,把查询到的数据插入到最终的目标表中。

创建临时表,非列式存储:

DROP TABLE IF EXISTS `dwd_dim_date_info_tmp`;
CREATE EXTERNAL TABLE `dwd_dim_date_info_tmp`(
`date_id` string COMMENT '日',
`week_id` string COMMENT '周',
`week_day` string COMMENT '周的第几天',
`day` string COMMENT '每月的第几天',
`month` string COMMENT '第几月',
`quarter` string COMMENT '第几季度',
`year` string COMMENT '年',
`is_workday` string COMMENT '是否是工作日',
`holiday_id` string COMMENT '是否是节假日'
) COMMENT '时间临时表'
row format delimited fields terminated by '\t'
location '/warehouse/gmall/dwd/dwd_dim_date_info_tmp/';

-- 将数据导入临时表
load data local inpath '/opt/module/db_log/date_info.txt' into table dwd_dim_date_info_tmp;

-- 将数据导入正式表
insert overwrite table dwd_dim_date_info select * from dwd_dim_date_info_tmp;

支付事实表(事务型事实表)

时间 用户 地区 商品 优惠券 活动 编码 度量值
支付 金额

建表语句:

drop table if exists dwd_fact_payment_info;
create external table dwd_fact_payment_info (
`id` string COMMENT 'id',
`out_trade_no` string COMMENT '对外业务编号',
`order_id` string COMMENT '订单编号',
`user_id` string COMMENT '用户编号',
`alipay_trade_no` string COMMENT '支付宝交易流水编号',
`payment_amount` decimal(16,2) COMMENT '支付金额',
`subject` string COMMENT '交易内容',
`payment_type` string COMMENT '支付类型',
`payment_time` string COMMENT '支付时间',
`province_id` string COMMENT '省份ID'
) COMMENT '支付事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_payment_info/'
tblproperties ("parquet.compression"="lzo");

这里的时间维度就是分区字段。这里补充下维度退化概念:由于维度表字段过少,可以将维度表并入到事实表以减少join、提高性能,这一操作就是维度退化。

数据装载:

--= 一行记录就是一条事务,增量更新
with pay as (
select * from ods_payment_info where dt='2020-06-15'
),
`order` as (
select id, province_id from ods_order_info where dt='2020-06-15'
)
insert overwrite table dwd_fact_payment_info partition(dt='2020-06-15')
select
pay.id,
pay.out_trade_no,
pay.order_id,
pay.user_id,
pay.alipay_trade_no,
pay.total_amount,
pay.subject,
pay.payment_type,
pay.payment_time,
`order`.province_id
from pay join `order` on pay.order_id = `order`.id;

退款事实表(事务型事实表)

把ODS层ods_order_refund_info表数据导入到DWD层退款事实表,在导入过程中可以做适当的清洗。

时间 用户 地区 商品 优惠券 活动 编码 度量值
退款 件数/金额

建表语句

drop table if exists dwd_fact_order_refund_info;
create external table dwd_fact_order_refund_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户ID',
`order_id` string COMMENT '订单ID',
`sku_id` string COMMENT '商品ID',
`refund_type` string COMMENT '退款类型',
`refund_num` bigint COMMENT '退款件数',
`refund_amount` decimal(16,2) COMMENT '退款金额',
`refund_reason_type` string COMMENT '退款原因类型',
`create_time` string COMMENT '退款时间'
) COMMENT '退款事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_refund_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_order_refund_info partition(dt='2020-06-15')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ods_order_refund_info
where dt='2020-06-15';

评价事实表(事务型事实表)

把ODS层ods_comment_info表数据导入到DWD层评价事实表,在导入过程中可以做适当的清洗。

时间 用户 地区 商品 优惠券 活动 编码 度量值
评价 个数

建表语句

drop table if exists dwd_fact_comment_info;
create external table dwd_fact_comment_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户ID',
`sku_id` string COMMENT '商品sku',
`spu_id` string COMMENT '商品spu',
`order_id` string COMMENT '订单ID',
`appraise` string COMMENT '评价',
`create_time` string COMMENT '评价时间'
) COMMENT '评价事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_comment_info/'
tblproperties ("parquet.compression"="lzo");

数据装载

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_comment_info partition(dt='2020-06-15')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ods_comment_info
where dt='2020-06-15';

订单明细事实表(事务型事实表)

时间 用户 地区 商品 优惠券 活动 编码 度量值
订单详情 件数/金额

建表语句:

drop table if exists dwd_fact_order_detail;
create external table dwd_fact_order_detail (
`id` string COMMENT '订单编号',
`order_id` string COMMENT '订单号',
`user_id` string COMMENT '用户id',
`sku_id` string COMMENT 'sku商品id',
`sku_name` string COMMENT '商品名称',
`order_price` decimal(16,2) COMMENT '商品价格',
`sku_num` bigint COMMENT '商品数量',
`create_time` string COMMENT '创建时间',
`province_id` string COMMENT '省份ID',
`source_type` string COMMENT '来源类型',
`source_id` string COMMENT '来源编号',
`original_amount_d` decimal(20,2) COMMENT '原始价格分摊',
`final_amount_d` decimal(20,2) COMMENT '购买价格分摊',
`feight_fee_d` decimal(20,2) COMMENT '分摊运费',
`benefit_reduce_amount_d` decimal(20,2) COMMENT '分摊优惠'
) COMMENT '订单明细事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_detail/'
tblproperties ("parquet.compression"="lzo");

数据装载:

--= 一个order_info里面有多个order_detail小订单,即将多个小订单合并成一个大订单放在order_info中,order_detail展现大订单里的详情订单
--= final_total_amount = Sum(order_price * sku_num) + Any(feight_fee) - Any(benefit_reduce_amount)
--= feight_fee比较特殊,同一大订单的任一小订单运费都一样,就是总体大订单的运费
--= benefit_reduce_amount是整个大订单的优惠总金额,对所有小订单只记一次
--= original_amount_d: 原始金额分摊,就是每个小订单的金额,直接用单价order_price*sku_num即可
--= benefit_reduce_amount_d: 分摊优惠 = 该小订单金额/大订单原始总金额 * 优惠金额
with od as (
select
id
,order_id
,sku_id
,sku_name
,order_price
,sku_num
,create_time
,source_type
,source_id
from ods_order_detail
where dt='2020-06-15'
),
oi as (
select
id
,user_id
,province_id
,benefit_reduce_amount
,original_total_amount
,feight_fee
,final_total_amount
from ods_order_info
where dt='2020-06-15'
),
tmp as (
select
od.id
,od.order_id
,oi.user_id
,od.sku_id
,od.sku_name
,od.order_price
,od.sku_num
,od.create_time
,oi.province_id
,od.source_type
,od.source_id
,round(od.order_price * od.sku_num, 2) original_amount_d
,round((od.order_price * od.sku_num * oi.final_total_amount) / oi.original_total_amount, 2) final_amount_d
,round((od.order_price * od.sku_num * oi.feight_fee) / oi.original_total_amount, 2) feight_fee_d
,round(
-- ((od.order_price*od.sku_num)/oi.final_total_amount) * oi.benefit_reduce_amount
-- 会存在精度丢失问题:10 / 3 = sum(3.33 * 3) != 10 :需要拿到误差金额(10-3.33*3) + 总的优惠金额
(od.order_price * od.sku_num * oi.benefit_reduce_amount) / oi.original_total_amount
, 2) benefit_reduce_amount_d
,sum(round((od.order_price * od.sku_num * oi.final_total_amount) / oi.original_total_amount, 2)) over(partition by od.order_id) final_amount_d_sum
,sum(round((od.order_price * od.sku_num * oi.feight_fee) / oi.original_total_amount, 2)) over(partition by od.order_id) feight_fee_d_sum
,sum(round((od.order_price * od.sku_num * oi.benefit_reduce_amount) / oi.original_total_amount, 2)) over(partition by od.order_id) benefit_reduce_amount_sum
,row_number() over (partition by od.order_id order by od.id) rn
,oi.original_total_amount
,oi.benefit_reduce_amount
,oi.feight_fee
,oi.final_total_amount
from od join oi
on od.order_id = oi.id
)
insert overwrite table dwd_fact_order_detail partition(dt='2020-06-15')
select
tmp.id
,tmp.order_id
,tmp.user_id
,tmp.sku_id
,tmp.sku_name
,tmp.order_price
,tmp.sku_num
,tmp.create_time
,tmp.province_id
,tmp.source_type
,tmp.source_id
,tmp.original_amount_d
,if(rn=1, tmp.final_amount_d + (tmp.final_amount_d - tmp.final_amount_d_sum), tmp.final_amount_d) final_amount_d
,if(rn=1, tmp.feight_fee_d + (tmp.feight_fee - tmp.feight_fee_d_sum), tmp.feight_fee_d) feight_fee_d
,if(rn=1, tmp.benefit_reduce_amount_d + (tmp.benefit_reduce_amount - tmp.benefit_reduce_amount_sum), tmp.benefit_reduce_amount_d) benefit_reduce_amount_d
-- ,tmp.original_total_amount
-- ,tmp.benefit_reduce_amount
-- ,tmp.feight_fee
-- ,tmp.final_total_amount
from tmp;

对上面的sql语句进行补充说明:

补充:订单明细事实表中的分摊优惠处理。由于相除之后会存在精度损失,所以我们考虑用总的优惠金额减去相除后各商品分摊优惠之和,计算出损失的精度后,找一个商品加上这个损失,使得分摊结果之和等于原来的总优惠金额。

id order_id benefit_reduce_amount benefit_reduce_amount_d benefit_reduce_amount_d_sum row_number
1 1001 10 3.33 9.99 1
2 1001 10 3.33 9.99 2
3 1001 10 3.33 9.99 3

场景:
假设满30优惠10元,现在买了3件商品,每件商品分别为10,10,10元,则这三件商品每件分摊:
优惠金额*(该商品价/订单所有商品价格之和)=10*10/30=3.33
同理可得每件商品优惠:3.33,损失精度:10-3*3.33=0.01
为弥补精度损失,随便选一个商品使其+0.01即可。

解决方案的sql
1.第一个开窗函数,用于求出benefit_reduce_amount_d_sum列:
round(sum(benefit_reduce_amount*(order_price*order_sku_num/original_total_amount)), 2)
over(partition by order_id) benefit_reduce_amount_d_sum--求出 benefit_reduce_amount_d_sum
-- 注意:partition by后面不要加order by,业务逻辑上不需要;一旦加了order by,sum求的就是第一行到当前行的累计和,结果即为:
benefit_reduce_amount_d_sum
3.33
6.66
9.99
-- 所以这里不能使用order by
以上语句正常执行的结果为:
benefit_reduce_amount_d_sum
9.99
9.99
9.99

2.第二个开窗函数:加一个行号,用row_number按照order_id分组,随便找个字段做排序(比如按照id排序),排完序后每行会得到一个行号,每个订单分组里至少有一个行号为1的记录
我们给行号为1的记录的benefit_reduce_amount_d加上误差项,补齐这个误差即可:
row_number() over(partition by order_id order by id) rn --rn(行号)
--
row_number
1
2
3
--

3.补齐误差项,将上面1、2两步查询出来的结果放在子查询里,去补齐误差:
select
xxxx
xxxx
-- 如果row_number()为1,则补齐误差:3.34,否则返回原值,即每个商品分摊后的优惠金额3.33
if(rn=1, benefit_reduce_amount_d+(benefit_reduce_amount-benefit_reduce_amount_d_sum), benefit_reduce_amount_d)
from
(
select
xxxx
xxxx
round(sum(benefit_reduce_amount*(order_price*order_sku_num/original_total_amount)), 2)
over(partition by order_id) benefit_reduce_amount_d_sum,
row_number() over(partition by order_id order by id) rn
from xxx
) tmp --from子查询必须得给查询结果起别名

-- if函数只能有是或否两种结果情况
-- case when相当于java的switch,有多个选择

加购事实表(周期型快照事实表,每日快照)

由于购物车中的商品数量会发生变化,所以只导增量不合适。

每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。

周期型快照事实表劣势:存储的数据量会比较大。

解决方案:周期型快照事实表存储的数据比较讲究时效性,时间太久了的意义不大,可以删除以前的数据。
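
例如对下文的加购事实表,可以按保留周期定期清理过旧的快照分区(示意,日期和保留时长按业务自行确定;由于是external表,删除分区后还需清理HDFS上对应目录的数据):

alter table dwd_fact_cart_info drop if exists partition(dt='2020-03-15');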

时间 用户 地区 商品 优惠券 活动 编码 度量值
加购 件数/金额

建表语句

drop table if exists dwd_fact_cart_info;
create external table dwd_fact_cart_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户id',
`sku_id` string COMMENT 'skuid',
`cart_price` string COMMENT '放入购物车时价格',
`sku_num` string COMMENT '数量',
`sku_name` string COMMENT 'sku名称 (冗余)',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '修改时间',
`is_ordered` string COMMENT '是否已经下单。1为已下单;0为未下单',
`order_time` string COMMENT '下单时间',
`source_type` string COMMENT '来源类型',
`source_id` string COMMENT '来源编号'
) COMMENT '加购事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_cart_info/'
tblproperties ("parquet.compression"="lzo");

数据装载

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_cart_info partition(dt='2020-06-15')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ods_cart_info
where dt='2020-06-15';

收藏事实表(周期型快照事实表,每日快照)

收藏的标记(是否取消)会发生变化,只做增量不合适。

每天做一次快照,导入的数据是全量,区别于事务型事实表是每天导入新增。

时间 用户 地区 商品 优惠券 活动 编码 度量值
收藏 个数

建表语句:

drop table if exists dwd_fact_favor_info;
create external table dwd_fact_favor_info(
`id` string COMMENT '编号',
`user_id` string COMMENT '用户id',
`sku_id` string COMMENT 'skuid',
`spu_id` string COMMENT 'spuid',
`is_cancel` string COMMENT '是否取消',
`create_time` string COMMENT '收藏时间',
`cancel_time` string COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_favor_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_fact_favor_info partition(dt='2020-06-14')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ods_favor_info
where dt='2020-06-14';

优惠券领用事实表(累积型快照事实表)

适用于有生命周期的业务过程:领用 => 使用 => 过期,一行数据指代用户对一张优惠券的领取使用记录。

时间 用户 地区 商品 优惠券 活动 编码 度量值
优惠券领用 个数

优惠券的生命周期:领取优惠券 => 用优惠券下单 => 优惠券参与支付

累积型快照事实表的使用:统计优惠券领取次数、优惠券下单次数、优惠券参与支付次数。
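
例如基于该表做上述三个统计的一个最小示意(dt按领取日期分区,字段含义见下文建表语句):

select
    count(*)          get_count,     -- 领取次数
    count(using_time) order_count,   -- 下单次数
    count(used_time)  payment_count  -- 参与支付次数
from dwd_fact_coupon_use
where dt = '2020-06-15';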

累计型快照事实表需要将表进行分区,因为会有新增和修改的数据需要和原始数据对比更新,分区后只需要去到需要修改的数据所在的分区进行修改,能有效避免全量数据增删改查带来的性能损耗问题。

根据我们在数仓ODS层建模一文中的coupon_use表字段,get_time、using_time、used_time分别代表领取时间、使用(下单)时间和使用(支付)时间。根据这几个字段去join原表的不同分区更新数据。

要更新的字段主要是优惠券的使用状态和使用时间。

建表语句:

drop table if exists dwd_fact_coupon_use;
create external table dwd_fact_coupon_use(
`id` string COMMENT '编号',
`coupon_id` string COMMENT '优惠券ID',
`user_id` string COMMENT 'userid',
`order_id` string COMMENT '订单id',
`coupon_status` string COMMENT '优惠券状态',
`get_time` string COMMENT '领取时间',
`using_time` string COMMENT '使用时间(下单)',
`used_time` string COMMENT '使用时间(支付)'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_coupon_use/'
tblproperties ("parquet.compression"="lzo");

注意:dt是按照优惠券领取时间get_time作为分区的。

导入数据

-- 优惠券领用事实表(累积型快照事实表)
-- 按领取日期动态分区,需要先开启非严格模式
set hive.exec.dynamic.partition.mode=nonstrict;
with old as (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from dwd_fact_coupon_use
where dt in(
--= 看15的ods层的优惠劵获取日期get_time去dwd层原始分区选分区更新
select date_format(get_time, 'yyyy-MM-dd') from ods_coupon_use where dt='2020-06-15'
) -- 如果今天分区没有会返回全部数据
),
new as (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ods_coupon_use
where dt='2020-06-15'
)
insert overwrite table dwd_fact_coupon_use partition(dt)
select
nvl(new.id, old.id),
nvl(new.coupon_id, old.coupon_id),
nvl(new.user_id, old.user_id),
nvl(new.order_id, old.order_id),
nvl(new.coupon_status, old.coupon_status),
nvl(new.get_time, old.get_time),
nvl(new.using_time, old.using_time),
nvl(new.used_time, old.used_time),
date_format(nvl(new.get_time, old.get_time), 'yyyy-MM-dd')
from new full outer join old
on new.id = old.id;

订单事实表(累积型快照事实表)

时间 用户 地区 商品 优惠券 活动 编码 度量值
订单 件数/金额

订单生命周期:创建时间=》支付时间=》取消时间=》完成时间=》退款时间=》退款完成时间。

由于ODS层订单表只有创建时间和操作时间两个时间字段,不能表达订单各个状态对应的时间,所以需要关联订单状态表。订单事实表里面增加了活动id,所以需要关联活动订单表。

建表语句:

drop table if exists dwd_fact_order_info;
create external table dwd_fact_order_info (
`id` string COMMENT '订单编号',
`order_status` string COMMENT '订单状态',
`user_id` string COMMENT '用户id',
`out_trade_no` string COMMENT '支付流水号',
`create_time` string COMMENT '创建时间(未支付状态)', --对应操作时间代码1001
`payment_time` string COMMENT '支付时间(已支付状态)', --1002
`cancel_time` string COMMENT '取消时间(已取消状态)', --1003
`finish_time` string COMMENT '完成时间(已完成状态)', --1004
`refund_time` string COMMENT '退款时间(退款中状态)', --1005
`refund_finish_time` string COMMENT '退款完成时间(退款完成状态)', --1006
`province_id` string COMMENT '省份ID',
`activity_id` string COMMENT '活动ID',
`original_total_amount` decimal(16,2) COMMENT '原价金额',
`benefit_reduce_amount` decimal(16,2) COMMENT '优惠金额',
`feight_fee` decimal(16,2) COMMENT '运费',
`final_total_amount` decimal(16,2) COMMENT '订单金额'
) COMMENT '订单事实表'
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_fact_order_info/'
tblproperties ("parquet.compression"="lzo");

数据装载:

-- 按创建日期动态分区,需要先开启非严格模式
set hive.exec.dynamic.partition.mode=nonstrict;
with old as (
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from dwd_fact_order_info
where dt in
(
select date_format(create_time, 'yyyy-MM-dd') from ods_order_info where dt='2020-06-15'
)
),
new as (
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id
,str_to_map(concat_ws(',', collect_set(concat(order_status, '=', operate_time))), ',', '=') tms
from ods_order_status_log
where dt='2020-06-15'
group by order_id
) log
join
(
select
id
,order_status
,user_id
,out_trade_no
,province_id
,original_total_amount
,benefit_reduce_amount
,feight_fee
,final_total_amount
from ods_order_info where dt='2020-06-15'
) info
on log.order_id = info.id
left join
(
select order_id, activity_id from ods_activity_order where dt='2020-06-15'
) act
on log.order_id = act.order_id
)
insert overwrite table dwd_fact_order_info partition(dt)
select
nvl(new.id, old.id)
,nvl(new.order_status, old.order_status)
,nvl(new.user_id, old.user_id)
,nvl(new.out_trade_no, old.out_trade_no)
,nvl(new.tms['1001'], old.create_time)
,nvl(new.tms['1002'], old.payment_time)
,nvl(new.tms['1003'], old.cancel_time)
,nvl(new.tms['1004'], old.finish_time)
,nvl(new.tms['1005'], old.refund_time)
,nvl(new.tms['1006'], old.refund_finish_time)
,nvl(new.province_id, old.province_id)
,nvl(new.activity_id, old.activity_id)
,nvl(new.original_total_amount ,old.original_total_amount)
,nvl(new.benefit_reduce_amount, old.benefit_reduce_amount)
,nvl(new.feight_fee, old.feight_fee)
,nvl(new.final_total_amount, old.final_total_amount)
,date_format(nvl(new.tms['1001'], old.create_time), 'yyyy-MM-dd')
from new full outer join old
on new.id = old.id;

用户维度表(拉链表)

用户表中的数据每日既有可能新增,也有可能修改,但修改频率并不高,属于缓慢变化维度,此处采用拉链表存储用户维度数据。拉链表可以解决数据重复存储。

关于拉链表的介绍可以在数仓建模理论一文中查看。
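
拉链表建好后,取某一天的全量切片只需要按有效区间过滤(示意,start_date、end_date见下文建表语句):

select * from dwd_dim_user_info_his
where start_date <= '2020-06-15' and end_date >= '2020-06-15';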

拉链表制作过程

初始化拉链表(首次独立执行)

建立拉链表:

drop table if exists dwd_dim_user_info_his;
create external table dwd_dim_user_info_his(
`id` string COMMENT '用户id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '用户拉链表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
tblproperties ("parquet.compression"="lzo");

初始化拉链表:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table dwd_dim_user_info_his -- 初始化拉链表,初始日期6-14
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-06-14',
'9999-99-99'
from ods_user_info oi
where oi.dt='2020-06-14';

制作当日变动数据(包括新增、修改),每日执行

如何获得每日变动表:

  • 最好表内有创建时间和变动时间(Lucky!)

  • 如果没有,可以利用第三方工具比如canal,监控MySQL的实时变化并进行记录(麻烦)。补充:MySQL常用主从复制缓解数据库压力——读请求更多,所以做读写分离,slave节点从master节点拉取数据,读走slave,多个slave能够分担读的压力。

    写数据直接写master,保证数据的一致性。

    canal的底层原理就是把自己伪装成mysql的一个slave,这样canal就能获取到mysql的变动数据,然后传给kafka,再传入下游进行对比处理。

  • 逐行对比前后两天的数据,检查md5(concat(全部有可能变化的字段))是否相同(low,示例见下文)

  • 要求业务数据库提供变动流水(人品,颜值)

靠谱的方法是前两种:表内自带创建时间和变动时间字段,或者用canal等工具监控记录变动。
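
如果只能用md5逐行对比的方式,思路大致如下(示意:假设该表每天都有全量快照,对比的字段仅为举例,按实际可能变化的字段选择):

select n.id
from (select * from ods_user_info where dt='2020-06-15') n
join (select * from ods_user_info where dt='2020-06-14') o on n.id = o.id
where md5(concat(n.name, n.email, n.user_level)) <> md5(concat(o.name, o.email, o.user_level));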

因为ods_user_info本身导入过来就是新增变动明细的表,所以不用处理:

  • 数据库中新增2020-06-15一天的数据

* 通过Sqoop把2020-06-15日所有数据导入

mysql_to_hdfs.sh all 2020-06-15

  • ods层数据导入

hdfs_to_ods_db.sh all 2020-06-15
先合并变动信息,再追加新增信息,插入到临时表中

建立临时表:

drop table if exists dwd_dim_user_info_his_tmp;
create external table dwd_dim_user_info_his_tmp(
`id` string COMMENT '用户id',
`name` string COMMENT '姓名',
`birthday` string COMMENT '生日',
`gender` string COMMENT '性别',
`email` string COMMENT '邮箱',
`user_level` string COMMENT '用户等级',
`create_time` string COMMENT '创建时间',
`operate_time` string COMMENT '操作时间',
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
) COMMENT '订单拉链临时表'
stored as parquet
location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
tblproperties ("parquet.compression"="lzo");

导入脚本:

SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with `user` as
(
--新表
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'2020-06-15' start_date,
'9999-99-99' end_date
from ods_user_info where dt='2020-06-15'

union all -- union操作连接上下两个子查询

--旧表,只需要旧的数据,下面是更新旧数据后与上面查询的新数据合并即可
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt, -1), uh.end_date) end_date
from dwd_dim_user_info_his uh left join --拉取全量的拉链表数据
(
select id, dt from ods_user_info where dt='2020-06-15' -- 拉取当日新增及变化数据
) ui on uh.id = ui.id
)
insert overwrite table dwd_dim_user_info_his_tmp --数据插入临时拉链表
select * from `user`;

临时表覆盖给拉链表:

insert overwrite table dwd_dim_user_info_his 
select * from dwd_dim_user_info_his_tmp;

DWD层业务数据导入脚本

#!/bin/bash

APP=gmall
hive=/opt/module/hive/bin/hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi

sql1="
set mapreduce.job.queuename=hive;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

with sku as (
select id, spu_id, price, sku_name, sku_desc, weight, create_time from ${APP}.ods_sku_info where dt='$do_date'
),
spu as (
select id, spu_name, category3_id, tm_id, dt from ${APP}.ods_spu_info where dt='$do_date'
),
br as (
select tm_id, tm_name from ${APP}.ods_base_trademark where dt='$do_date'
),
c3 as (
select id, name, category2_id from ${APP}.ods_base_category3 where dt='$do_date'
),
c2 as (
select id, name, category1_id from ${APP}.ods_base_category2 where dt='$do_date'
),
c1 as (
select id, name from ${APP}.ods_base_category1 where dt='$do_date'
)
insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select
sku.id
,sku.spu_id
,sku.price
,sku.sku_name
,sku.sku_desc
,sku.weight
,br.tm_id
,br.tm_name
,c3.id
,c2.id
,c1.id
,c3.name
,c2.name
,c1.name
,spu.spu_name
,sku.create_time
from sku
join spu on sku.spu_id = spu.id
join br on spu.tm_id = br.tm_id
join c3 on spu.category3_id = c3.id
join c2 on c3.category2_id = c2.id
join c1 on c2.category1_id = c1.id;


insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date';


with od as (
select
id
,order_id
,sku_id
,sku_name
,order_price
,sku_num
,create_time
,source_type
,source_id
from ${APP}.ods_order_detail
where dt='$do_date'
),
oi as (
select
id
,user_id
,province_id
,benefit_reduce_amount
,original_total_amount
,feight_fee
,final_total_amount
from ${APP}.ods_order_info
where dt='$do_date'
),
tmp as (
select
od.id
,od.order_id
,oi.user_id
,od.sku_id
,od.sku_name
,od.order_price
,od.sku_num
,od.create_time
,oi.province_id
,od.source_type
,od.source_id
,round(od.order_price * od.sku_num, 2) original_amount_d
,round((od.order_price * od.sku_num * oi.final_total_amount) / oi.original_total_amount, 2) final_amount_d
,round((od.order_price * od.sku_num * oi.feight_fee) / oi.original_total_amount, 2) feight_fee_d
,round(
-- ((od.order_price*od.sku_num)/oi.final_total_amount) * oi.benefit_reduce_amount
-- 会存在精度丢失问题:10 / 3 = sum(3.33 * 3) != 10 :需要拿到误差金额(10-3.33*3) + 总的优惠金额
(od.order_price * od.sku_num * oi.benefit_reduce_amount) / oi.original_total_amount
, 2) benefit_reduce_amount_d
,sum(round((od.order_price * od.sku_num * oi.final_total_amount) / oi.original_total_amount, 2)) over(partition by od.order_id) final_amount_d_sum
,sum(round((od.order_price * od.sku_num * oi.feight_fee) / oi.original_total_amount, 2)) over(partition by od.order_id) feight_fee_d_sum
,sum(round((od.order_price * od.sku_num * oi.benefit_reduce_amount) / oi.original_total_amount, 2)) over(partition by od.order_id) benefit_reduce_amount_sum
,row_number() over (partition by od.order_id order by od.id) rn
,oi.original_total_amount
,oi.benefit_reduce_amount
,oi.feight_fee
,oi.final_total_amount
from od join oi
on od.order_id = oi.id
)
insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
select
tmp.id
,tmp.order_id
,tmp.user_id
,tmp.sku_id
,tmp.sku_name
,tmp.order_price
,tmp.sku_num
,tmp.create_time
,tmp.province_id
,tmp.source_type
,tmp.source_id
,tmp.original_amount_d
,if(rn=1, tmp.final_amount_d + (tmp.final_amount_d - tmp.final_amount_d_sum), tmp.final_amount_d) final_amount_d
,if(rn=1, tmp.feight_fee_d + (tmp.feight_fee - tmp.feight_fee_d_sum), tmp.feight_fee_d) feight_fee_d
,if(rn=1, tmp.benefit_reduce_amount_d + (tmp.benefit_reduce_amount - tmp.benefit_reduce_amount_sum), tmp.benefit_reduce_amount_d) benefit_reduce_amount_d
from tmp;


with pay as (
select * from ${APP}.ods_payment_info where dt='$do_date'
),
`order` as (
select id, province_id from ${APP}.ods_order_info where dt='$do_date'
)
insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
select
pay.id,
pay.out_trade_no,
pay.order_id,
pay.user_id,
pay.alipay_trade_no,
pay.total_amount,
pay.subject,
pay.payment_type,
pay.payment_time,
`order`.province_id
from pay join `order` on pay.order_id = `order`.id;


insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ${APP}.ods_comment_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ${APP}.ods_cart_info
where dt='$do_date';


insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';


with old as (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.dwd_fact_coupon_use
where dt in(
--= 看15的ods层的优惠劵获取日期get_time去dwd层原始分区选分区更新
select date_format(get_time, 'yyyy-MM-dd') from ${APP}.ods_coupon_use where dt='$do_date'
) -- 如果今天分区没有会返回全部数据
),
new as (
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.ods_coupon_use
where dt='$do_date'
)
insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
select
nvl(new.id, old.id),
nvl(new.coupon_id, old.coupon_id),
nvl(new.user_id, old.user_id),
nvl(new.order_id, old.order_id),
nvl(new.coupon_status, old.coupon_status),
nvl(new.get_time, old.get_time),
nvl(new.using_time, old.using_time),
nvl(new.used_time, old.used_time),
date_format(nvl(new.get_time, old.get_time), 'yyyy-MM-dd')
from new full outer join old
on new.id = old.id;


with old as (
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from ${APP}.dwd_fact_order_info
where dt in
(
select date_format(create_time, 'yyyy-MM-dd') from ${APP}.ods_order_info where dt='$do_date'
)
),
new as (
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id
,str_to_map(concat_ws(',', collect_set(concat(order_status, '=', operate_time))), ',', '=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
) log
join
(
select
id
,order_status
,user_id
,out_trade_no
,province_id
,original_total_amount
,benefit_reduce_amount
,feight_fee
,final_total_amount
from ${APP}.ods_order_info where dt='$do_date'
) info
on log.order_id = info.id
left join
(
select order_id, activity_id from ${APP}.ods_activity_order where dt='$do_date'
) act
on log.order_id = act.order_id
)
insert overwrite table ${APP}.dwd_fact_order_info partition(dt='$do_date')
select
nvl(new.id, old.id)
,nvl(new.order_status, old.order_status)
,nvl(new.user_id, old.user_id)
,nvl(new.out_trade_no, old.out_trade_no)
,nvl(new.tms['1001'], old.create_time)
,nvl(new.tms['1002'], old.payment_time)
,nvl(new.tms['1003'], old.cancel_time)
,nvl(new.tms['1004'], old.finish_time)
,nvl(new.tms['1005'], old.refund_time)
,nvl(new.tms['1006'], old.refund_finish_time)
,nvl(new.province_id, old.province_id)
,nvl(new.activity_id, old.activity_id)
,nvl(new.original_total_amount ,old.original_total_amount)
,nvl(new.benefit_reduce_amount, old.benefit_reduce_amount)
,nvl(new.feight_fee, old.feight_fee)
,nvl(new.final_total_amount, old.final_total_amount)
-- ,date_format(nvl(new.tms['1001'], old.create_time), 'yyyy-MM-dd')
from new full outer join old
on new.id = old.id;


insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001对应未支付状态
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from ${APP}.dwd_fact_order_info
where dt
in
(
select
date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_order_info
where dt='$do_date'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
)log
join
(
select * from ${APP}.ods_order_info where dt='$do_date'
)info
on log.order_id=info.id
left join
(
select * from ${APP}.ods_activity_order where dt='$do_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"

sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"

sql3="
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info where dt='$do_date'

union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from ${APP}.dwd_dim_user_info_his uh left join
(
select
*
from ${APP}.ods_user_info
where dt='$do_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;

insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"

case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
#$hive -e "$sql1$sql2"不会报错,因为sql变量里有“;”,能够识别不同sql语句
"all"){
$hive -e "$sql1$sql3"
};;
esac

数据导入

分别运行业务数据和日志数据导入脚本,导入数据到DWD层。

[dw@hadoop116 bin]$ ods_dwd_db2.sh all 2020-06-15
[dw@hadoop116 bin]$ ods_to_dwd_log.sh 2020-06-15