Hive UDF and UDTF

Hive UDF Functions

Case 1: A user-defined function that converts lowercase text to uppercase

1. Inspect the sample data format

(base) [root@main hive_udf]# head -3 movies.csv 
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy

2. Create the table

CREATE TABLE `movie_table`(
  `movieid` string,
  `title` string,
  `genres` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n';

3. Load the data

load data local inpath "/usr/local/src/practice_code/hive/hive_udf/movies.csv" into table movie_table;
hive> select * from movie_table limit 5; 
OK
movie_table.movieid movie_table.title movie_table.genres
1 Toy Story (1995) Adventure|Animation|Children|Comedy|Fantasy
2 Jumanji (1995) Adventure|Children|Fantasy
3 Grumpier Old Men (1995) Comedy|Romance
4 Waiting to Exhale (1995) Comedy|Drama|Romance
5 Father of the Bride Part II (1995) Comedy
Time taken: 0.452 seconds, Fetched: 5 row(s)

4. Develop the Hive UDF in Java, package it, and upload it to the cluster

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Uppercase extends UDF {
    // Take a string and convert it to uppercase; this shows how custom logic can be plugged into Hive as a Java UDF.
    public Text evaluate(final Text s) {
        if (s == null) {
            return null;
        }
        return new Text(s.toString().toUpperCase());
    }
}

Build a jar with Maven (mvn clean assembly:assembly), upload it to the cluster, and then add it in Hive:

hive> add jar /usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar;
Added [/usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [/usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar]

5. Create the UDF function

hive> create temporary function upper_func as 'Uppercase';
OK
Time taken: 0.039 seconds

Note that add jar is scoped to the current session, and so is create temporary function: once the client is closed, the jar and the function are gone and must be registered again in the next session. (Hive also supports permanent functions via CREATE FUNCTION ... USING JAR with the jar stored on HDFS.)

6. Apply the function

Note that this query does not launch a MapReduce job: a simple SELECT ... LIMIT like this is answered by Hive's local fetch task, so the UDF runs directly in the client process.

hive> select title, upper_func(title) from movie_table limit 5;
OK
title _c1
Toy Story (1995) TOY STORY (1995)
Jumanji (1995) JUMANJI (1995)
Grumpier Old Men (1995) GRUMPIER OLD MEN (1995)
Waiting to Exhale (1995) WAITING TO EXHALE (1995)
Father of the Bride Part II (1995) FATHER OF THE BRIDE PART II (1995)
Time taken: 0.092 seconds, Fetched: 5 row(s)


Case 2: A user-defined function that concatenates table columns

This case still uses the table from Case 1. The goal is to concatenate the movieId and title columns, this time implemented as a Python transform script.

1. Write the concatenation script

# encoding=utf-8
import sys

for line in sys.stdin:
    # Hive's TRANSFORM streams the selected columns as tab-separated lines on stdin
    fields = line.strip().split('\t')
    print('-'.join([fields[0].strip(), fields[1].strip()]))
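
Before adding the script in Hive, it can be checked locally. The sketch below is not from the original post; it simulates how Hive's TRANSFORM feeds the selected columns to the script as tab-separated lines on stdin, and it assumes transform1.py sits in the current directory and that python points at the same interpreter the cluster uses.

import subprocess

# Two sample rows in the tab-separated form Hive's TRANSFORM would send
sample = "1\tToy Story (1995)\n2\tJumanji (1995)\n"

proc = subprocess.Popen(["python", "transform1.py"],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = proc.communicate(sample.encode("utf-8"))
print(out.decode("utf-8"))
# Expected output:
# 1-Toy Story (1995)
# 2-Jumanji (1995)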

2. Add the script in Hive and run it

hive> add file /usr/local/src/practice_code/hive/hive_udf/transform1.py;
Added resources: [/usr/local/src/practice_code/hive/hive_udf/transform1.py]
hive> select transform(movieId, title) using "python transform1.py" as (pinjie) from movie_table limit 10;
Query ID = root_20210617194446_f35d4eac-6e96-401d-a242-b8b6a918e1fe
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Cumulative CPU: 1.87 sec HDFS Read: 261989 HDFS Write: 221 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 870 msec
OK
pinjie
1-Toy Story (1995)
2-Jumanji (1995)
3-Grumpier Old Men (1995)
4-Waiting to Exhale (1995)
5-Father of the Bride Part II (1995)
6-Heat (1995)
7-Sabrina (1995)
8-Tom and Huck (1995)
9-Sudden Death (1995)
10-GoldenEye (1995)
Time taken: 18.456 seconds, Fetched: 10 row(s)

Note: the transform in the query is the Hive TRANSFORM keyword; it has nothing to do with the script's file name (transform1.py).

As you can see, this approach does launch a MapReduce job; since the script only maps the input and emits output, there is no reducer.

Case 3: Using the Linux built-in awk command as the transform script

1. Test the awk command

(base) [root@main hive_udf]# head movies.csv | awk -F ',' '{print $1"_"$2}'
movieId_title
1_Toy Story (1995)
2_Jumanji (1995)
3_Grumpier Old Men (1995)
4_Waiting to Exhale (1995)
5_Father of the Bride Part II (1995)
6_Heat (1995)
7_Sabrina (1995)
8_Tom and Huck (1995)
9_Sudden Death (1995)

2. Create an awk script file: transform.awk

# Note: awk's default field separator is any whitespace, so the tab-separated
# (movieId, title) row coming from Hive is re-split on the spaces inside the
# title; add BEGIN { FS = "\t" } to keep the whole title in $2.
{
    print $1"_"$2
}

3. Test it in Hive

hive> add file /usr/local/src/practice_code/hive/hive_udf/transform.awk;
Added resources: [/usr/local/src/practice_code/hive/hive_udf/transform.awk]
hive> select transform(movieId, title) using "awk -f transform.awk" as (pinjie) from movie_table limit 10;
Query ID = root_20210618182256_4b77c2ef-0123-4afa-9c38-b1725757b90e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Stage-Stage-1: Map: 1 Cumulative CPU: 2.22 sec HDFS Read: 405145 HDFS Write: 91 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 220 msec
OK
pinjie
1_Toy
2_Jumanji
3_Grumpier
4_Waiting
5_Father
6_Heat
7_Sabrina
8_Tom
9_Sudden
10_GoldenEye
Time taken: 25.73 seconds, Fetched: 10 row(s)
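
Note the difference from the command-line test in step 1: here the titles are truncated to their first word (1_Toy instead of 1_Toy Story (1995)). Hive hands the columns to the script separated by tabs, but awk's default field separator is any run of whitespace, so the spaces inside the title re-split the row and $2 is only the first word. Setting the field separator to a tab (BEGIN { FS = "\t" }, as noted in the script comment above) keeps the full title.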

Case 4: Implementing word count with map and reduce transform scripts

1. Inspect the table data

hive> show create table badou.docs;
OK
createtab_stmt
CREATE TABLE `badou.docs`(
`line` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://main:9000/user/hive/warehouse/badou.db/docs'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='1',
'numRows'='0',
'rawDataSize'='0',
'totalSize'='632207',
'transient_lastDdlTime'='1618880802')
Time taken: 0.163 seconds, Fetched: 17 row(s)

-- The table has a single column, line; each row holds one line of the text.
hive> select line from badou.docs;
Soames gave him a strange, sidelong stare.
“My wife can see no one,” he muttered doggedly.
Young Jolyon answered gently: “I shouldn’t keep her a minute.”
Soames brushed by him and barred the way.
“She can see no one,” he said again.
Young Jolyon’s glance shot past him into the hall, and Soames turned. There in the drawing-room doorway stood Irene, her eyes were wild and eager, her lips were parted, her hands outstretched. In the sight of both men that light vanished from her face; her hands dropped to her sides; she stood like stone.
Soames spun round, and met his visitor’s eyes, and at the look he saw in them, a sound like a snarl escaped him. He drew his lips back in the ghost of a smile.
“This is my house,” he said; “I manage my own affairs. I’ve told you once — I tell you again; we are not at home.”
And in young Jolyon’s face he slammed the door.

The End
Time taken: 0.089 seconds, Fetched: 2866 row(s)

2. Write the map logic in Python

import re
import sys

def mapper_func():
    # Regex used to pull word characters out of each token
    pattern = re.compile(r'\w+')
    # Hive streams each row of the table to this script via standard input
    for line in sys.stdin:
        # Split the line on spaces
        sgw_list = line.strip().split(" ")
        # Validate, clean, and emit each word
        for word in sgw_list:
            # Skip tokens that contain no word characters
            if len(pattern.findall(word)) < 1:
                continue
            # findall returns a list of matches; take the first one and lowercase it
            word = pattern.findall(word)[0].lower()
            word = word.strip()
            if word != '':
                print("%s\t1" % word)

if __name__ == "__main__":
    mapper_func()
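
The regex does all of the token cleanup. A quick standalone illustration (not part of the original script) of what it keeps and drops:

import re

pattern = re.compile(r'\w+')
for token in ['Soames', '"wife', 'minute."', '--', "Jolyon's"]:
    matches = pattern.findall(token)
    print("%s -> %s" % (token, matches[0].lower() if matches else "(skipped)"))
# Punctuation is stripped, words are lowercased, and tokens with no word
# characters at all (such as '--') are skipped.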

3. Write the reduce logic in Python

# python v: 2.7.5
# author: Jabari
# 2021-2-5

import sys

def reducer():
    sum = 0
    word_cur = None
    for line in sys.stdin:
        ll = line.strip().split("\t")
        if len(ll) != 2:
            # Ignore anything that is not a (word, count) pair
            continue
        word, count = ll
        if word == ' ' or word == '':
            continue
        if word_cur is None:
            word_cur = word
        if word_cur != word:
            # A new word has started: emit the finished one and reset the counter
            print('\t'.join([word_cur, str(sum)]))
            word_cur = word
            sum = 0
        sum += int(count)
    # The last word never triggers the "new word" branch, so emit it after the loop
    if word_cur is not None:
        print('\t'.join([word_cur, str(sum)]))

if __name__ == "__main__":
    reducer()
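
Before running on the cluster, the two scripts can be chained locally. The sketch below is not from the original post; it mimics the Hive pipeline (raw lines into map.py, a sort standing in for what CLUSTER BY word gives the reducer, then reduce.py) and assumes map.py and reduce.py are in the current directory.

import subprocess

# Two sample lines taken from badou.docs
sample = ("Soames gave him a strange, sidelong stare.\n"
          "Soames brushed by him and barred the way.\n")

mapper = subprocess.Popen(["python", "map.py"],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
map_out, _ = mapper.communicate(sample.encode("utf-8"))

# CLUSTER BY word guarantees identical words reach the reducer consecutively;
# a plain sort reproduces that grouping locally.
shuffled = "\n".join(sorted(map_out.decode("utf-8").splitlines())) + "\n"

reducer = subprocess.Popen(["python", "reduce.py"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = reducer.communicate(shuffled.encode("utf-8"))
print(out.decode("utf-8"))
# Expected lines include: him 2, soames 2, and most other words with count 1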

4. Add the scripts in Hive

hive> add file /usr/local/src/practice_code/hive/hive_udf/map.py;
Added resources: [/usr/local/src/practice_code/hive/hive_udf/map.py]
hive> add file /usr/local/src/practice_code/hive/hive_udf/reduce.py;
Added resources: [/usr/local/src/practice_code/hive/hive_udf/reduce.py]

5. Run

hive> select transform(wc.word, wc.count) using "python reduce.py" as w, c 
> from
> (
> select transform(line) using "python map.py" as word, count
> from badou.docs
    > cluster by word) wc -- CLUSTER BY word distributes and sorts by word so identical words reach the reducer together
> limit 1000;
Query ID = root_20210618184656_d016ca78-ece6-42b8-ad95-82e34c90fda6
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
broadstairs 3
brocaded 1
broke 19
broken 20
brokenly 1
brokers 1
brompton 1
bronze 2
Time taken: 29.892 seconds, Fetched: 1000 row(s)

Hive UDTF Function Development

Case 1: Exploding one row of data into multiple rows

1. Inspect the table data

hive> show create table badou.udtf_test_table;
OK
createtab_stmt
CREATE TABLE `badou.udtf_test_table`(
`data` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://main:9000/user/hive/warehouse/badou.db/udtf_test_table'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='1',
'numRows'='0',
'rawDataSize'='0',
'totalSize'='70',
'transient_lastDdlTime'='1618877555')
Time taken: 0.036 seconds, Fetched: 18 row(s)
-- There are only two rows of data in total
hive> select * from badou.udtf_test_table limit 5;
OK
udtf_test_table.data
1:0.1;2:0.2;3:0.3;4:0.4;5:0.5
10:0.11;20:0.22;30:0.33;40:0.44;50:0.55
Time taken: 0.046 seconds, Fetched: 2 row(s)

The table holds only two rows. In each row, key:value pairs such as 1:0.1 are joined into a single column by ';'. The goal is to explode each row into multiple (key, value) rows.

2. Implement it in Java

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class Expolde extends GenericUDTF {
    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        // Declare the two output columns (col1, col2) produced for every input row
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Split the input on ";" to get pairs like 1:0.1, 2:0.2, ...
        // Each pair then becomes one output row with the two columns (col1, col2) declared above.
        String input = args[0].toString();
        String[] test = input.split(";");
        for (int i = 0; i < test.length; i++) {
            try {
                // Split the pair on ":" to get the finest-grained pieces, e.g. 1 / 0.1
                String[] result = test[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }
}
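
For intuition, here is the same split logic written as plain Python (illustration only, not part of the project; in the UDTF the work is done by initialize(), which declares the output schema, and forward(), which emits each row):

def explode(data):
    rows = []
    for pair in data.split(";"):      # "1:0.1;2:0.2;..." -> ["1:0.1", "2:0.2", ...]
        parts = pair.split(":")       # "1:0.1" -> ["1", "0.1"]
        if len(parts) == 2:           # skip malformed pairs, roughly what the try/continue above does
            rows.append((parts[0], parts[1]))
    return rows

print(explode("1:0.1;2:0.2;3:0.3;4:0.4;5:0.5"))
# [('1', '0.1'), ('2', '0.2'), ('3', '0.3'), ('4', '0.4'), ('5', '0.5')]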

3. Build the jar, upload it, and create a temporary function

hive> add jar /usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar;
Added [/usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [/usr/local/src/practice_code/hive/hive_udf/hive-1.0-SNAPSHOT-jar-with-dependencies.jar]

hive> create temporary function explode_func as 'Expolde';
OK
Time taken: 0.006 seconds

4. Verify

hive> select explode_func(data) from badou.udtf_test_table;
OK
col1 col2
1 0.1
2 0.2
3 0.3
4 0.4
5 0.5
10 0.11
20 0.22
30 0.33
40 0.44
50 0.55
Time taken: 0.057 seconds, Fetched: 10 row(s)

Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF