数据仓库从0到1之日志采集平台搭建

[TOC]

数仓技术选型

项目功能需求 项目技术选型
数据采集传输 Flume、Kafka、Sqoop
数据存储 HDFS、MySQL
数据计算 Hive、Tez、Spark
数据查询 Presto、Kylin
数据可视化 superset
任务调度 Azkaban
集群监控 Zabbix
元数据管理 Atlas
数据质量监控 Apache Griffin

框架版本选型

测试集群配置

服务名称 子服务 服务器hadoop116 服务器hadoop117 服务器hadoop118
HDFS NameNode 1
DataNode 1 1 1
SecondaryNameNode 1
Yarn NodeManager 1 1 1
Yarn ResourceManager 1

集群推断

服务器是物理主机还是云主机?

  • 物理机

以128GB内存,20核物理CPU,40线程,8THHD和2TSSD硬盘,戴尔单台报价4W出头。一般物理机寿命5年左右。

需要有专业的运维人员,平均一个月1W,电费的开销也不小。

  • 云主机

以阿里云为例,差不多的配置,每年5W,很多运维工作都由阿里云完成,运维相对较轻松。

  • 企业选择

金融有钱公司和阿里没有直接冲突的公司选择阿里云。中小公司,为了融资上市,选择阿里云,拉到融资后买物理主机。

有长期打算,资金充足,选择物理机。

集群规模多大?

  1. 假设每台服务器8T磁盘,128G内存
  2. 每天活跃用户100万,每人一天平均100条数据,则:100万 * 100条 = 1亿条数据
  3. 每条日志1K左右大小,每天1亿条:1亿/1024/1024 = 约100G (1K的日志数据足够大了)
  4. 如果半年不扩容服务器: 100G *180天 = 约18T
  5. 保存3副本:18T * 3 = 54 T
  6. 预留20% ~ 30% Buffer = 54T / 0.7 = 77T (预留的buffer用于运行其他应用程序)
  7. 到此,大约需要约8T服务器10台
  8. 如果考虑数仓分层,数据采用压缩(50%压缩率),则需要重新计算

日志采集平台环境准备

  • 操作系统:Centos7 2003

链接:https://pan.baidu.com/s/12Owy1sc8Jh5DVJQzOuJHWA 提取码:xp0b

  • Hadoop版本:3.1.3
  • ZooKeeper版本:3.5.7
  • Kafka版本:2.11-2.4.1
  • Flume1.9.0
  • Java版本: jdk-8u251-linux

链接:https://pan.baidu.com/s/1gqlmJi4SSBinGd5qcc-zkg 提取码:c2hw

  • VMware版本:16.x

可以官方下载,然后百度激活码

Centos7环境准备

1.创建虚拟机

可百度教程。

2.修改网卡

配置网卡时要对VMNET8网卡进行配置,注意不要和本地的IP地址冲突。

测试网络,能够ping通百度和本机即可。

3.配置hosts

这里配置的是hadoop集群中机器的网络ip地址和别名。

1
vim /etc/hosts

虚拟机配置完后到本机的C:\Windows\System32\drivers\etc目录下修改hosts文件,加入上面三行配置。

4.配置网络名+修改hostname

网络名

1
2
3
4
[root@localhost ~]# vim /etc/sysconfig/network
[root@localhost ~]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=hadoop116

hostname

1
2
3
4
5
6
7
[root@localhost ~]# hostname hadoop116
[root@localhost ~]# hostname
hadoop116

[root@localhost ~]# vim /etc/hostname
[root@localhost ~]# cat /etc/hostname
hadoop116

5.关掉网络管理

1
2
3
4
5
[root@localhost ~]# systemctl stop NetworkManager
[root@localhost ~]# systemctl disable NetworkManager
Removed symlink /etc/systemd/system/multi-user.target.wants/NetworkManager.service.
Removed symlink /etc/systemd/system/dbus-org.freedesktop.nm-dispatcher.service.
Removed symlink /etc/systemd/system/network-online.target.wants/NetworkManager-wait-online.service.

6.关掉Linux的内核防火墙

1
2
[root@localhost ~]# vim /etc/selinux/config 
[root@localhost ~]# setenforce 0

查看selinux状态

7.关闭linux防火墙,禁用其开机启动

1
2
3
4
[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.

查看防火墙状态

8.重启,再次查看selinux

至此,虚拟机环境配置完成。

9.配置普通用户具有root权限

1
[root@hadoop116 ~]# vim /etc/sudoers

10.创建文件夹用于存放软件包

1
2
[root@hadoop116 ~]# su dw 
[dw@hadoop116 ~]# mkdir /opt/module /opt/software

11.配置Java环境

1
2
# 删除存在的java环境
[dw@hadoop116 ~]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
1
2
3
4
# 解压jdk到指定路径
[dw@hadoop116 software]# ls
jdk-8u212-linux-x64.tar.gz
[dw@hadoop116 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

配置环境变量

1
[dw@hadoop116 software]$ sudo vim /etc/profile.d/my_env.sh
1
2
3
# JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
  1. 两个文件都是设置文件的,/etc/profile是永久性的,是全局变量,/etc/profile.d/设置所有用户生效。

  2. /etc/profile.d/比/etc/profile好维护,不想要什么变量直接删除/etc/profile.d/下对应的即可,不用像/etc/profile需要改动此文件。

    1
    2
    3
    4
    5
    6
    7
    CentOS 中每个用户都要指定各自的,其中包括可执行的 path路径,这些路径决定了每个用户在执行时的命令工具。 

    一般情况下,可以再每个用户的环境变量里设定各自的 path变量值,然后再执行export PATH使其生效,但如果用户比较多,安装命令工具也原来越多,且出来本身用户可以执行这些工具,root用户或其他用户也可以执行命令,这时在每个用户环境变量里添加就比较复杂了。

    所以可以用另外一种方法:

    可以在 /etc/profile.d/ 目录下创建一个 path.sh 脚本,脚本内容如下:

激活环境

1
2
3
4
5
[dw@hadoop116 software]$ source  /etc/profile.d/my_env.sh
[dw@hadoop116 software]$ java -version
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)

克隆虚拟机配置

克隆两台虚拟机,分别命名为slaves1、slaves2。

修改hostname

分别开启两台虚拟机,分别修改他们的hostname。

修改IP

对于slaves1,slaves同理。

修改网络名称

slaves1的修改也同slaves2的修改一样。

最后重启网络:

1
systemctl restart network

分别对slaves2和主机互相ping:

配置虚拟机之间免密通信

首先ssh登陆主节点机器-hadoop116。

1
2
[dw@hadoop116 ~]$ ssh-keygen -t rsa
Generating public/private rsa key pair.

首先生成本机的公钥,之后分别切换到 hadoop117,hadoop118生成各自机器的公钥。

可以查看.ssh目录下会产生两个文件。

1
2
3
4
[dw@hadoop116 ~]$ ls
[dw@hadoop116 ~]$ cd ~/.ssh/
[dw@hadoop116 .ssh]$ ls
id_rsa id_rsa.pub

接下来是授权公钥,实现免密登陆,分别在hadoop116,hadoop117,hadoop118机器上分别对hadoop116,hadoop117,hadoop118分发公钥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dw@hadoop116 .ssh]$ ssh-copy-id hadoop116
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/dw/.ssh/id_rsa.pub"
The authenticity of host 'hadoop116 (192.168.10.116)' can't be established.
ECDSA key fingerprint is SHA256:5Et/sd5AKEm8Jso4gl6td6wp5TnXPa00mfga95dPYVI.
ECDSA key fingerprint is MD5:d8:c0:d0:dd:e9:8a:b5:5a:f7:82:10:bb:db:9b:d4:03.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
dw@hadoop116's password:

Number of key(s) added: 1

Now try logging into the machine, with: "ssh 'hadoop116'"
and check to make sure that only the key(s) you wanted were added.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dw@hadoop116 .ssh]$ ssh-copy-id hadoop117
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/dw/.ssh/id_rsa.pub"
The authenticity of host 'hadoop117 (192.168.10.117)' can't be established.
ECDSA key fingerprint is SHA256:5Et/sd5AKEm8Jso4gl6td6wp5TnXPa00mfga95dPYVI.
ECDSA key fingerprint is MD5:d8:c0:d0:dd:e9:8a:b5:5a:f7:82:10:bb:db:9b:d4:03.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
dw@hadoop117's password:

Number of key(s) added: 1

Now try logging into the machine, with: "ssh 'hadoop117'"
and check to make sure that only the key(s) you wanted were added.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dw@hadoop116 .ssh]$ ssh-copy-id hadoop118
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/dw/.ssh/id_rsa.pub"
The authenticity of host 'hadoop118 (192.168.10.118)' can't be established.
ECDSA key fingerprint is SHA256:5Et/sd5AKEm8Jso4gl6td6wp5TnXPa00mfga95dPYVI.
ECDSA key fingerprint is MD5:d8:c0:d0:dd:e9:8a:b5:5a:f7:82:10:bb:db:9b:d4:03.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
dw@hadoop118's password:

Number of key(s) added: 1

Now try logging into the machine, with: "ssh 'hadoop118'"
and check to make sure that only the key(s) you wanted were added.

完成各个机器间密钥的分发后,可以查看.ssh目录下的authorized文件,可以发现三个机器的公钥都有了:

1
2
3
AAAAB3NzaC1yc2EAAAADAQABAAABAQC87e4EXt4RWXC3jbjYpnET42s4mTUfbEEh7IkV3Q5ayZCB2KUQM9raVLbkYdtrd+oKGR7W/yEJKUyRnMyWjZGz6c1fF3zh73/pzCbonsXZ7GPIeuK1z+5F5vZxR2Fx62a3Fp1RXLbUIXz4hdPDSfXqpdQRyPTYyDiiRSKwGpg5SBsBCp38iWTaJSDc9NzUEyw3DcuaXETE79wzCccvJ1r0F5RJZ1OMu8Uyck7nKWph7xwc92cPqwaGPfAPgjITxh20jUK0ja+9Zja+TxTl1W9SZLE2Tjwpqe6lcgaiYwCI7gJ4lYDxetka3idke+cu1sxiNp/D5FxCOukTZ29DKn+b dw@hadoop116
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDpmI/Z35OaDQWzBd6MZCcjO2f2HMoKBLqTsQ7DyAASfXruqQW5xy7zHr+7bsHkTG3YPRcMYGlbLACkJnTQTar0woxIxlZzVAQ2+MvlwfbExVTnZFaLVPz6caG8GM+uOJRrGTCn3a8W5UlEP4UJgp+MZL9OHTNb8Az6bTYwKXJJAuNG1dK4nOYlIG0i9RFpmoGZiB5siRXFkTtAbY2YZclyVK4Fx1fPKdDBUNYRxBxryg7csFfOfZgw4BV5FNv9+ZCCvY5/F0sgKpOvRoKuTB10ztQLOA5OMXLDfolUq5sWZTZu+4dgNYqnlYsqHZRAhM7kslS7sOxtOwqKw15waUjj dw@hadoop117
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEtdG4Z/y2LM5T6Nk6LXD7PxmsAph9o0sguniozo7MpvZywQKjRA6QxpO+Lob3Z8WO8S03jqv5O6zoss8WRnDI8pqCVK6CMR9vToziSFDw5omOG046X06+otJEqcSIUYwv6dookBS5qJWezjR44kNGQE2Tp/pEEHMNGNAdFWGZfVx0jNdN3tUxOymhJXn6pM8D+2Exx1KjRHyF7fa7mjbkwVZAEk9KMuDxHFGcGyO+GnXMBJc7QjqUZxUP8hhar/IHTm/wwM1a3hfoey6PO+JHpXL60LDkffWc2E3mdMcKXJSOpCJuhlkFxWNoK9nLfCkoYvJPb3A5ukP5twjal5bZ dw@hadoop118

创建xsync脚本命令实现机器间数据更新

安装rsync命令

1
sudo yum install rsync -y

在root的~目录下创建bin文件夹,在文件夹内创建xsync脚本文件,并给其赋予可执行权限。

xsync脚本文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 获取文件名称
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 获取当前用户名称
user=`whoami`

#5 循环 这里根据你的机器IP进行修改
for((host=116; host<119; host++)); do
case $host in
"116")
echo ------------------- master$host --------------
rsync -rvl $pdir/$fname $user@hadoop116:$pdir ;;
"117")
echo ------------------- slave1$host --------------
rsync -rvl $pdir/$fname $user@hadoop117:$pdir ;;
"118")
echo ------------------- slave2$host --------------
rsync -rvl $pdir/$fname $user@hadoop118:$pdir ;;
esac
done

之后,便可以在虚拟机之间实现数据的传输和更新。测试一下,把main机器的bin目录同步到其他机器上:

配置集群时间同步

编写脚本实现三台机器同时执行一个命令,同上面一样将脚本放在bin目录库下。

1
2
3
4
for i in hadoop116 hadoop117 hadoop118;do
echo "==========$i=========="
ssh $i "$*"
done

查看时间配置服务器ntp是否安装

1
sudo ~/bin/ae.sh yum install ntp ntpdate -y

查看ntp状态:

使该NTP服务器在不联网的情况下,使用本服务器的时间作为同步时间

1
vim /etc/ntp.conf

1
vim /etc/sysconfig/ntpd

以上修改都在主节点机器上进行。

crontab配置节点机器定时与主节点机器同步时间

注意,如果执行crontab命令的是普通用户,任务不会生效,需要使用root用户权限执行crontab。

sudo crontab -e

Hadoop搭建

安装hadoop软件包

1
2
3
4
5
6
7
8
[dw@hadoop116 hadoop]$ pwd
/opt/software/hadoop
[dw@hadoop116 hadoop]$ ll
total 610476
-rw-rw-r-- 1 dw dw 224565455 May 4 00:56 bigtable.lzo # 配置完lzo用于测试
-rw-rw-r-- 1 dw dw 338075860 May 4 00:56 hadoop-3.1.3.tar.gz
-rw-rw-r-- 1 dw dw 193831 May 4 00:56 hadoop-lzo-0.4.20.jar # 用以配置hadoop文件的压缩,hadoop生产环境中默认压缩方式有snappy和lzo,但是lzo原生不支持,所以需要借助开源组件
-rw-rw-r-- 1 dw dw 62282700 May 4 00:56 hadoop-lzo-master.zip # 源码,暂时没用

解压:

1
2
3
4
[dw@hadoop116 hadoop]$ 
[dw@hadoop116 hadoop]$ ls
bigtable.lzo hadoop-3.1.3.tar.gz hadoop-lzo-0.4.20.jar hadoop-lzo-master.zip
[dw@hadoop116 hadoop]$ tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

配置hadoop配置文件

这里要输入yes记得!否则不会覆盖掉原文件,导致后面集群启动出错,最好再次查看是否覆盖掉原配置文件。

修改core-site.xml配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<configuration>
<!-- 指定NameNode地址 -->
<property>
<name>fs.defaultFS</name>
<!-- dn和nn间通讯的端口号,也是文件访问的端口号 -->
<value>hdfs://hadoop116:8020</value>
</property>

<!-- 指定hadoop数据存储目录
其实是有另外两个参数配置nn和dn的数据存储路径,这两个参数都会引用hadoop.tmp.dir这个参数的配置
-->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>

<!-- 配置HDFS网页登陆使用的静态用户为dw
hadoop3之后打开hdfs的web页面可以进行各种各样的操作,不仅查看还能创建、删除文件夹
这样会涉及到一个权限问题,正常打开页面会有一个登陆的过程,hadoop借助kerberos协议(MIT提供)
来进行权限认证。
但是我们这里没有配置kerberos,所以直接指定默认登陆用户是dw
dw是启动hadoop进程的用户,是hadoop超级用户(谁启动谁就是超级用户)
-->
<property>
<name>hadoop.http.staticuser.user</name>
<value>dw</value>
</property>

<!-- 配置该dw(superUser)允许通过代理访问的主机节点
这是给hiveserver2配置的
由于客户端启动hive对应的用户是B
用户B需要通过hiveserver2去访问hdfs数据
hiveserver2是哪个用户启动的就是哪个用户的权限,假设是用户A
这样会产生权限问题,比如B访问了只有A有权限访问的hdfs上的数据或者A访问不到B有权限访问的数据

一般是以用户A代理所有节点上的所有组的所有用户进行访问.
效果是B启动hive客户端,通过hiveserver2的A代理B(拿到B权限)访问hdfs数据,最终实现B访问hdfs
所谓代理,就是让代理执行一些无关紧要的事件,真正核心的还是原用户来执行
-->
<property>
<name>hadoop.proxyuser.dw.hosts</name>
<value>*</value>
</property>

<!-- 配置该dw(superUser)允许通过代理用户所属组 -->
<property>
<name>hadoop.proxyuser.dw.groups</name>
<value>*</value>
</property>

<!-- 配置该dw(superUser)允许通过代理的用户 -->
<property>
<name>hadoop.proxyuser.dw.users</name>
<value>*</value>
</property>

</configuration>

修改hdfs-site.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<configuration>
<!-- nn web端访问地址
hadoop2的时候默认是50070
-->
<property>
<name>dfs.namenode.http-address</name>
<!-- dn和nn间通讯的端口号,也是文件访问的端口号 -->
<value>hadoop116:9870</value>
</property>
<!-- 2nn web端访问地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<!-- dn和nn间通讯的端口号,也是文件访问的端口号 -->
<value>hadoop118:9868</value>
</property>
<!-- 测试环境指定hdfs副本的数量1
生产环境一般是3
-->
<property>
<name>dfs.replication</name>
<!-- dn和nn间通讯的端口号,也是文件访问的端口号 -->
<value>1</value>
</property>
</configuration>

修改yarn-site.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<configuration>
<!-- 指定MR走shuffle -->
<property>
<!-- 指定shuffle方式,这里用的是默认的mrshuffle -->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop117</value>
</property>

<!-- 环境变量的继承
相当于容器container从nodemanager继承哪些环境变量
-->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>

<!-- yarn容器允许分配的最大最小内存 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>

<!-- yarn容器允许管理的物理内存大小
即一个nodemanager最多提供多少内存给yarn调度
-->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>

<!-- 关闭yarn对虚拟内存的限制检查
在centos6之后,系统虚拟内存有所变化,导致jdk8跑在centos6
之后的机器会造成虚拟内存较大,yarn会对虚拟内存进行检查,过大
会kill掉进程
-->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>


<!-- 日志聚集功能使能
一般mr跑在各个nodemanager上。日志信息会记录在
nodemanager节点机器上,通过配置日志聚集,能够将
这些日志文件拉到hdfs上,通过web页面去访问
-->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop116:19888/jobhistory/logs</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

</configuration>

配置mapred-site.xml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<configuration>
<!-- 指定mapreduce程序运行在yarn上
不配置默认是local
-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

<!-- 历史服务器端地址 指定在哪个节点就到哪个节点去启动-->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop116:10020</value>
</property>

<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop116:19888</value>
</property>
</configuration>

配置workers(以前是slaves,现在改成workers):

1
2
3
4
5
[dw@hadoop116 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers 
[dw@hadoop116 hadoop]$ cat /opt/module/hadoop-3.1.3/etc/hadoop/workers
hadoop116
hadoop117
hadoop118

配置环境变量

配置环境变量

1
2
3
4
5
6
7
8
# JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

# HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

激活环境去配置Hadoop文件:

1
2
3
4
5
[dw@hadoop116 hadoop]$ source /etc/profile.d/my_env.sh 
[dw@hadoop116 hadoop]$ cd $HADOOP_HOME/
[dw@hadoop116 hadoop]$ pwd
[dw@hadoop116 hadoop-3.1.3]$ cd ./etc/hadoop/
/opt/module/hadoop-3.1.3/etc/hadoop
1
2
3
4
5
6
7
8
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_251
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP
export HADOOP_HOME=/opt/module/hadoop-2.6.1
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

之后到hadoop根目录下的etc/hadoop目录下,修改hadoop-env.sh和yarn-env.sh中的JAVA_HOME的路径为配置的java路径(/opt/module/jdk1.8.0_251)。

配置完成后关闭虚拟机。

启动Hadoop集群

分发hadoop到各个节点上

1
[dw@hadoop116 hadoop]$ xsync /opt/module/hadoop-3.1.3/

分发环境变量(完成后要source激活)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dw@hadoop116 hadoop]$ sudo /home/dw/bin/xsync /etc/profile.d/my_env.sh 
fname=my_env.sh
pdir=/etc/profile.d
------------------- hadoop116116 --------------
root@hadoop116's password:
sending incremental file list

sent 46 bytes received 12 bytes 23.20 bytes/sec
total size is 217 speedup is 3.74
------------------- hadoop117117 --------------
root@hadoop117's password:
sending incremental file list
my_env.sh

格式化集群

在主节点上执行:

1
2
3
4
[dw@hadoop116 hadoop-3.1.3]$ ./bin/hdfs namenode -format
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at hadoop116/192.168.10.116
************************************************************/

启动集群

namenode在hadoop116启动,yarn在hadoop117启动,snn在hadoop118启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
####################################hadoop116
dw@hadoop116 hadoop-3.1.3]$ start-dfs.sh
Starting namenodes on [hadoop116]
Starting datanodes
hadoop117: WARNING: /opt/module/hadoop-3.1.3/logs does not exist. Creating.
hadoop118: WARNING: /opt/module/hadoop-3.1.3/logs does not exist. Creating.
Starting secondary namenodes [hadoop118]
[dw@hadoop116 hadoop-3.1.3]$ jps
12917 NameNode
13352 NodeManager
13452 Jps
13069 DataNode
###############################hadoop117
[dw@hadoop117 ~]$ start-yarn.sh
Starting resourcemanager
Starting nodemanagers
[dw@hadoop117 ~]$ jps
12513 DataNode
12805 NodeManager
12953 Jps
#################################hadoop118
[dw@hadoop118 module]$ jps
12385 NodeManager
12524 Jps
12190 DataNode
12302 SecondaryNameNode

编写脚本查看各集群进程

1
2
3
4
5
6
7
8
[dw@hadoop116 hadoop-3.1.3]$ cd ~/bin/
[dw@hadoop116 bin]$ vim xcall.sh
[dw@hadoop116 bin]$ chmod +x xcall.sh
[dw@hadoop116 bin]$ ls -ll
total 12
-rwxrwxr-x 1 dw dw 162 May 4 00:51 log.sh
-rwxrwxr-x 1 dw dw 143 May 4 03:11 xcall.sh
-rwxr-xr-x 1 dw dw 910 May 4 02:34 xsync
1
2
3
4
5
6
params=$@
i=1
for (( i=116 ; i <= 119 ; i = $i + 1 )) ; do
echo ============= node$i $params =============
ssh hadoop$i "$params"
done
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
============= hadoop116 jps =============
16897 NodeManager
17266 Jps
16613 DataNode
16461 NameNode
============= hadoop117 jps =============
14513 DataNode
14994 NodeManager
15429 Jps
14685 ResourceManager
============= hadoop118 jps =============
13858 NodeManager
14153 Jps
13663 DataNode
13775 SecondaryNameNode

运行一个mr任务检查集群是否正常

1
2
3
4
5
6
[dw@hadoop116 hadoop-3.1.3]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi 1 1
Number of Maps = 1
Samples per Map = 1
Job Finished in 21.831 seconds
2021-05-04 03:08:30,989 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Estimated value of Pi is 4.00000000000000000000

以上便是Hadoop集群搭建的所有步骤。

模拟日志产生

这里以hadoop116和hadoop117作为日志服务器。

将资料中的Mok09文件夹下的application.properties、gmall2020-mock-log-2020-05-10.jar、path.json、logback.xml上传到hadoop116的/opt/module/applog目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[dw@hadoop116 ~]$ mkdir /opt/module/applog
[dw@hadoop116 applog]$ ls
application.properties gmall2020-mock-log-2020-05-10.jar logback.xml path.json
[dw@hadoop116 applog]$ pwd
/opt/module/applog

`gmall2020-mock-log-2020-05-10.jar`是模拟产生业务日志数据的代码程序逻辑。

`logback`和log4j类似,都是日志框架的实现,logback配置文件是xml,log4j配置文件是peoperties。

logback其中部分配置信息:

![](G:\Desktop\数仓项目笔记\从0到1数据仓库之Hadoop集群搭建.assets\image-20210503094236872.png)

运行jar包程序:

​```bash
[dw@hadoop116 applog]$ java -jar gmall2020-mock-log-2020-05-10.jar
# 生成对应的日志文件
[dw@hadoop116 module]$ ls applog/
application.properties gmall2020-mock-log-2020-05-10.jar log logback.xml path.json

将applog目录下数据发送到日志服务器hadoop117上:

1
2
3
4
5
6
[dw@hadoop116 module]$ scp -rp applog/ hadoop117:/opt/module/
application.properties 100% 618 992.4KB/s 00:00
gmall2020-mock-log-2020-05-10.jar 100% 11MB 63.6MB/s 00:00
logback.xml 100% 3178 3.3MB/s 00:00
path.json 100% 489 620.7KB/s 00:00
app.2021-05-03.log 100% 409KB 69.3MB/s 00:00

创建一个日志文件产生shell脚本,放到前面配置xsync的bin目录下,赋予其执行权限:

1
2
3
4
5
6
7
[dw@hadoop116 applog]$ cd ~/bin/
[dw@hadoop116 bin]$ vim log.sh
[dw@hadoop116 bin]$ chmod +x log.sh
[dw@hadoop116 bin]$ ls -ll
total 8
-rwxrwxr-x 1 dw dw 144 May 4 00:35 log.sh
-rwxr-xr-x 1 dw dw 910 May 3 23:48 xsync
1
2
3
4
for i in hadoop116 hadoop117;do
echo "==========$i=========="
ssh $i "cd /opt/module/applog;java -jar gmall2020-mock-log-2020-05-10.jar"
done

shell的输出分为标准错误和标准输出两种输出方式。一般”1”指代的是标准输出,”2”指代的是标准错误输出。对对标准输出进行重定向:(输入到/dev/null的数据都不会被保存,称之为黑洞)

java -jar gmall2020-mock-log-2020-05-10.jar 1>/dev/null 2>/dev/null

可以简化为:(“&”类似C指针的概念,表示2去到1去的地方)

java -jar gmall2020-mock-log-2020-05-10.jar 1>/dev/null 2>&1

还可以进行简化,一般不显示定义1还是2,默认就是1:

java -jar gmall2020-mock-log-2020-05-10.jar >/dev/null 2>&1

修改后执行命令,可以看到,终端没有任何日志文件的输出,但是会等待程序股执行:

1
2
3
[dw@hadoop116 applog]$ log.sh 
==========hadoop116==========
==========hadoop117==========

此外,要让程序到后台执行,不堵塞在当前终端可以这样:

java -jar gmall2020-mock-log-2020-05-10.jar >/dev/null 2>&1 &

修改后执行,虽然也会输出脚本里echo的信息,但是终端迅速跳过了等待该任务,让它在后台执行,我们可以继续在终端执行下一个命令。

1
2
3
4
[dw@hadoop116 applog]$ log.sh 
==========hadoop116==========
==========hadoop117==========
[dw@hadoop116 applog]$

查看log文件:

1
2
3
4
[dw@hadoop116 log]$ ls
app.2021-05-03.log app.2021-05-04.log
[dw@hadoop116 log]$ du -sh ./
1.6M ./

ZooKeeper安装

1.解压安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dw@hadoop116 zookeeper]$ pwd
/opt/software/zookeeper
[dw@hadoop116 zookeeper]$ ls
apache-zookeeper-3.5.7-bin.tar.gz
[dw@hadoop116 zookeeper]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

[dw@hadoop116 module]$ pwd
/opt/module
[dw@hadoop116 module]$ ls
apache-zookeeper-3.5.7-bin applog hadoop-3.1.3 jdk1.8.0_212
[dw@hadoop116 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7/
[dw@hadoop116 module]$ ls
applog hadoop-3.1.3 jdk1.8.0_212 zookeeper-3.5.7-bin

### 2.修改配置文件

配置myid文件

1
2
3
4
5
6
[dw@hadoop116 module]$ cd zookeeper-3.5.7/
[dw@hadoop116 zookeeper-3.5.7]$ mkdir zkData
[dw@hadoop116 zookeeper-3.5.7]$ cd zkData/
[dw@hadoop116 zkData]$ vim myid # myid文件只能写数字,范围是1~255
[dw@hadoop116 zkData]$ cat myid
2

修改zoo.cfg配置文件

1
2
3
4
5
6
7
8
9
[dw@hadoop116 zkData]$ cd ../conf/
[dw@hadoop116 conf]$ ll
total 12
-rw-r--r-- 1 dw dw 535 May 4 2018 configuration.xsl
-rw-r--r-- 1 dw dw 2712 Feb 7 2020 log4j.properties
-rw-r--r-- 1 dw dw 922 Feb 7 2020 zoo_sample.cfg
[dw@hadoop116 conf]$ pwd
/opt/module/zookeeper-3.5.7/bin/conf
[dw@hadoop116 conf]$ mv zoo_sample.cfg zoo.cfg

给文件修改添加以下内容

1
2
3
4
5
6
7
8
9
dataDir=/opt/module/zookeeper-3.5.7/zkData 

#######################cluster##########################在文件末尾添加
# server.id 指的是myid里配置的数字
# 2888端口号用于follower和leader进行交流的端口
# 3888是选举leader用的端口
server.2=hadoop116:2888:3888
server.3=hadoop117:2888:3888
server.4=hadoop118:2888:3888

分发zookeeper到各个节点:

1
[dw@hadoop116 module]$ xsync zookeeper-3.5.7/

分别到其他两个节点修改zk的myid,注意myid不能重复,从hadoop116~hadoop118依次设置为2,3,4。

3.创建ZK启停脚本

在~/bin/目录下创建zkS.sh启停脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/bin/bash

case $1 in
"start"){
for i in hadoop116 hadoop117 hadoop118
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop116 hadoop117 hadoop118
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop116 hadoop117 hadoop118
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac

Kafka搭建

1.解压安装

1
2
3
4
5
6
7
8
9
10
11
12
13
[dw@hadoop116 kafka]$ pwd
/opt/software/kafka
[dw@hadoop116 kafka]$ ls
kafka_2.11-2.4.1.tgz
[dw@hadoop116 kafka]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
# 2.11表示kafka用scala2.11版本写的
# 2.4.1表示给kafka自己的版本
[dw@hadoop116 kafka]$ cd /opt/module/
[dw@hadoop116 module]$ pwd
/opt/module
[dw@hadoop116 module]$ ls
applog hadoop-3.1.3 jdk1.8.0_212 kafka_2.11-2.4.1 zookeeper-3.5.7 zookeeper-3.5.7-bin
[dw@hadoop116 module]$ mv kafka_2.11-2.4.1/ kafka/

2.修改配置文件

1
2
3
4
5
[dw@hadoop116 module]$ cd kafka/
[dw@hadoop116 kafka]$ pwd
/opt/module/kafka
[dw@hadoop116 kafka]$ mkdir logs
[dw@hadoop116 kafka]$ vim config/server.properties

server.properties文件添加以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
修改或者增加以下内容:
#broker的全局唯一编号,不能重复,kafka集群中各个节点的broker id不重复
# hadoop116:0/hadoop117:1/hadoop118:2
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
# 这里的log指的是生产者发过来的数据,因为底层也是日志文件的形式存在
log.dirs=/opt/module/kafka/data
#配置连接Zookeeper集群地址
# 指的是zk下面的一个根节点,如果不加相当于直接在zk根节点下创建路径,kafka启动会在zk上部署很多节点,这样一来看起来特别乱
zookeeper.connect=hadoop116:2181,hadoop117:2181,hadoop118:2181/kafka

3.kafka启停脚本

先看看kafka单点启动命令:

1
xxxxxxxxxx [dw@hadoop116 kafka]$ ./bin/kafka-server-start.sh USAGE: ./bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*# [-daemon] 配置可有可无,如果加上,则kafka会变成守护进程在后台执行,如果不加,kafka会堵塞当前总端,一直到kafka关闭

kafka.sh启停脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#! /bin/bash

case $1 in
"start"){
for i in hadoop116 hadoop117 hadoop118
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop116 hadoop117 hadoop118
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
done
};;
esac
1
2
3
4
5
6
[dw@hadoop116 logs]$ cd ~/bin/
[dw@hadoop116 bin]$ vim kafka.sh
[dw@hadoop116 bin]$ chmod +x kafka.sh
[dw@hadoop116 bin]$ ls
ae.sh kafka.sh log.sh xcall.sh xsync zkServer.sh
[dw@hadoop116 bin]$ xsync kafka.sh

这里要注意,该版本的kafka停止脚本kafka-server-stop.sh有点问题,无法通过此脚本终止kafka进程,需要对脚本进行修改:

4.分发kafka启停测试

1
[dw@hadoop116 module]$ xsync kafka/

此处注意,要先等待kafka 进程终止了之后再关闭zk,否则先关闭zk,zk上的kafka节点在下次zk启动时仍然存在,不管kafka进程有无启动。

kafka简介及常用命令

简介

Kafka是一个消息队列:

  • 点对点:一个生产者对应一个消费者
  • 发布订阅模式:一个生产者发布消息,所有订阅了主题的消费者都能够受到消息

发布订阅式的消息队列都有“主题”这个概念。

只要是分布式的就一定有分区。kafka分区分的是topic。topic里有多个分区,写和消费都是分布式并行写和消费。

kafka的容错通过副本机制实现,一个topic里面的partition有多个副本,分布在集群各节点上。

在kafka种,消费者不再是单个消费者,而是一个组,一个组是一个整体。在kafka中,一个生产者对应多个消费者组。

常用脚本命令

1.查看Kafka Topic列表
1
2
[dw@hadoop116 kafka]$ ./bin/kafka-topics.sh --zookeeper hadoop116:2181/kafka --list
[dw@hadoop116 kafka]$

kafka有一个内置的topic,叫做consumeroffset,但这里查找topic却没找到。这是因为,consumeroffset是记录消费者消费时的offset的,所以只有消费者消费了才会产生这个topic,因为我们刚安装kafka还未消费,所以没有。在kafka0.9版本前是没有这个topic,offset是存在zk上。但是那样子,对zk的依赖过于严重,所以现在kafka在逐步摆脱对zk的依赖。

2.创建Kafka Topic

进入到/opt/module/kafka/目录下创建日志主题

1
2
[dw@hadoop116 kafka]$ ./bin/kafka-topics.sh --zookeeper hadoop116:2181/kafka --create --topic first --partitions 3 --replication-factor 2
Created topic first.
1
2
3
4
5
6
7
8
9
10
[dw@hadoop116 kafka]$ ./bin/kafka-topics.sh --zookeeper hadoop116:2181/kafka --describe --topic first
Topic: first PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: first Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: first Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
[dw@hadoop116 kafka]$
# partition : 分区编号
# leader : kafka的broker.id
# replicas : 副本个数和副本所在的节点的broker.id
# Isr : 用于保证高可用的,一旦当前leader节点挂掉就有isr后面一个broker.id的节点充当leader
3.删除Kafka Topic
1
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_log

4.Kafka生产消息

1
2
3
4
5
6
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic topic_log
\>hello world
\>atguigu atguigu
# broker-list:指定kafka集群地址,这里可以写多个也可以写一个,与最后数据写给哪个节点没有必然联系
# topic : 指定要写给的topic

kafka的每一个broker节点都会维护一份topic元数据,节点间元数据是同步的,就是describe topic 的元数据信息。kafka生产者写数据要写到leader上,首先要知道partition的leader在哪个节点上,所以需要指定broker-list传参,去获取元数据信息去leader写数据。一般写broker-list要写两个,避免一个节点挂掉了。

1
2
3
4
5
6
7
[dw@hadoop116 kafka]$ kafka-console-producer.sh --broker-list hadoop116:9092 --topic first # 生产数据
>atguigu
>aaaaaa
>bbbbbb
>cccccc
>dddddd
>111111
5.Kafka消费消息
1
2
3
4
5
6
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop116:9092 --from-beginning --topic topic_log
--from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

# bootstrap-server这里跟写数据的broker-list作用一样
# java写生产者和消费者底层api,这个参数的名字时broker-list,但是shell脚本写的时候却是broker-list和bootstrap-server,因为shell底层调用的也是java api。

运行命令,发现消费者堵塞不动,但是前面生产者已经写了数据了。这是因为没有配置—from-beginning参数,这个参数在java api叫做offset autoresize,当kafka一个消费者去消费,这个消费者挂掉后重启,有两种情况:

  1. 从开始去消费
  2. 从最新的开始去消费(即上次消费到的记录下来的offset)

kafka默认策略是从最新的消息去消费,所以这种情况下,想要消费先前的数据,需要加上参数 — from-begining。

1
2
[dw@hadoop117 module]$ kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic first

此时我们生产者再次生:

1
2
3
4
5
6
7
8
[dw@hadoop116 kafka]$ kafka-console-producer.sh --broker-list hadoop116:9092 --topic first
>atguigu
>aaaaaa
>bbbbbb
>cccccc
>dddddd
>111111
>333333 <--new
1
2
[dw@hadoop117 module]$ kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic first
333333 <--消费者消费
6.查看Kafka Topic详情
1
2
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \
--describe --topic topic_log
7.消费者组信息查看脚本

kafka-consumer-group.sh能看到有哪几个消费者组信息,正在消费的topic和分区信息以及offset。

1
2
3
4
5
6
[dw@hadoop118 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop116:9092 --list
console-consumer-75164
console-consumer-31713
console-consumer-49957
console-consumer-42719
# 现在有四个消费者组在消费信息,后面的id是随机的

查看消费组的信息:

1
2
3
4
5
6
7
8
9
10
11
12
[dw@hadoop118 ~]$ kafka-consumer-groups.sh --bootstrap-server hadoop116:9092 --describe --group console-consumer-75164

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-75164 first 0 - 5 - consumer-console-consumer-75164-1-fd2b2e87-393c-430f-98ce-5a2ba2f09df1 /192.168.10.117 consumer-console-consumer-75164-1
console-consumer-75164 first 1 - 2 - consumer-console-consumer-75164-1-fd2b2e87-393c-430f-98ce-5a2ba2f09df1 /192.168.10.117 consumer-console-consumer-75164-1
console-consumer-75164 first 2 - 4 - consumer-console-consumer-75164-1-fd2b2e87-393c-430f-98ce-5a2ba2f09df1 /192.168.10.117 consumer-console-consumer-75164-1

# GROUP:组的信息--75164
# TOPIC:消费的topic--first
# 消费的分区
# CURRENT-OFFSET:目前消费到的offset
#
8.消费者、生产者性能测试脚本

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

Flume搭建

1.Flume解压及安装

  • 将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
  • 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
1
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  • 修改apache-flume-1.9.0-bin的名称为flume
1
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

2.Flume配置

将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

1
[atguigu@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar

注意:删除guava-11.0.2.jar(google的一个java基础类库)的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。

1
2
3
4
5
6
Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
​ at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
​ at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
​ at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
​ at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
​ ... 1 more

将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

1
2
3
[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212

3.Flume测试

将Flume进行分发。创建一个Flume的测试配置脚本进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Name the components on this agent 
#a1:表示agent的名称
#r1:表示a1的输入源
#k1:表示a1的 输出目的地
#c1:表示a1的缓冲区
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat #表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind = hadoop116 #表示a1的监听的主机
a1.sources.r1.port = 44444 #表示监听a1的监听的端口号

# Describe the sink
a1.sinks.k1.type = logger #表示a1的输出目的地是控制台logger类型

# Use a channel which buffers events in memory
a1.channels.c1.type = memory #表示a1的channel类型是memory内存型
a1.channels.c1.capacity = 1000 #表示a1的channel总容量1000个event
a1.channels.c1.transactionCapacity = 100 #表示a1的channel传输时收集到了100条event以后再去提交事务

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #表示将r1和c1连接起来
a1.sinks.k1.channel = c1 #表示将k1和c1连接起来

安装工具:

1
yum install -y telnet

开始测试:

1.开启flume监听端口

1
[dw@hadoop116 flume]$ ./bin/flume-ng agent -n a1  -c conf/ -f conf/flume-netcat.conf -Dflume.root.logger=info,console

2.另开一个终端输入信息进行测试

日志采集平台搭建方案

Flume选型

Source

Taildir Source相比Exec Source、Spooling Directory Source的优势:

  • TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以执行一个shell脚本,实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

  • 比如使用tail -f命令监控文件,如果shell命令出错,这段时间的数据仍会追加到文件里,但是tail -f只能读取末尾新加的几行,于是就丢失数据了。

Spooling Directory Source监控目录,支持断点续传。

  • 文件一旦放入监控的目录,就不能再添加数据或者对文件进行改名等操作,所以一定要等文件写完后才能完监控目录里放,所以延迟性较高。
  • 如果要用它实现实时监控,可以设置文件每10s一个,实现一个近乎实时的效果,但是由于有了taildir,所以不再需要了。

batchSize大小如何设置?

  • 批处理,往channel里放数据,一次放多少批合适?Event 1K左右时,500-1000合适(默认为100)

Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

日志数据采集平台的三种搭建方案

日志数据采集方案

首先,方案一和方案三,方案二显得更加简介,但是方案二并不好,因为日志埋点数据经过Flume输入到Kafka,接着又从Kafka经过Flume输入到HDFS,两进两出,对Flume压力较大,所以不好。

这里我们采取方案一来进行日志数据采集平台的搭建。

日志数据采集平台搭建

Flume->Kafka配置

首先,我们要配置Flume,Flume在整个日志数据采集平台中起着承上启下的重要作用。

首先是编写Flume到Kafka Channel的配置文件taildir_kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 先给每个组件命名
# a代表agent,是flume的一个application
# 两个source: a1.sources= r1 r2
a1.sources= r1
a1.channels= c1

# 配置source
a1.sources.r1.type= TAILDIR
# 一个文件组就匹配一个正则表达式
a1.sources.r1.filegroups= f1
# regular: "."是任意字符,"*"是前面一个字符任意个,"+"至少是一个,不能为0
a1.sources.r1.filegroups.f1= /opt/module/applog/log/app.*
# positionFile实现断点续传,flume将文件采集点记录到该文件中以实现断点续传,因为是个文件,所以flume挂掉后还能继续读
a1.sources.r1.positionFile= /opt/module/flume/taildir_position.json

## 拦截器
### 拦截器个数
a1.sources.r1.interceptors = i1
### 拦截器所需要的类包
a1.sources.r1.interceptors.i1.type = com.everweekup.gmall.flume.interceptor.LogInterceptor$Builder


# 配置channel
a1.channels.c1.type= org.apache.flume.channel.kafka.KafkaChannel
## 参数显示以","分隔
a1.channels.c1.kafka.bootstrap.servers = hadoop116:9092, hadoop117:9092
a1.channels.c1.kafka.topic= topic_log
## 该参数默认是true,就是是否按照flume event的header和body进行解析有两个角度
### kafka channel有时候通过flume的source往kafka写数据| flume往kafka写数据时是否保留event结构(header+body(真正数据)),true表示有header,flase则只有body
### 有时候是通过flume sink往hdfs读数据 | kafka将record读入hdfs时需不需要保留flume的header
a1.channels.c1.parseAsFlumeEvent= false


# 绑定sink与channel和source与channel的关系
a1.sources.r1.channels= c1
## 一个source可以对应多个channel,一个channel可以对应多个sink,但是多个sink不能对应一个channel,这里前面我们不用sink
# a1.sink.k1.channel= c1

flume官方文档找配置参数

Flume直接读log日志的数据,log日志的格式是app.yyyy-mm-dd.log。

拦截器用于文件过滤,日志的分类等。这里用来做日志的清洗,校验日志的格式。(比如说校验符合json格式的日志文件)

kafka channel有三种情况,channel是一边put一遍take。对于kafka来讲就是生产者和消费者。第一种既有生产者也有消费者,第二种只有生产者,第三种只有消费者。我们这部分是只有生产者。

以下是官方文档里kafka channel的参数:

Interceptor校验json格式数据脚本编写

首先创建一个Maven工程,配置依赖和插件文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

首先,写一个JSON验证工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.everweekup.gmall.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

// 工具类
public class JSONUtil {

public static boolean validateJSON(String log){
try{
JSON.parse(log);
return true;
}catch(JSONException e){
return false;
}
}
}

编写代码,实现Interceptor接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.everweekup.gmall.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.mortbay.util.ajax.JSON;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
// 实现flume的interceptor接口
public class LogInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
// 校验json格式
// 得到json
byte[] body = event.getBody(); // 字节数组
String log = new String(body, StandardCharsets.UTF_8);//将字节数组转换为字符串,指定编码格式utf8

// 进行校验
if(JSONUtil.validateJSON(log)){
return event;// 如果是标准json则不会出现报错,否则抛出异常
}else{
return null;
}
}

@Override
public List<Event> intercept(List<Event> list) {

// 如果event不符合json格式从list列表剔除
Iterator<Event> iterator = list.iterator();


/* 下面可以简化成lambda表达式
while(iterator.hasNext()){
Event next = iterator.next();
if (intercept(next)==null){
iterator.remove(); // 过滤掉null
}
}
*/
list.removeIf(next -> intercept(next) == null);

return list;
}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
// 返回一个拦截器对象

return new LogInterceptor();
}

@Override
public void configure(Context context) {
// 获取配置文件的参数 context.get

}
}

@Override
public void close() {

}
}

最后进行打包:

接着把打好的包放在flume的lib目录下。

再配置interceptor.type的全类名com.everweekup.gmall.flume.interceptor.LogInterceptor$Builder,注意,不是LogInterceptor

Flume->Kafka测试

在flume根目录创建一个jobs文件夹,用于存放配置文件。

1
2
3
4
5
[dw@hadoop116 flume]$ ls
bin CHANGELOG conf DEVNOTES doap_Flume.rdf docs lib LICENSE NOTICE README.md RELEASE-NOTES tools
[dw@hadoop116 flume]$ mkdir jobs
[dw@hadoop116 flume]$ cd jobs/
[dw@hadoop116 jobs]$ vim taildir_kafka.conf

将之前写好的interceptor包上传到lib目录。

启动消费者,用于查看flume是否往topic_log里写了数据。

这里由于原先kafka里并没有topic_log这个topic,kafka会帮我们自动创建该topic,自动创建的topic是单分区,单副本。

1
2
[dw@hadoop117 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic topic_log
[2021-05-05 19:24:17,615] WARN [Consumer clientId=consumer-console-consumer-95017-1, groupId=console-consumer-95017] Error while fetching metadata with correlation id 2 : {topic_log=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

执行flume命令采集日志数据:

1
[dw@hadoop116 flume]$ ./bin/flume-ng agent -n a1 -c conf/ -f jobs/taildir_kafka.conf -Dflume.root.logger=info,console

flume启动后会自动检测日志文件的log目录进行采集,consumer结果如图:

测试完成。

Flume启停脚本

在用户目录的bin目录下创建f1.sh脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
case $1 in
start )
for i in hadoop116 hadoop117; do
echo "==========$i=========="
ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/jobs/taildir_kafka.conf -Dflume.root.logger=info,console > /dev/null 2>&1 &"
done
;;

stop )
for i in hadoop116 hadoop117; do
echo "==========$i=========="
ssh $i "ps -ef | awk '/taildir_kafka.conf/ && !/awk/ {print /$2}' | xargs kill -9"
done
;;
esac

测试脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
###################################################启动
[dw@hadoop116 lib]$ f1.sh start
==========hadoop116==========
==========hadoop117==========
[dw@hadoop116 lib]$ xcall.sh jps
============= hadoop116 jps =============
21300 NameNode
21972 NodeManager
40455 Jps
37706 Kafka
21515 DataNode
27807 QuorumPeerMain
40335 Application
============= hadoop117 jps =============
17971 DataNode
30595 Application
18869 NodeManager
30711 Jps
18557 ResourceManager
21517 QuorumPeerMain
29789 Kafka
###################################################停止
[dw@hadoop116 lib]$ f1.sh stop
==========hadoop116==========
==========hadoop117==========
[dw@hadoop116 lib]$ xcall.sh jps
============= hadoop116 jps =============
21300 NameNode
21972 NodeManager
37706 Kafka
21515 DataNode
40509 Jps
27807 QuorumPeerMain
============= hadoop117 jps =============
17971 DataNode
30756 Jps
18869 NodeManager
18557 ResourceManager
21517 QuorumPeerMain
29789 Kafka
============= hadoop118 jps =============
28561 Kafka
29874 Jps
19171 QuorumPeerMain
16821 NodeManager
16454 DataNode
16680 SecondaryNameNode

为了能够自由针对某个配置进行kill flume,对脚本进行改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
case $1 in
start )
for i in hadoop116 hadoop117; do
echo "==========$i=========="
ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/jobs/taildir_kafka.conf -Dflume.root.logger=info,console > /dev/null 2>&1 &"
done
;;

stop )
for i in hadoop116 hadoop117; do
echo "==========$i=========="
# ""里存在需要转义的字符,使用转义字符\,避免""将$2解析成传入的参数
ssh $i "ps -ef | awk '/$2/ && !/awk/ {print \$2}' | xargs kill -9 > /dev/null 2>&1 &"
done
;;
esac

Kafka Channel -> Flume -> HDFS配置

配置flume配置文件kafka_file_hdfs.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
### kafka source底层本质是kafka 消费者
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
### 批大小
a1.sources.r1.batchSize = 5000
### 一批数据达到指定大小,无论批次达到没,都会写入channel | 批次和时间配合使用
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop116:9092,hadoop117:9092,hadoop118:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder

## channel1
### channel使用file,写磁盘
a1.channels.c1.type = file
### event索引内存队列快照
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
### 存储event文件
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
### 指单个文件的最大值(即一个文件大小是有限的,达到了重新创建一个文件继续写)
a1.channels.c1.maxFileSize = 2146435071
### 内存队列最大可以存储的event索引量
a1.channels.c1.capacity = 1000000
### 以s为单位,等待put事务操作的时间
a1.channels.c1.keep-alive = 6


## sink1
a1.sinks.k1.type = hdfs
### 正常来说路径:hdfs://hadoop116:8020/origin_data/gmall/log/topic_log/%Y-%m-%d
### 因为flume启动时会加载HADOOP_HOME的配置文件,所以这里不需要写前缀。如果flume只是单独的一个节点,没有配置hadoop,这时就需要补全了。
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
### 如果不是单个小时去计算设置路径,比如说5个小时计算一次,就需要把它设置为true
a1.sinks.k1.hdfs.round = false
## 上面参数设置为true,配置下面的实现5h计算一次
# a1.sinks.k1.hdfs.roundValue= 5
# a1.sinks.k1.hdfs.roundUnit= hour

### 防止生成大量小文件:生成新文件策略
### interval 按照时间滚动文件,10s生成一个新文件 | 根据数据生成速度设置
### 这里时间要设置大一点,免得文件不够128m。倒是map切分也好切分
a1.sinks.k1.hdfs.rollInterval = 10
### rollsize:按照文件大小 | 128mb
a1.sinks.k1.hdfs.rollSize = 134217728
### rollcount:按照event数量生成文件 | 设置为0关闭
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
### 数据压缩
a1.sinks.k1.hdfs.fileType = CompressedStream
### lzo压缩
### 这里用Lzo是为了将文件切片,但是前面我们已经设置了文件的切分方式128m,所以这里也可以不配置lzo,使用snappy会快一点,具体看情景
a1.sinks.k1.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

hdfs sink到hdfs上,最好一天一个路径。我们可以根据flume的event当中的一个header的key时间戳来决定放到什么路径。但是,由于我们的数据是没有flume event的header结构的(前面将参数配置了不按照flume event结构来解析),所以我们可以配置一些参数来解决:

本地时间是指flume节点机器的时间节点,但这显然不合理,我们应该以日志文件数据本身的timestamp,所以可以通过拦截器来设置,所以后面消费kafka的flume也需要写一套拦截器。

TimeStampInterceptor拦截器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.everweekup.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class TimeStampInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
// 找到时间戳,然后放到event的header里
byte[] body= event.getBody();
String log = new String(body, StandardCharsets.UTF_8);

// 借助json解析工具获取
JSONObject jsonObj = JSONObject.parseObject(log);
// 校验是否有ts key
if(jsonObj.containsKey("ts")){
String ts = jsonObj.getString("ts"); // 如果get的是一个内层json,则还需要将其外层json转为jsonObject再get内层
// event是一个map类型
event.getHeaders().put("timestamp", ts);
}

return event;
}

@Override
public List<Event> intercept(List<Event> list) {
// 这里我们不需要提取元素出来,只是遍历所以可以用增强型for
for (Event event : list) {
intercept(event);
}
return list;
}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
return new TimeStampInterceptor();
}

@Override
public void configure(Context context) {

}
}

@Override
public void close() {

}
}

测试Flume -> Kafka -> Flume -> HDFS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# 启动zookeeper
[dw@hadoop116 /]$ zkS.sh start
---------- zookeeper hadoop116 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop117 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop118 启动 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[dw@hadoop116 /]$ zkS.sh status
---------- zookeeper hadoop116 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
---------- zookeeper hadoop117 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
---------- zookeeper hadoop118 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
++++++++++++++++++++++++++++++++++++++++
# 启动kafka
[dw@hadoop116 ~]$ kafka.sh start
--------启动 hadoop116 Kafka-------
--------启动 hadoop117 Kafka-------
--------启动 hadoop118 Kafka-------
[dw@hadoop116 ~]$ xcall.sh jps
============= hadoop116 jps =============
46193 Jps
44546 DataNode
44389 NameNode
44841 NodeManager
46158 Kafka
============= hadoop117 jps =============
36128 Kafka
33881 ResourceManager
33707 DataNode
34139 NodeManager
36159 Jps
============= hadoop118 jps =============
34581 Jps
34550 Kafka
33324 NodeManager
33117 DataNode
33231 SecondaryNameNode
++++++++++++++++++++++++++++++++++++++++
# 启动flume
[dw@hadoop116 ~]$ f1.sh start taildir_kafka.conf
[dw@hadoop118 flume]$ ./bin/flume-ng agent -n a1 -c conf/ -f jobs/kafka_file_hdfs.conf -Dflume.root.logger=info,console
[dw@hadoop116 ~]$ xcall.sh jps
============= hadoop116 jps =============
45601 Jps
44546 DataNode
44389 NameNode
44841 NodeManager
45439 Application
============= hadoop117 jps =============
34966 Jps
34808 Application
33881 ResourceManager
33707 DataNode
34139 NodeManager
============= hadoop118 jps =============
34040 Jps
33324 NodeManager
33117 DataNode
33918 Application
33231 SecondaryNameNode
++++++++++++++++++++++++++++++++++++++++
# 启动log.sh生成日志
[dw@hadoop116 ~]$ log.sh
==========hadoop116==========
==========hadoop117==========

查看HDFS是否有数据:

测试完成。

日志采集流程启停脚本

1.Hadoop集群启停脚本——hdp.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
case $1 in
start )
echo "==========$i=========="
ssh hadoop116 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
ssh hadoop117 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
;;

stop )
echo "==========$i=========="
ssh hadoop116 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
ssh hadoop117 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
;;
esac

2.整体启停脚本——cluster.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/bin/bash

case $1 in
"start"){
echo ================== 启动 集群 ==================

#启动 Zookeeper集群
/home/dw/bin/zkS.sh start

#启动 Hadoop集群
/home/dw/bin/hdp.sh start

#启动 Kafka采集集群
/home/dw/bin/kafka.sh start

#启动 Flume采集集群
/home/dw/bin/f1.sh start taildir_kafka.conf

#启动 Flume消费集群
/home/dw/bin/f2.sh start

};;
"stop"){
echo ================== 停止 集群 ==================

#停止 Flume消费集群
/home/dw/bin/f2.sh stop

#停止 Flume采集集群
/home/dw/bin/f1.sh stop taildir_kafka.conf

#停止 Kafka采集集群
/home/dw/bin/kafka.sh stop

#停止 Hadoop集群
/home/dw/bin/hdp.sh stop

#停止 Zookeeper集群
/home/dw/bin/zkS.sh stop

};;
esac