目录

1、相关环境配置

2、会话模式部署

1. 启动集群

2. 提交作业

3、单作业模式部署

4、应用模式部署

5、高可用

         独立(

Standalone

)模式由

Flink

自身提供资源,无需其他框架,这种方式降低了和其他

第三方资源框架的耦合性,独立性非常强。但我们知道,

Flink

是大数据计算框架,不是资源

调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架

集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是

YARN

了。所

以接下来我们就将学习,在强大的

YARN

平台上

Flink

是如何集成部署的。

        整体来说,

YARN

上部署的过程是:客户端把

Flink

应用提交给

Yarn

ResourceManager,

Yarn

ResourceManager

会向

Yarn

NodeManager

申请容器。在这些容器上,

Flink

会部署

JobManager

TaskManager

的实例,从而启动集群。

Flink

会根据运行在

JobManger

上的作业

所需要的

Slot

数量动态分配

TaskManager

资源。

1、相关环境配置

         在

Flink1.8.0

之前的版本,想要以

YARN

模式部署

Flink

任务时,需要

Flink

是有

Hadoop

支持的。从

Flink 1.8

版本开始,不再提供基于

Hadoop

编译的安装包,若需要

Hadoop

的环境

支持,需要自行在官网下载

Hadoop

相关版本的组件

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

并将该组件上传至

Flink

lib

目录下。在

Flink 1.11.0

版本之后,增加了很多重要新特性,其

中就包括增加了对

Hadoop3.0.0

以及更高版本

Hadoop

的支持,不再提供“

flink-shaded-hadoop-*

jar

包,而是通过配置环境变量完成与

YARN

集群的对接。

         在将

Flink

任务部署至

YARN

集群之前,需要确认集群是否安装有

Hadoop

,保证

Hadoop

版本至少在

2.2

以上,并且集群中安装有

HDFS

服务。

具体配置步骤如下:

1

)按照

3.1

节所述,下载并解压安装包,并将解压后的安装包重命名为

flink-1.13.0-yarn

本节的相关操作都将默认在此安装路径下执行。

(2)配置环境变量,增加环境变量配置如下:

$ sudo vim /etc/profile.d/my_env.sh

HADOOP_HOME=/opt/module/hadoop-2.7.5

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HADOOP_CLASSPATH=`hadoop classpath`

这里必须保证设置了环境变量

HADOOP_CLASSPATH

(3)启动

Hadoop

集群,包括

HDFS

YARN

[atguigu@hadoop102 ~]$ start-dfs.sh

[atguigu@hadoop103 ~]$ start-yarn.sh

分别在 3 台节点服务器查看进程启动情况。

[atguigu@hadoop102 ~]$ jps

5190 Jps

5062 NodeManager

4408 NameNode

4589 DataNode

[atguigu@hadoop103 ~]$ jps

5425 Jps

4680 ResourceManager

5241 NodeManager

4447 DataNode

[atguigu@hadoop104 ~]$ jps

4731 NodeManager

4333 DataNode

4861 Jps

4478 SecondaryNameNode

(4)进入

conf

目录,修改

flink-conf.yaml

文件,修改以下配置,这些配置项的含义在进

Standalone

模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认

配置。

$ cd /opt/module/flink-1.13.0-yarn/conf/

$ vim flink-conf.yaml

jobmanager.memory.process.size: 1600m

taskmanager.memory.process.size: 1728m

taskmanager.numberOfTaskSlots: 8

parallelism.default: 1

2、会话模式部署

YARN

的会话模式与独立集群略有不同,需要首先申请一个

YARN

会话(

YARN session

来启动

Flink

集群。具体步骤如下:

1. 启动集群

1

)启动

hadoop

集群

(HDFS, YARN)

(2)执行脚本命令向 YARN 集群申请资源,开启一个

YARN

会话,启动

Flink

集群。

$ bin/yarn-session.sh -nm test

可用参数解读:

-d

:分离模式,如果你不想让

Flink YARN

客户端一直前台运行,可以使用这个参数,

43

44

即使关掉当前对话窗口,

YARN session

也可以后台运行。

-jm(--jobManagerMemory)

:配置

JobManager

所需内存,默认单位

MB

-nm(--name)

:配置在

YARN UI

界面上显示的任务名。

-qu(--queue)

:指定

YARN

队列名。

-tm(--taskManager)

:配置每个

TaskManager

所使用内存。

注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,

YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也

不会把集群资源固定,同样是动态分配的。

YARN Session

启动之后会给出一个

web UI

地址以及一个

YARN application ID

,如下所示,

用户可以通过

web UI

或者命令行两种方式提交作业。

2. 提交作业

1

)通过

Web UI

提交作业

这种方式比较简单,与上文所述

Standalone

部署模式基本相同。

(2)通过命令行提交作业

① 将

Standalone

模式讲解中打包好的任务运行

JAR

包上传至集群

② 执行以下命令将该任务提交到已经开启的

Yarn-Session

中运行。

$ bin/flink run

-c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定

JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。

③ 任务提交成功后,可在

YARN

Web UI

界面查看运行情况。

如图

3-14

所示,从图中可以看到我们创建的

Yarn-Session

实际上是一个

Yarn

Application

,并且有唯一的

Application ID

④也可以通过

Flink

Web UI

页面查看提交任务的运行情况,如图

3-15

所示。

3、单作业模式部署

YARN

环境中,由于有了外部平台做资源调度,所以我们也可以直接向

YARN

提交一

个单独的作业,从而启动一个

Flink

集群。

1

)执行命令提交作业。

$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

 早期版本也有另一种写法:

$ bin/flink run

-m yarn-cluster

-c

com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

注意这里是通过参数

-m yarn-cluster

指定向

YARN

集群提交任务。

(2)在

YARN

ResourceManager

界面查看执行情况,如图

3-16

所示

 

点击可以打开 Flink Web UI 页面进行监控,如图 3-17 所示:

(3)可以使用命令行查看或取消作业,命令如下。

$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

这里的 application_XXXX_YY 是当前应用的 ID,是作业的 ID。注意如果取消作

业,整个 Flink 集群也会停掉。

4、应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行

flink run-application

命令即可。

1

)执行命令提交作业。

$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

(2)在命令行中查看或取消作业。

$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

$ ./bin/flink cancel

-t yarn-application

-Dyarn.application.id=application_XXXX_YY

(3)也可以通过

yarn.provided.lib.dirs

配置选项指定位置,将

jar

上传到远程。

$ ./bin/flink run-application

-t yarn-application

-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"

hdfs://myhdfs/jars/my-application.jar

这种方式下

jar

可以预先上传到

HDFS

,而不需要单独发送到集群,这就使得作业提交更

加轻量了。

5、高可用

       YARN

模式的高可用和独立模式(

Standalone

)的高可用原理不一样。 Standalone 模式中

,

同时启动多个

JobManager,

一个为“领导者”(

leader

),其他为“后备” standby)

,

leader

挂了

,

其他的才会有一个成为

leader

。 而 YARN

的高可用是只启动一个

Jobmanager,

当这个

Jobmanager

挂了之后

, YARN

会再次 启动一个,

所以其实是利用的

YARN

的重试次数来实现的高可用。

1

)在

yarn-site.xml

中配置。

yarn.resourcemanager.am.max-attempts

4

The maximum number of application master execution attempts.

注意: 配置完不要忘记分发, 和重启 YARN。

(2)在

flink-conf.yaml

中配置。

yarn.application-attempts: 3

high-availability: zookeeper

high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha

high-availability.zookeeper.quorum:

hadoop102:2181,hadoop103:2181,hadoop104:2181

high-availability.zookeeper.path.root: /flink-yarn

(3)启动

yarn-session

(4)杀死

JobManager,

查看复活情况。

注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该

小于这个值。

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: