万字长文——最详Hive入门指南

数据处理方法论

1.了解产品、运营的需求;

  • 发现问题,解决目前存在的问题,得到一些具体量化的价值,指标和口径由产品或运营确定

2.数据收集

  • 准备数据—>直观该数据是否有落库操作
  • 来源:爬虫、qm、kafka、现有业务系统、数据仓库

3.数据处理

  • 首先进行数据调研阶段

    • 了解数据结构
    • 对数据展现做到心中有数
    • 确保数据价值
    • 避免后期出错
    • 需求只会讲故事,不会考虑技术实现的,需要通过数据调研确保实现成本和价值,一般以实现可以能直观产生价值的部分优先
  • 接着根据指定业务规则进行数据清洗,比如说缺失值和异常值

4.数据分析

  • 结合业务需求对数据进行分析,挖掘内在需求,可视化报表展现

5.数据决策和业务优化

  • 需要以数据作为支撑

Hive基础

Hive简介

产生原因

  1. 对存在HDFS上的文件或Hbase中的表进行查询时,是要手写一堆MapReducec代码;
  2. 对于统计任务,只能由懂MapReduce的程序员才能完成;
  3. 耗时耗力,更多精力没有有效的释放出来;

基于以上原因,Hive就此诞生了,Hive基于一个统一的查询分析层通过sql语句的方式对HDFS上的数据进行查询、统计和分析。

Hive的本质就是一个SQL解析引擎,将SQL语句转换成MR job。

Hive的定位是Hadoop大数据仓库,而SQL是其最常用的分析工具。

Hive的表都是纯逻辑表,就只是表的定义等,即表的元数据。其本质就是hdfs路径/文件,以达到元数据与数据存储分离的目的。其元数据(表、表结构等信息)一般存放在mysql,不使用自带的derby,主要是其不支持并发操作,是单线程操作,整体执行效率性能偏低。

Hive可以直接将结构化的数据文件映射成为一张数据库表。

Hive的内容是读多写少,不支持对数据的改写和删除。

Hive SQL VS SQL

image-20210222090749932

补充

读时模式

优点:只有hive读的时候才会检查、解析字段和schema,所以load data非常迅速,因为在写的过程是不需要解析数据的;

缺点:读的时候慢;

写时模式

缺点:写的慢,写的过程需要建立一些索引压缩、数据一致性、字段检查等等。

优点:读的时候会得到优化

Hive对比传统关系型数据库

  • hive和关系数据库存储文件的系统不同,hive使用的是 hadoop的HDFS( hadoop的分布式文件
    系统),关系数据库则是服务器本地的文件系统;
  • hive使用的计算模型是 mapreduce,而关系数据库则是自己设计的计算模型;
  • 关系数据库都是为实时查询的业务进行设计的,而hive则是为海量数据做数据挖掘设计的,实时性很差;
  • hive很容易扩展自己的存储能力和计算能力,这个是继承 hadoop的,而关系数据库在这个方面要比Hive差很多。

Hive体系架构

Hive体系架构

Hive Client

进行交互式执行SQL,直接与 Driver进行交互;

JDBC/ODBC

Hive提供JDBC驱动,作为JAVA的API;

ODBC是通过 Thrift Server来接入,然后发送给 DriverHivef提供的Cli工具;

Metastore(元数据)

是一个独立的关系型数据库中保存表模式和其他系统元数据;

Driver(驱动模块)

通过该模块对需求的计算进行优化,然后按照指定的步骤执行(通常启动多个MR任执行);

image-20210222095936389

对Driver内组件进行介绍:

SQL Parser

根据SQL代码,生成抽象的语法树,通过语法分析,验证SQL语法是否正确。

Query Optimize(逻辑计划执行器)

写select、group by时,都会对应生成一个操作符的树,相当于定义了一个规则。它也可以帮助我们进行优化,降低作业难度。

Physical plan(物理执行计划)

通过物理执行计划,转变成MR任务;

Hive执行流程

Hive内部详细执行流程

1.首先UI(或Cli)界面和Driver建立互通,进行访问;

2.访问之后有一个编译过程,编译的过程会去获取和查询元数据,如果有则返回,如果没有,则查询的时候会报错,比如select某不存在的列;

3.返回的数据通过Driver交给EXECUTION ENGINE,给具体的执行引擎进行执行,只有它才知道我们具体要操作哪一列;

4.任务最终发送给JobTracker,其对任务进行调度和分发,分发到不同的起点,进行reduce操作。

5.在进行对应map或reudce操作时会生成对应的一个操作的树;

6.计算的数据最终存在HDFS里,需要和NameNode连接,获取元数据,找到数据所在的DataNode,然后对应数据进行返回,进行Map或Reduce任务的运行;

HiveQL常用内部命令

1.在hive客户端执行linux shell命令:

不能使用需要用户输入的交互式命令;
不支持shell命令的“管道”功能和文件名自动补全;

  • ! (shell命令) ;

2.在hive客户端执行hadoop的dfs命令

在hive client执行hadoop命令实际比在bash shell客户端执行hadoop dfs命令更高效,因为bash shell每次都会启动一个新的jvm进程实例来运行hadoop dfs命令而hive client会共享hive client 已经开辟的自身jvm进程来执行hadoop dfs命令;

  • 在hive client执行hadoop dfs -xxx 命令时,需要把命令的”hadoop”关键字去掉并以分号”;”结尾;

3.Hive数据类型

  • TINYINT

    1BYTE 有符号整数

  • SMALLINT

    2BYTE 有符号整数

  • INT

    4BYTE 有符号整数

  • BIGINT

    8BYTE 有符号整数

  • BOOLEAN

    布尔类型true or false

    (boolean 常见使用0 1 替代true和false)

  • FLOAT

    单精度浮点数
    3.14159

  • DOUBLE

    双精度浮点数
    3.14159

  • STRING

    字符序列,可以指定字符集,可以使用单引号或双引号

  • TIMESTAMP

    整数、浮点数或者字符串

  • BINARY

    字节数组

4.Hive建表

  • 1.建立school数据库

    • create database school;
  • 2.在school数据库下建student表

    • create table school.student(id bigint, name string) row format delimited fields terminated by ‘,’;

      • create table school.student(id bigint, name string)

        创建表明和表字段名,与关系型数据库建表含义同

      • row format delimited

        行格式分隔,Hive是通过行格式来管理每条数据的分隔

      • fields terminated by ‘,’

        Hive中每行数据中各个字段的分隔符为逗号”,”

  • 3.导入数据到student表

    • load data local inpath ‘数据路径’ into table student;

补充:表类型

维度表

是抽象提取出的、和业务无关、很少变化的表。维度表数据量较小,结构较为简单,不会有太复杂的查询操作;比如部门id和部门名称的关系对应表;

事实表

在数据仓库中,保存度量值的详细值或事实的表称为“事实表”。比如用户行为表等;

5.hive集合数据类型

使用场景:一般用于一般用于前端埋点,流量埋点

  • STRUCT
  • MAP
  • ARRAY

Hive数据定义与操作

HiveQL数据定义语言

数据库相关

  • 1.创建数据库

    • create database (if not exists) 数据库名;
  • 2.查询数据库名以”s”开头的数据库列表

    • show database (like ‘s.*’);
  • 3,为数据库增加描述以说明数据库业务含义

    • create database (bank) (comment ‘Internet Banking’);

      创建数据库bank作为电子网银系统后台数据库

  • 4.查看数据库详细信息

    • desc/describe database (bank);
  • 5.删除数据库

    • drop database (if exists) 数据库名;
    • 删除含有多张表的数据库

      • drop database (if exists) 数据库名 (cascade);

数据库表相关

创建表
直接建表

1.create table (if not exists) enterprise.account(acc_name string, acc_balance double) row format delimited fields terminated by ‘\t’ location ‘user/hive/warehouse/enterprise.db/account’;

  • create table (if not exists) enterprise.account(acc_name string, acc_balance double)

    定义表和表字段

  • user/hive/warehouse

    是默认的数据仓库地址

  • /enterprise.db/account

    是enterprise数据库目录,下面存着该数据库里的表,如account

2.将建表语句写到sql文件中,在linux中用hive -f xx.sql创建表;

适用场景

1.用户可以根据数据源的格式进行自定义的建表,包含指定分隔符,列的分隔符、数据的存储格式、等等;

2.根据数据量,指定的时候选择合适的存储格式textfile、orc;

抽取as建表
1
create table table1 as select * from table2;

适用场景

工作中常用,涉及到逻辑的整合,比如和临时表结合使用,表的结构和数据都要;

like建表
1
create table table1 like table2;

创建表table1的表结构和table2一样,但是table1没有数据。

适用场景

只关心表的结构,数据不复制

建表时一般不建议以关键字为表字段,如果有需要加上``符号修饰

image-20210222103458387

修改表属性
修改表名
1
ALTER TABLE table_name RENAME TO new_tbl_name

修改列名

注意:如果是分区表最后的cascade要加上。

1
ALTER TABLE table_name CHANGE [COLUMN] column_name new_column_name column_type[int/string/float/..] [CASCADE];
修改列字段类型值
1
ALTER TABLE table_name CHANGE COLUMN column_name column_name NEW_TYPE  [CASCADE];
增加新字段

注意:如果是分区表,则新增字段必须在分区字段之前。

1
2
3
4
5
6
ALTER TABLE table_name ADD COLUMNS(col_name type) [CASCADE];

ALTER TABLE table_name ADD COLUMNS(
col_name type COMMENT '注释',
col_name2 type COMMENT '注释2'
) CASCADE;

改变列的位置

1
2
3
4
ALTER TABLE table_name CHANGE
[CLOUMN] col_old_name col_new_name column_type
[CONMMENT col_conmment]
[FIRST|AFTER column_name];

这个命令可以修改表的列名,数据类型,列注释和列所在的位置顺序,FIRST将列放在第一列,AFTER col_name将列放在col_name后面一列

内外部表

管理表(内部表)

1.管理表能有效的管理表的数据但是不利于数据的分享;

2.管理表删除后,对应的数据也会被删除

适用场景

在做etl逻辑处理时,往往会选择内部表作为中间表,因为这些中间表逻辑处理完后数据会进行删除,同样HDFS上数据得到删除;

外部表

1.删除外部表后,数据仍然存在hdfs中,重新建立相同名称表即可重新恢复删除的外部表;

2.外部表可以和其他外部表进行数据的共享;

  • create (external)table product(pro_name string, price double) row format delimited fields terminated by ‘\t’ location ‘/data/stocks’

  • 9.修改表

    • 1.修改表名

      • alter table (tableName) rename to (otherTableName)
    • 2.增加表字段

      • alter table (tableName) add columns(column1 type, column2, type)

适用场景

如果怕数据被误删除,可以直接选用外部表,因为不会删除源数据,方便恢复;

外部表删除后不会删除表数据,这时重新创建外部表,相同表名,也可以获取到数据。同样,建立内部表也可以获取到数据。因为,在创建外部表时,” load local data xxxx into table 表名 “ 这个记录存在,所以相同表名都能查询到数据

内外部表使用案例

每天将日志数据传入HDFS,一天一个目录;Hive基于流入的数据建立外部表,将每天HDFS上的原始日志映射到外部表的天分区中;在外部表基础上做统计分析,使用内部表存储中间表、结果表,数据通过SELECT+ INSERT进入内部表;

分区表

目的

在Hive中执行select查询一般会扫描整张表,当表的数据量很大时,会消耗过多的时间,影响执行的效率。有时候只需要扫描表中关心的一部分数据,因此引入了分区表的概念。

1.分区表能够把一张大表的数据根据业务需求分配到多张小表中,以提高表的并发量;

2.分区表使得所有数据都集中在一张表中,但是底层物理存储数据根据一定的规则划分到不同的文件中,这些文件还可以存储到不同的磁盘上,分散了存储的压力;

如何分区

业界常使用“dt”或“d”作为分区字段(分区字段命名),选取id、年月日、男女性、年龄段或者是能够平均将数据分导不同的文件中最好。分区不好将直接导致查询结果延迟。

分区细节
  1. 一个表可以拥有一个或多个分区,每个分区以文件夹的形式单独存在表文件夹的目录下;
  2. 表和列名不区分大小写;
  3. 分区是以字段的形式在表结构中存在,通过describe table命令可以查看到字段的存在(可以看作是一个伪列),即该字段不存放实际的数据内容,仅仅以分区表示;
  4. 分区有一级、二级设置(看分区字段有多少个),一般设置是一级分区;
  5. 分区分为动态分区和静态分区;
静态与动态分区
  • 静态分区

    使用静态分区在插入数据之前需要首先知道有哪些分区类型,针对每一个分区要单独采用load data 命令载入数据。

    • create table customer_partition(name string, age int) partitioned by(sex string) row format delimited fields terminated by ‘\t’;

      • partitioned by(sex string)

        Hive分区是在创建表时用partitioned by关键字定义,以sex作为分区字段

    • 导入数据

      • load data local inpath ‘/xxx/customer.txt’ into table customer_partition partition(sex=’man’);

        • partition(sex=’man’);

          静态分区在导入数据时,要额外指定分区字段值,此时分区表会自动生成一列分区字段man(该”man”分区表的sex字段全为man
          )

    • 应用场景

      • 数据量不大,同时知道分区数据类型可以这样操作;
      • 缺点:每一次load data都要操作,繁琐
  • 动态分区

    动态分区不用手动指定指定分区,由系统自己选择分配

    • 开启动态分区功能(只在当前cli有效,要全局有效,需要设置配置文件)

      • 1.set hive.exec.dynamic.partition=true;
        1. set hive.exec.dynamic.partition.mode=nonstrict;

          • hive.exec.dynamic.partition.mode属性为“nonstrict”,意味着所有分区列都是动态的,为strict意味着不允许分区列全部是动态的
      • 3.set hive.exec.max.dynamic.partitions.pernode=1000;

  • 设置最大动态分区个数

  • 创建动态分区表

    • create table customer_partition(name string, age int) partitioned by(sex string) row format delimited fields terminated by ‘\t’;

      设置好分区字段后,直接导入数据即可自动分区,不需要指定分区字段

    • 应用场景

      • load data 非常方便,但是要打开动态分区并设置非严格模式
      • image-20210222134619146
      • image-20210223204159575
      • image-20210223204218442
      • image-20210223204459014
      • 利用时间戳分区
Hive分区常见操作整理
  • 增加单个分区(增加同级分区)
1
2
alter table (table_name) add partition(分区字段名称=实际分区值)
alter table part_table add partition(country='CN')
  • 增加多个分区
1
2
alter table (table_name) add partition(分区字段名称=实际分区值), partition(分区字段名称=实际分区值)
alter table part_table add partition(country='CN'), partition(country='UK')
  • 增加分区的同时添加数据
1
2
3
alter table (table_name) add partition(分区字段名称=实际分区的值) location 'HDFS path'
# location后面需要的是一个存储数据的路径文件夹而不是到文件
alter table part_table add partition(country='MS') location '/user/hive/warehouse/db1/test'
  • 删除单个分区
1
2
alter table (table_name) drop partition(分区字段名称=实际分区值)
alter table part_table drop partition(country='CN')

image-20210223204853396

  • 删除多个分区
1
alter table (table_name) drop partition(分区字段名称=实际分区值), partition(分区字段名称=实际分区值)
补:Hive分区表的一些操作
1.不在hive终端命令创建分区,直接HDFS创建分区,可以查询到吗?

不可以。Hive要查询到数据有两点,第一点元数据存在,第二点,对应路径下有数据。

若想这样操作查数据也是可以的,有三种思路,本质上都是补充元数据。

  1. 要运行分区修复命令,会添加新的分区元数据,此时即可查。

  1. 新建分区,分区字段等于直接HDFS创建的分区;

  1. 直接load指明分区

load的时候会做两个操作,第一添加分区信息,第二上传数据。

HiveQL数据操作语言

  • 1.向表中装载数据

    • load data local inpath ‘/xxx/customer.txt’
      (overwrite) into table customer_partition;
  • 2.经查询语句向表中插入数据

    • insert overwrite table tableName select * from table1 where keyword like ‘%拆%’;

      • 向tableName表中插入keyword字段含有“拆”的table1表的数据
  • 3.单个查询语句中创建表并加载数据

    • create table tableName as select * from table1;
  • 4.导入数据

    • load data local inpath ‘数据路径’ into table tableName;
    • create table (if not exists) tableName(acc_name string, acc_balance double) row format delimited fields terminated by ‘\t’ location ‘user/hive/warehouse/enterprise.db/account’;

      • 将数据文件放在location所指的hdfs路径下即可
  • 5.导出数据

    • hadoop fs -get 该表所在的hdfs路径

HiveQL数据查询基础

SELECT语句

用于选取字段

Where语句

用于过滤条件

GROUP BY语句

通常与聚合函数一起使用,按照一个或多个列对结果进行分组,然后用聚合函数对每个组执行聚合运算

HAVING分组筛选

Having子句允许用户通过一个简单的语法,来完成原本需要通过子查询才能对GROUP BY语句产生的分组结果进行条件过滤的任务

ORDER BY 和 SORT BY 语句

1.ORDER BY语句会对查询结果集执行一次全局排序,也就是说会有一个所有数据都通过一个reducer处理的过程。对于大数据集,会消耗过多的时间;

2.SORT BY 语句智慧在每个reducer中对数据进行排序,即执行一个局部排序,因此保证了每个reducer输出数据是有序的(非全局有序),这样就可以提高后面进行全局排序的效率;

总结:对于大数据集采用sort by语句进行处理能够提高效率,但在数据集不大的情况下,用order by即可;

  • ASC关键字-升序
  • DESC关键字-降序

Union语句

合并去重查询的记录;

Union all

将A和B表进行数据合并,要保证字段的顺序和个数一致,别名一致,合并结果不去重。对比union,union all 效率要更高;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
select
user_id, count(distinct product_id) pro_cnt
from
(
select a.user_id, b.product_id
from orders a
inner join trains b
on a.order_id = b.order_id

union all
-- union

select a.user_id, b.product_id
from orders a
inner join priors b
on a.order_id = b.order_id
) t
group by user_id
having pro_cnt > 10
order by pro_cnt desc
limit 10;

With

类似于子查询的功能,提高代码的易读性,方便排查问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
with user_pro as (
select * from
(select a.user_id, b.product_id
from orders a
inner join trains b
on a.order_id = b.order_id

union all
-- union

select a.user_id, b.product_id
from orders a
inner join priors b
on a.order_id = b.order_id
) t

)

select
user_id, count(distinct product_id) pro_cnt
from user_pro t
group by user_id
having pro_cnt > 10
order by pro_cnt desc
limit 10;

HiveQL连接查询语句

  • Join连接语句

    • 内连接

      只有进行连接的两个表中都存在与连接标准相匹配的数据才会被保留下来

      • 等值连接

        使用等号操作符的连接

        • select * from table1 inner join table2 on table1.num = table2.num where table2.num = value;

          查看num值为value的两表中的数据

      • 不等值连接

        使用>,>=,<=,<.!>,!<和<>操作符的连接

        • select * from table1 inner join table2 on table1.num = table2.num where table2.num != value;
    • 自然连接

      在广义笛卡尔积中选出同名属性上符合相等条件的元组,再进行投影,去掉重复的同名属性,组成新的关系;

      自然连接与外连接的区别在于,对于无法匹配的记录,外连接会虚拟一条与之匹配的记录来保存全连接表中的所有记录,但自然连接不会;

      • select * from table1 natural join table2 where table2.num in (‘value1’, ‘value2’);
    • 外连接

      • 左外连接

        以连接中的左表为主,返回左表的所有信息和右表中符合连接条件的信息,对于右表不符合条件的则补空值;

        • select f., s. from table1 f left outer join table2 s on f.num = s.num;
      • 右外连接

        以连接中的右表为主,返回右表的所有信息和左表中符合连接条件的信息,对于左表不符合条件的则补空值;

        • select f., s. from table1 f right outer join table2 s on f.num = s.num;
      • 全外连接

        查询结果等于左外连接和右外连接的和

        • select f., s. from table1 f full outer join table2 s on f.num = s.num;
    • 自连接

      连接的表是同一张表,使用自连接可以将自身表的一个镜像当作另一个表来对待,适用于表自己和自己的连接查询;

      • select f1., f2. from table1 f1, table1 f2 where f1.num = f2.no;

HiveQL数据查询进阶

Hive内置函数P71

  • show functions;

    查看hive提供的内置函数

  • 数学函数

    • +
    • -
    • *
    • /
    • round

      四舍五入函数:
      round(77.987, 2)
      —>77.99

    • ceil

      向上取整函数
      ceil(88.7)
      —>89

    • floor

      向下取整函数
      floor(88.7)
      —>88

    • pow

      取平方函数
      pow(3,2)
      —>9

    • pmod

      取模函数,即取余数
      pmod(4, 3)
      —>1

  • 字符函数

    • lower转小写函数

      lower(‘ABCD’)
      —>abcd

    • upper转大写函数

      upper(‘abcd’)
      —>ABCD

    • length字符串长度函数

      alength(“hadoop”)
      —>6

    • concat字符串拼接函数

      concat(“hadoop”, “&spark”)
      —>hadoop&spark

    • substr求子串函数

      substr(a, b):从字符a中第b位开始取,取右边所有字符;
      substr(a, b, c):从字符a中第b位开始取,取右边c个字符;
      substr(“hadoop&spark”, 3, 3)
      —>doo

    • trim去前后空格函数

      trim(str):将字符串前后出现的空格剔除

    • get_json_object用于处理json格式数据的函数

      案例:有表table,其中json字段存放json数据如下:

      select json from table;
      [{“name”:”zhangsan”, “age”:23, “adddress”:”Gansu”}]
      [{“name”:”zhangfeng”, “age”:20, “adddress”:”Gansu”}]

      select get_json_object(a.j, ‘$.name’) from (select substr(json, 2, length(json)-2) as j from table) a;
      —>(select substr(json, 2, length(json)-2) as j from table) a

      {“name”:”zhangsan”, “age”:23, “adddress”:”Gansu”}
      {“name”:”zhangfeng”, “age”:20, “adddress”:”Gansu”}

      —>select get_json_object(a.j, ‘$.name’) from a;

      zhangsan
      zhangfeng

  • 收集函数

  • 转换函数

    • cast类型转换函数

      select cast(99 as double);
      —>99.0

  • 日期函数

    • year/month/day获取年月日的函数

      select year(“2021-2-11 00:00:00”), month(“2021-2-11 00:00:00”), day(“2021-2-11 00:00:00”);
      —>2021 2 11

  • 条件函数

    • case…when…是,条件表达式

      case A (when B then C)(括号里的判断条件可以多个) else F end
      :对于A来说,(如果判断为B则返回C),如果以上都不是则返回F。最后一个end是结束符;

  • 聚合函数

    • count:返回行数

      select count(*) from table;
      返回table表中的行数

    • sum:组内某列求和函数

    • min:组内某列最小值
    • max:组内某列最大值
    • avg:组内某列求平均值
  • 表生成函数

Hive数据库对象与用户自定义函数

Hive视图

视图是只读型的,不能对视图执行增删插改等数据结构操作。视图一旦创建就是固定的,对基础表的更新将不会反映在视图上,删除基础表,视图也不会自动删除,需要手动执行视图删除命令;

  • 创建视图

    使用create view创建视图,视图名字不能与存在的表或视图名相同;

    • create view (if not exists) db_name.view_name as select [column name…] from table_name where …
  • 查看视图

    • show tables;
    • desc view_name;
    • desc formatted view_name

      查看视图的详细信息

  • 删除视图

    • drop view (if exists) [db_name].[view_name]

Hive分桶表

对比前面的分区表,分桶表是对数据进行更加细粒度的划分。分桶表将整个数据内容按照某列属性值的哈希值进行区分,使用该哈希值除以桶的个数得到取余数,余数决定了该条记录会被分在哪个桶中。余数相同的记录会分在一个桶里。需要注意的是,在物理结构上,一个桶对应一个文件,而分区表只是一个目录,至于目录下有多少数据是不确定的。Hive执行分桶表操作需要执行MR任务,分多少桶,就有多少个reducetask。

案例:按照用户id进行分桶,假设要分成三个桶,有三个用户id分别为1,35,97,则这三个用户id对应的记录分到的桶为:
1 / 3 = 0
35 / 3 = 11…2
97 / 3 = 32…1
1<—>0桶,35<—>2桶,97<—>1桶

应用场景

数据抽样

在处理大规模数据集时,尤其载数据挖掘的阶段,可以用一份数据验证一下,代码是否可以运行成功,进行局部测试,也可以抽样进行一些代表性统计分析。

map-side join

可以获得更高的查询处理效率。桶为表加上了额外的结构,(利用原有字段进行分桶),Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如join操作。对于join操作两个表有一个相同列,如果对这两个表都进行了桶操作,那么保存相同列值的桶进行join操作就可以,大大减少了join的数据量。

Hive创建分桶

  • 创建表

    • clustered by (字段名) into bucket_num buckets
    • create table table_bucket(uid int, keyword string) comment ‘bucket test’ clustered by (uid) into 3 buckets row format delimited fields terminated by ‘\t’;
  • 插入数据

    必须启动mapreduce才能把文件顺利分桶,若使用load data local inpath这种方式加载数据,即使设置了强制分桶也不起作用。注意,插入数据前要设置hive.enforce.bucketing=true,其含义是数据分桶是否强制执行,默认false,开启写入table数据时会启动分桶;

    • set hive.enforce.bucketing=true;
    • insert overwrite table table_bucket select uid, keyword from origin_table ;
  • 数据抽样
1
select * from table_bucket tablesample(bucket 1 out of 32 on uid)

– tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
– x 表示从第几个分桶进行抽样,y每隔几个分桶取一个分桶, y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。

抽样案例

例如,table总bucket数为32,tablesample(bucket 1 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第1个bucket和第(1+16)17个bucket的数据

Hive分桶总结

Hive 分桶的概念就是MapReduce的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。

而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件。通过文件夹可以查询里面存放的文件。但文件夹本身和数据的内容毫无关系。

桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。

这些小文件可以单独排序。如果另外一个表也按照同样的规则分成了一个个小文件。两个表join的时候,就不必要扫描整个表,只需要匹配相同分桶的数据即可。效率可想而知,得到了极大的提升。

同样,对数据抽样的时候,也不需要扫描整个文件。只需要对每个分桶按照相同规则抽取一部分数据即可。

用户自定义函数

UDF可以直接应用于select语句,对查询结果进行格式化处理后再输出内容,即使用udf和hive内置函数方式一样;

  • UDF

用户自定义聚合函数

普通函数一般是接受一行输入,产出一行输出,而聚合函数是接受一组输入(多行输入),产生一个输出;
例如:count函数就是一个聚合函数,因为它接受多行输入然后产生一个输出总数。

  • UDAF

用户自定义表生成函数

表生成函数接受0个或多个输入然后产生多列或多行输出。

  • UDTF

Hive进阶

Hive表执行顺序

HSQL执行顺序

在hive中执行sql语句的执行查询顺序:

select … from … where … group by … having … order by …

执行顺序:

from … where … select … group by … having by … order by…

MR程序的执行顺序

Map阶段

1.执行from加载,进行表的查找与加载;

2.执行where过滤,进行条件过滤与筛选;

3.执行select查询:进行输出项的筛选;

4.map端文件合并

  • reduce阶段:map端本地溢出文件的合并操作,每个map最终形成一个临时文件。然后按照列映射到对应的reduce;
Reduce阶段

1.group by:对map端发送过来的数据进行分组并进行计算;

2.having:最后过滤列用于输出结果;

3.order by:排序后进行结果输出到HDFS文件;

通过以上分析可以看出,在进行select之后我们会形成一张逻辑表,在这表中做分组排序等操作;

Hive优化

image-20210222143005298

减少查询数据量

1.分区表优化;

2.桶表优化;

  • 提高join效率;
  • 提高抽样效率;

3.在查询前进行列的裁剪、提前过滤等;

压缩数据

image-20210222145751546

数据压缩,常见Textfile(维度表、数据量级不大)、orc

Map优化

– 作业会通过input的目录产生一个或者多个map任务。set dfs.block.size
– Map越多越好吗?是不是保证每个map处理接近文件块的大小?
– 如何合并小文件,减少map数?

1
2
3
4
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

– Map端提前聚合,类似于mr中的combiner

1
set hive.map.aggr=true 

减少数据量,减少数据传输的占用的带宽。

– 如何适当的增加map数?

1
set mapred.map.tasks=10;

Reduce优化

– hive.exec.reducers.bytes.per.reducer;reduce任务处理的数据量
– 调整reduce的个数:

  • 设置reduce处理的数据量

一个Reduce的情况

– 没有group by

1
2
3
4
select count(1) cnt
from orders
where order_dow='0';
number of mappers: 1; number of reducers: 1

– order by(可以使用distribute by和sort by)

1
2
3
4
5
6
select user_id, order_dow
from orders
where order_dow='0'
order by user_id
limit 10;
number of mappers: 1; number of reducers: 1

– 笛卡尔积(增加一些伪列,)

1
2
3
select *
from tmp_d a
join (select * from tmp_d ) b

数据去重多用group by,少用distinct,distinct在reduce阶段是单个reduce处理,group by是多个并发,效率更高;

分区裁剪(partition)

分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

优化案例
1
2
3
4
5
6
7
8
9
10
-- 不推荐
-- 先进行关联,过滤条件在where后
select count(*) order_cnt
from
(select order_id, order_dow from orders limit 1000) ord
inner join
(select order_id from trains limit 1000) tra
on ord.order_id = tra.order_id
where order_dow='1'
limit 10;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 推荐
--1.
select count(*) order_cnt
from
-> <-
(select order_id, order_dow from orders where order_dow='1' limit 1000) ord
inner join
(select order_id from trains limit 1000) tra
on ord.order_id = tra.order_id
-- where order_dow='1'
limit 10;

--2.
select count(*) order_cnt
from
(select order_id, order_dow from orders limit 1000) ord
inner join
(select order_id from trains limit 1000) tra
on (ord.order_id = tra.order_id and ord.order_dow='1')
-- where order_dow='1'
limit 10;

笛卡尔积

  • join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积

  • Map join

    • /+ MAPJOIN(tablelist) /,必须是小表,不要超过1G,或者50万条记录

    • ```sql
      select /+ MAPJOIN(aisles) / a.aisle as aisle, b.product_id as product_id
      from aisles a
      join products b
      on a.aisle_id=b.aisle_id
      limit 10;

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69

      * Union all / distinct

      * 先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个Select,最终只有一个mr

      * ```sql
      -- union all对比union
      -- union all 加上distinct 对比 union 的效率
      union all 加上distinct 对比 union 的效率?
      -- union all效率高
      ========================================================
      select count(*)
      from
      (select distinct *
      from
      (
      select user_id,order_id from orders where order_dow='1'
      union all
      select user_id,order_id from orders where order_dow='2'
      union all
      select user_id,order_id from orders where order_dow='3'
      ) t
      ) t1;
      -------------------------------------------------------相同意思,不同写法
      with t1 as
      (select distinct *
      from
      (
      select user_id,order_id from orders where order_dow='1'
      union all
      select user_id,order_id from orders where order_dow='2'
      union all
      select user_id,order_id from orders where order_dow='3'
      ) t
      )
      select count(*) from t1;
      ========================================================union all执行耗时结果
      Total MapReduce CPU Time Spent: 14 seconds 30 msec
      OK
      _c0
      1491710
      Time taken: 44.528 seconds, Fetched: 1 row(s)

      ========================================================
      select count(*)
      from
      (
      select user_id,order_id from orders where order_dow='1'
      union
      select user_id,order_id from orders where order_dow='2'
      union
      select user_id,order_id from orders where order_dow='3'
      ) t ;
      -------------------------------------------------------相同意思,不同写法
      with t as
      (
      select user_id,order_id from orders where order_dow='1'
      union
      select user_id,order_id from orders where order_dow='2'
      union
      select user_id,order_id from orders where order_dow='3'
      )
      select count(*) from t;
      ========================================================union执行耗时结果
      Total MapReduce CPU Time Spent: 26 seconds 70 msec
      OK
      _c0
      1491710
      Time taken: 62.972 seconds, Fetched: 1 row(s)

Multi-insert & multi-group by

– 从一份基础表中按照不同的维度,一次组合出不同的数据 userid product_num order_num
– FROM _statement

1
2
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2

Automatic merge

– 当文件大小比阈值小时,hive会启动一个mr进行合并
– hi ve.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True
– hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False
– hive.merge.size.per.task = 25610001000 合并文件的大小

Multi-Count Distinct

– 必须设置参数:set hive.groupby.skewindata=true;
– select dt, count(distinct uniq_id), count(distinct ip)
– from ods_log where dt=20170301 group by dt

set hive.groupby.skewindata=true;被称为万用参数

本质:将一个mapreduce拆分为两个MR

无论什么优化,都可以加上该参数,加入该参数后,job数会从原来的一个变成两个。

合并小文件

Map输入合并:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

— Map端输入、合并文件之后按照block的大小分割

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

— Map端输入,不合并

Map/Reduce输出合并

set hive.merge.mapfiles=true;

— 是否合并Map输出文件, 默认值为真

set hive.merge.mapredfiles=true;

— 是否合并Reduce 端输出文件,默认值为假

set hive.merge.size.per.task=25610001000;

— 合并文件的大小,默认值为 256000000

合理控制reducer数量

参数1:hive.exec.reducers.bytes.per.reducer(默认1G)

参数2:hive.exec.reducers.max(默认为999)

reducer的计算公式为:min(参数2,总输入数据量/参数1)

也可以通过set mapred.map.tasks=10;直接控制reducer个数

压缩

  • 减少数据的小大和数据磁盘读写时间

set mapreduce.map.output.compress=true;

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

  • 中间数据压缩:对hive查询多个job之间的数据进行压缩

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

set hive.intermediate.compression.type=BLOCK;

  • 结果数据压缩:reducer输出数据,结果数据作为其他查询任务数据源

set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

set mapreduce.output.fileoutputformat.compress.type=BLOCK;

矢量化执行

它通过一次性批量执行1024行而不是每次执行单行来实现,相当于提高每次执行的量级,即吞吐量。

  • 参数

set hive.vectorized.execution = true;

set hive.vectorized.execution.enabled = true;

和ORC格式配合使用,矢量化在Hive的新版本中似乎有一些问题。

Hive join 优化

一个MR job a,b ,c
1
2
3
4
select a.val, b.val, c.val
from a
join b on(a.key = b.key1)
join c on(a.key=c.key1)
生成多个MR job a,b,c
1
2
3
4
select a.val, b.val, c.val
from a
join b on(a.key = b.key1)
join c on(c.key=b.key2)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Launching Job 1 out of 1
select ord.order_id
from orders ord
join trains tra on ord.order_id = tra.order_id
join priors pri on ord.order_id = pri.order_id
limit 10;

--多个MR任务
Total jobs = 3
select ord.order_id
from orders ord
join trains tra on ord.order_id = tra.order_id
join products pro on tra.product_id = pro.product_id
limit 10;
表连接顺序

1.按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。

2.使用hint的方式启发JOIN操作

1
2
3
4
5
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val
FROM a
JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key1);
-- a表被视为大表
1
2
3
4
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a
JOIN b ON a.key = b.key;
-- MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多.

案例

1
2
3
4
5
6
7
8
9
-- /*+ STREAMTABLE(a) */

select /*+ STREAMTABLE(pri) */ ord.order_id, pri.product_id
from orders ord
join priors pri on ord.order_id = pri.order_id
join products pro on pri.product_id = pro.product_id
limit 10;

join 等价于 inner join

image-20210222164752228

并行执行

同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段。某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说可以并行执行的,这样可能使得整个job的执行时间缩短。

参数设置:set hive.exec.parallel=true;

数据倾斜
  • 产生操作

    • Join on a.id=b.id
    • Group by
    • Count Distinct count(groupby)
  • 原因

    • key分布不均导致的
    • 人为的建表疏忽
    • 业务数据特点
  • 症状

    • 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。
    • 查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜。
  • 倾斜度

    • 平均记录数超过50w且最大记录数是超过平均记录数的4倍。Null 50w 10w
    • 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍。
  • 万能方法

    • set hive.groupby.skewindata=true
  • 对异常值赋予随机变量来分散key,可以通过rand函数将为null的值分散到不同的值上

    • case when uid is null then cast(rand(100)*100000 as int) else uid end
  • 将异常值单独拿出来处理,最后union回去

  • set hive.groupby.skewindata = true; — 数据倾斜进行负载均衡

    • (默认是false, group by过程出现倾斜)
  • set hive.optimize.skewjoin=true; — join 过程出现倾斜 应该设置为true

  • count distinct拆成两个job,先distinct,再对子表count(1)(数据量大)

    • ```sql
      with t1 as
      (select distinct
      from
      (
      select user_id,order_id from orders where order_dow=’1’
      union all
      select user_id,order_id from orders where order_dow=’2’
      union all
      select user_id,order_id from orders where order_dow=’3’
      ) t
      )
      select count(
      ) from t1;
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47

      ##### 数据倾斜-大小表关联

      **原因**

      Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边。

      **思路**

      将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率使用map join让小的维度表先进内存。

      **方法**

      Small_table join big_table

      ##### 数据倾斜-大大表关联

      **原因**

      日志中有一部分的userid是空或者是0的情况,导致在用user_id进行hash分桶的时候,会将日志中userid为0或者空的数据分到一起,导致了过大的斜率。

      **思路**

      把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不展现;

      **方法**

      `on case when (x.uid = '-' or x.uid = '0‘ or x.uid is null) then concat(‘-',rand()) else x.uid end =f.user_id;`

      ##### 数据倾斜-大大表关联(消减业务)

      ![image-20210222173045460](https://imagebedjh.oss-cn-beijing.aliyuncs.com/imgs/image-20210222173045460.png)

      ##### 数据倾斜-空间换时间

      ![image-20210304180152148](https://imagebedjh.oss-cn-beijing.aliyuncs.com/imgs/image-20210304180152148.png)

      #### 补充:Hive运行中的参数介绍

      1.`set hive.exec.reducers.bytes.per.reducer=<number>`

      每一个reduce处理的byte的数量,如果大于number,就会多生成一个reduce;

      ```sql
      # 设置reduce最大处理的数据量
      set hive.exec.reducers.bytes.per.reducer=200000;
      # 200000 bytes ~= 195kb

补充换算关系

  1. 1Bytes=8bits(1字节等于8位)
  2. 1KB=1024Bytes(1KB等于1024字节)
  3. 1MB=1024KB
  4. 1GB=1024MB
  5. 1TB=1024GB

2.set hive.exec.reducers.max=<number>

控制reduce个数的上限

1
2
set hive.exec.reducers.max=10;
number of mappers: 1; number of reducers: 10

set hive.exec.reducers.max=<number>的优先级要高于set hive.exec.reducers.bytes.per.reducer=<number>,如果两个参数都有设置,则以第一个参数为准。

3.set mapreduce.job.reduces=<number>
指定reduce个数。

1
2
set mapreduce.job.reduces=5;
number of mappers: 1; number of reducers: 5

set mapreduce.job.reduces=<number>参数的优先级最高。若以上三个参数都有设置,则以该参数为准。

4.yarn.scheduler.minimum-allocation-mbyarn.scheduler.maximum-allocation-mb

说明:单个容器可申请的最小与最大内存,应用在运行申请内存时不能超过最大值,小于最小值则分配最小值,从这个角度看,最小值有点想操作系统中的页。最小值还有另外一种用途,计算一个节点的最大container数目注:这两个值一经设定不能动态改变(此处所说的动态改变是指应用运行时)。

默认值:1024/8192

5.yarn.scheduler.minimum-allocation-vcoresyarn.scheduler.maximum-allocation-vcores

参数解释:单个可申请的最小/最大虚拟CPU个数。比如设置为1和4,则运行MapRedce作业时,每个Task最少可申请1个虚拟CPU,最多可申请4个虚拟CPU。

默认值:1/32

6.yarn.nodemanager.resource.memory-mbyarn.nodemanager.vmem-pmem-ratio

说明:每个节点可用的最大内存,RM中的两个值不应该超过此值。此数值可以用于计算container最大数目,即:用此值除以RM中的最小容器内存。虚拟内存率,是占task所用内存的百分比,默认值为2.1倍;注意:第一个参数是不可修改的,一旦设置,整个运行过程中不可动态修改,且该值的默认大小是8G,即使计算机内存不足8G也会按着8G内存来使用。

默认值:8G /2.1

7.yarn.nodemanager.resource.cpu-vcores

参数解释:NodeManager总的可用虚拟CPU个数。

默认值:8

mapred-site.xml

image-20210223145004749

参考资料

《大数据 Hive离线计算开发实战》