前提条件

准备三台CenOS7机器,主机名称,例如:node2,node3,node4

三台机器安装好jdk8,通常情况下,flink需要结合hadoop处理大数据问题,建议先安装hadoop,可参考 hadoop安装

Flink集群规划

node2node3node4 JobManager TaskManager TaskManagerTaskManager

下载安装包

在node2机器操作

[hadoop@node2 ~]$ cd installfile/

[hadoop@node2 installfile]$ wget https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz --no-check-certificate

解压安装包

[hadoop@node2 installfile]$ tar -zxvf flink-1.17.1-bin-scala_2.12.tgz -C ~/soft

进入到解压后的目录,查看解压后的文件

[hadoop@node2 installfile]$ cd ~/soft/

[hadoop@node2 soft]$ ls

配置环境变量

[hadoop@node2 soft]$ sudo nano /etc/profile.d/my_env.sh

添加如下内容

#FLINK_HOME

export FLINK_HOME=/home/hadoop/soft/flink-1.17.1

export PATH=$PATH:$FLINK_HOME/bin

让环境变量生效

[hadoop@node2 soft]$ source /etc/profile

验证版本号

[hadoop@node2 soft]$ flink -v

Version: 1.17.1, Commit ID: 2750d5c

看到如上Version: 1.17.1版本号字样,说明环境变量配置成功。

配置flink

进入flink配置目录,查看配置文件

[hadoop@node2 ~]$ cd $FLINK_HOME/conf

[hadoop@node2 conf]$ ls

flink-conf.yaml       log4j-console.properties log4j-session.properties logback-session.xml masters zoo.cfg

log4j-cli.properties log4j.properties         logback-console.xml       logback.xml         workers

配置flink-conf.yaml

[hadoop@node2 conf]$ vim flink-conf.yaml

找到相关配置项并修改,如下

jobmanager.rpc.address: node2

jobmanager.bind-host: 0.0.0.0

taskmanager.bind-host: 0.0.0.0

taskmanager.host: node2

rest.address: node2

rest.bind-address: 0.0.0.0

配置workers

[hadoop@node2 conf]$ vim workers

把原有内容删除,添加内容如下:

node2

node3

node4

配置masters

[hadoop@node2 conf]$ vim masters

修改后内容如下:

node2:8081

分发flink安装目录

确保node3、node4机器已开启的情况下,执行如下分发命令。

[hadoop@node2 conf]$ xsync ~/soft/flink-1.17.1

修改node3和node4的配置

node3

进入node3机器flink的配置目录

[hadoop@node3 ~]$ cd ~/soft/flink-1.17.1/conf/

配置flinke-conf.yaml文件

[hadoop@node3 conf]$ vim flink-conf.yaml

将taskmanager.host的值修改为node3

taskmanager.host: node3

node4

进入node4机器flink的配置目录

[hadoop@node4 ~]$ cd ~/soft/flink-1.17.1/conf/

配置flinke-conf.yaml文件

[hadoop@node4 conf]$ vim flink-conf.yaml

将taskmanager.host的值修改为node4

taskmanager.host: node4

配置node3、node4的环境变量

分别到node3、node4机器配置环境变量

sudo nano /etc/profile.d/my_env.sh

添加如下配置

#FLINK_HOME

export FLINK_HOME=/home/hadoop/soft/flink-1.17.1

export PATH=$PATH:$FLINK_HOME/bin

让环境变量生效

source /etc/profile

验证版本号

flink -v

看到Version: 1.17.1版本号字样,说明环境变量配置成功。

启动flink集群

在node2机器,执行如下命令启动集群

[hadoop@node2 conf]$ start-cluster.sh

Starting cluster.

Starting standalonesession daemon on host node2.

Starting taskexecutor daemon on host node2.

Starting taskexecutor daemon on host node3.

Starting taskexecutor daemon on host node4.

查看进程

分别在node2、node3、node4机器上执行jps查看进程

[hadoop@node2 conf]$ jps

2311 StandaloneSessionClusterEntrypoint

2793 Jps

2667 TaskManagerRunner

[hadoop@node3 conf]$ jps

1972 TaskManagerRunner

2041 Jps

[hadoop@node4 conf]$ jps

2038 Jps

1965 TaskManagerRunner

node2有StandaloneSessionClusterEntrypoint、TaskManagerRunner进程

node3有TaskManagerRunner进程

node4有TaskManagerRunner进程

看到如上进程,说明flink集群配置成功。

Web UI

浏览器访问

node2的ip:8081

或者使用主机名称代替ip访问

node2:8081

注意:如果用windows的浏览器访问,需要先在windows的hosts文件添加ip和主机名node2的映射。

关闭flink集群

[hadoop@node2 ~]$ stop-cluster.sh

Stopping taskexecutor daemon (pid: 2667) on host node2.

Stopping taskexecutor daemon (pid: 1972) on host node3.

Stopping taskexecutor daemon (pid: 1965) on host node4.

Stopping standalonesession daemon (pid: 2311) on host node2.

查看进程

[hadoop@node2 ~]$ jps

4215 Jps

[hadoop@node3 ~]$ jps

2387 Jps

[hadoop@node4 ~]$ jps

2383 Jps

单独启动/关闭flink进程

单独启动flink进程

$ jobmanager.sh start

$ taskmanager.sh start

node2

[hadoop@node2 ~]$ jobmanager.sh start

Starting standalonesession daemon on host node2.

[hadoop@node2 ~]$ jps

4507 StandaloneSessionClusterEntrypoint

4572 Jps

[hadoop@node2 ~]$ taskmanager.sh start

Starting taskexecutor daemon on host node2.

[hadoop@node2 ~]$ jps

4867 TaskManagerRunner

4507 StandaloneSessionClusterEntrypoint

4940 Jps

node3

[hadoop@node3 ~]$ taskmanager.sh start

Starting taskexecutor daemon on host node3.

[hadoop@node3 ~]$ jps

2695 TaskManagerRunner

2764 Jps

node4

[hadoop@node4 ~]$ taskmanager.sh start

Starting taskexecutor daemon on host node4.

[hadoop@node4 ~]$ jps

2691 TaskManagerRunner

2755 Jps

单独关闭flink进程

$ jobmanager.sh stop

$ taskmanager.sh stop

node4

[hadoop@node4 ~]$ taskmanager.sh stop

Stopping taskexecutor daemon (pid: 2691) on host node4.

[hadoop@node4 ~]$ jps

3068 Jps

node3

[hadoop@node3 ~]$ taskmanager.sh stop

Stopping taskexecutor daemon (pid: 2695) on host node3.

[hadoop@node3 ~]$ jps

3073 Jps

node2

[hadoop@node2 ~]$ taskmanager.sh stop

Stopping taskexecutor daemon (pid: 4867) on host node2.

[hadoop@node2 ~]$ jobmanager.sh stop

Stopping standalonesession daemon (pid: 4507) on host node2.

[hadoop@node2 ~]$ jps

5545 Jps

提交应用测试

启动flink集群

[hadoop@node2 ~]$ start-cluster.sh

运行flink提供的wordcount案例程序

[hadoop@node2 ~]$ cd $FLINK_HOME/

[hadoop@node2 flink-1.17.1]$ flink run examples/streaming/WordCount.jar

Executing example with default input data.

Use --input to specify file input.

Printing result to stdout. Use --output to specify output path.

Job has been submitted with JobID 845db6f62321830f287e71b525e87dbe

Program execution finished

Job with JobID 845db6f62321830f287e71b525e87dbe has finished.

Job Runtime: 1290 ms

查看结果

查看输出的wordcount结果的末尾10行数据

[hadoop@node2 flink-1.17.1]$ tail log/flink-*-taskexecutor-*.out

(nymph,1)

(in,3)

(thy,1)

(orisons,1)

(be,4)

(all,2)

(my,1)

(sins,1)

(remember,1)

(d,4)

Web UI查看作业

查看作业

查看作业结果

在Task Managers 的node2上可以查看到作业的结果

分别查看Task Managers 的node3、node4的输出结果

可以看到,三台Task Manager机器中,只有node2机器有结果,说明,本次wordcount计算只用到了node2机器进行计算。用于计算的机器并不固定,只要集群里有机器能看到计算结果都是正常的。

总结:至此,flink进程正常,可以提交应用到fink集群运行,同时能查看到相应计算结果,说明集群功能正常。

完成!enjoy it!

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: