Flink Deployment

Introduction to Flink Deployment Modes

Flink has three deployment modes:

  1. Local mode
    • The Local Cluster mode works out of the box: just extract the package and start it.
  2. Standalone mode
    • Standalone Cluster is a self-contained cluster mode: it runs without depending on any external system and manages everything itself. (Configuring HA additionally requires ZooKeeper for coordination.)
  3. YARN (or another resource-management framework) mode
    • YARN mode uses YARN as Flink's runtime platform: the JobManager, the TaskManagers, and the user-submitted applications all run on YARN.
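
For orientation, a sketch of how each mode is launched (the yarn-session.sh flags follow the Flink 1.4.x CLI; counts and sizes are placeholders):

# Local and Standalone: the same script starts the bundled cluster
./bin/start-cluster.sh

# YARN session (Flink 1.4.x syntax): -n TaskManager count, -jm/-tm heap in MB
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024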

Setup (on the master node)

  • 1. Download the flink-1.4.0-bin-hadoop26-scala_2.11.tgz package and extract it:
tar -zxvf flink-1.4.0-bin-hadoop26-scala_2.11.tgz -C /usr/local/src/

(base) [root@main flink-1.4.0]# ls
bin conf examples lib LICENSE log NOTICE opt README.txt resources tools
  • 2. In the Flink root directory, edit the conf/flink-conf.yaml configuration file to specify the JobManager (a minimal example follows the listing below):
(base) [root@main conf]# pwd
/usr/local/src/flink-1.4.0/conf
(base) [root@main conf]# ls
flink-conf.yaml log4j-console.properties log4j-yarn-session.properties logback.xml masters zoo.cfg
log4j-cli.properties log4j.properties logback-console.xml logback-yarn.xml slaves
(base) [root@main conf]# vim flink-conf.yaml
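
The post does not show the edited values; a minimal sketch for Flink 1.4, assuming the JobManager runs on the host main (adjust the address and slot count to your cluster):

# conf/flink-conf.yaml (minimal standalone settings; values are assumptions)
jobmanager.rpc.address: main
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1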

  • 3. Edit the masters file in the conf directory to specify the JobManager host (see the sketch below).
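
The file's contents are not shown in the original; a sketch assuming the single master host main and the default web UI port:

# conf/masters (assumed contents: jobmanager-host:webui-port)
main:8081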

  • 4. Edit the slaves file in the conf directory to specify the TaskManagers, named after your cluster's worker nodes (see the sketch below).
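
A sketch assuming the two worker hosts that the scp commands in the next step target:

# conf/slaves (one TaskManager host per line)
slaves1
slaves2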

  • 5. Distribute the Flink directory to every node:
scp -r /usr/local/src/flink-1.4.0 root@slaves1:/usr/local/src/
scp -r /usr/local/src/flink-1.4.0 root@slaves2:/usr/local/src/

Verification

  • 1. Start the cluster:
(base) [root@main bin]# pwd
/usr/local/src/flink-1.4.0/bin
(base) [root@main bin]# start-cluster.sh

Visit the web UI (served by the JobManager on port 8081 by default).
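
A quick shell check that the UI is reachable (a sketch; main:8081 matches the JobManager web address reported in the job log below):

curl -s http://main:8081 | head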

  • 2. Test the installation with the bundled WordCount example:
(base) [root@main bin]# flink run ../examples/streaming/WordCount.jar
Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/local/src/hadoop-2.6.5/etc/hadoop:/usr/local/src/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/src/hadoop-2.6.5/share/hadoop/common/*:/usr/local/src/hadoop-2.6.5/share/hadoop/hdfs:/usr/local/src/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/src/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/src/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/src/hadoop-2.6.5/share/hadoop/yarn/*:/usr/local/src/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/src/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/src/hadoop-2.6.5/contrib/capacity-scheduler/*.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/flink-1.4.0/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop-2.6.5/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at main/192.168.10.113:6123
Using address main:6123 to connect to JobManager.
JobManager web interface address http://main:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: ed5058027317929399ae94bd8dedd664. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@main:6123/user/jobmanager#884749380] with leader session id 00000000-0000-0000-0000-000000000000.
03/25/2021 18:53:41 Job execution switched to status RUNNING.
03/25/2021 18:53:41 Source: Collection Source -> Flat Map(1/1) switched to SCHEDULED
03/25/2021 18:53:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to SCHEDULED
03/25/2021 18:53:41 Source: Collection Source -> Flat Map(1/1) switched to DEPLOYING
03/25/2021 18:53:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to DEPLOYING
03/25/2021 18:53:41 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING
03/25/2021 18:53:41 Source: Collection Source -> Flat Map(1/1) switched to RUNNING
03/25/2021 18:53:41 Source: Collection Source -> Flat Map(1/1) switched to FINISHED
03/25/2021 18:53:42 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED
03/25/2021 18:53:42 Job execution switched to status FINISHED.
Program execution finished
Job with JobID ed5058027317929399ae94bd8dedd664 has finished.
Job Runtime: 84 ms
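
As the log notes, the example falls back to built-in input data and prints to stdout. A sketch of running it against real files instead (the paths are placeholders):

flink run ../examples/streaming/WordCount.jar \
  --input /path/to/input.txt \
  --output /path/to/result.txt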

Check the web UI to confirm the completed job.

Done.

Package download:

Link: https://pan.baidu.com/s/1DzUsi7kCdVn7DqQgPRyjPA
Extraction code: w1tl

Standalone Cluster Deployment (Flink 1.12.0, HA)

Standalone Cluster is a self-contained cluster mode: it runs without depending on any external system and manages everything itself. Deployment layout across three machines:

Node        Service
hadoop116   master
hadoop117   worker
hadoop118   worker

Extract the package

(base) [dw@hadoop116 flink]$ tar -zxvf flink-1.12.0-bin-scala_2.11.tgz

Edit the configuration file

(base) [dw@hadoop116 flink-1.12.0]$ pwd
/opt/module/flink/flink-1.12.0
(base) [dw@hadoop116 flink-1.12.0]$ vim conf/flink-conf.yaml

Add the following:

jobmanager.rpc.address: hadoop116
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 3
parallelism.default: 3

Edit the conf/masters file (both hosts are listed because the HA setup below uses two masters):

(base) [dw@hadoop116 flink-1.12.0]$ vim conf/masters
hadoop116:8081
hadoop117:8081

Edit the conf/workers file:

(base) [dw@hadoop116 flink-1.12.0]$ vim conf/workers
hadoop116
hadoop117
hadoop118

Configure high availability

Edit flink-conf.yaml and add the following:

high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop116:8020/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop116:2181,hadoop117:2181,hadoop118:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_rt
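
HA mode requires ZooKeeper and HDFS to be running before the Flink cluster starts. After startup, one way to confirm that Flink registered its HA metadata (a sketch; zkCli.sh ships with ZooKeeper, and the path matches high-availability.zookeeper.path.root above):

zkCli.sh -server hadoop116:2181
# inside the ZooKeeper shell:
ls /flink-standalone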

For the missing Hadoop dependency problem in Flink 1.12, see:

https://www.bilibili.com/read/cv11506216

https://blog.csdn.net/wmpisadba/article/details/117952310

Download the Hadoop dependency jars (matching your Hadoop version):

flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar

download:https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.1.0-327-9.0

commons-cli-1.4.jar

download:https://commons.apache.org/proper/commons-cli/download_cli.cgi

Place the downloaded jars in the lib directory under the Flink root; a sketch of distributing them follows.
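
A sketch, assuming the two jars sit in the current directory and the Flink directory already exists on hadoop117 and hadoop118 (they need the jars too, since every node runs Flink):

cp flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar commons-cli-1.4.jar /opt/module/flink-1.12.0/lib/
scp flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar commons-cli-1.4.jar dw@hadoop117:/opt/module/flink-1.12.0/lib/
scp flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar commons-cli-1.4.jar dw@hadoop118:/opt/module/flink-1.12.0/lib/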

(base) [dw@hadoop116 flink-1.12.0]$ pwd
/opt/module/flink-1.12.0
(base) [dw@hadoop116 flink-1.12.0]$ ./bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host hadoop116.
Starting standalonesession daemon on host hadoop117.
Starting taskexecutor daemon on host hadoop116.
Starting taskexecutor daemon on host hadoop117.
Starting taskexecutor daemon on host hadoop118.
(base) [dw@hadoop116 flink-1.12.0]$

Check the cluster processes:

(base) [dw@hadoop116 flink-1.12.0]$ xcall.sh jps
============= hadoop116 jps =============
17778 NodeManager
122548 Kafka
17222 NameNode
126247 StandaloneSessionClusterEntrypoint
126616 TaskManagerRunner
116285 QuorumPeerMain
17407 DataNode
127166 Jps
============= hadoop117 jps =============
14416 ResourceManager
35153 TaskManagerRunner
26882 QuorumPeerMain
14533 NodeManager
34856 StandaloneSessionClusterEntrypoint
35256 Jps
14235 DataNode
28172 Kafka
============= hadoop118 jps =============
22690 QuorumPeerMain
23122 Kafka
33044 TaskManagerRunner
10437 DataNode
10519 SecondaryNameNode
10635 NodeManager
33115 Jps
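
To confirm that failover actually works, a common manual check (a sketch, not part of the original post) is to kill the active JobManager and watch the standby on hadoop117 take over:

# on hadoop116: find the active JobManager process and kill it
jps | grep StandaloneSessionClusterEntrypoint
kill <pid>    # substitute the PID printed by jps
# open http://hadoop117:8081 - the standby should become the leader
# afterwards, restart the killed master:
./bin/jobmanager.sh start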