欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink on Yarn配置

程序员文章站 2022-07-14 13:50:38
...

Flink1.9.1与Yarn(hadoop2.6.2)结合使用,将flink的任务运行在Yarn上。

配置环境变量

  • 配置HADOOP_CLASSPATH

export HADOOP_CLASSPATH=${HADOOP_HOME}/bin/hadoop classpath

  • 配置  YARN_CONF_DIR or HADOOP_CONF_DIR

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

下载软件&目录权限

只需要在提交flink job的机器上部署即可。

下载flink-1.9.1-bin-scala_2.11.tgz、flink-shaded-hadoop-2-uber-2.6.5-7.0.jar(pre-bundled Hadoop)

并将flink-shaded-hadoop-2-uber-2.6.5-7.0.jar放置到${flink_home}lib目录中

权限全部使用hadoop用户

ln -s /app/flink-1.9.1 /opt/flink

chown -R hadoop:hadoop /app/flink-1.9.1

chown -h hadoop:hadoop /opt/flink

启动flink yarn cluster

/opt/flink/bin/yarn-session.sh -tm 8192 -s 10

可以通过-Dfs.overwrite-files=true方式,覆盖flink-conf.yaml中的配置项

重点参数

-d 参数当flink yarn客户端提交启动flink session cluster的命令后,退出。

cd /opt/flink;./bin/yarn-session.sh -d

-id参数,     -id,–applicationId YARN application Id

可以启动一个交互命令窗口,与已经启动的yarn session cluster通信。然后通过键入stop回车或ctrl+c可以关闭原有的flink YARN cluster。

提交任务

拷贝一个测试文件到hdfs目录

hadoop fs -copyFromLocal README.txt /kylin

提交flink job方式一

/opt/flink/bin/flink run ./examples/batch/WordCount.jar \

–input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result.txt

因为是在当前机器启动的flink yarn cluster,所以会发现/tmp/.yarn-properties-hadoop文件。

进而发现job jobmanager的地址

运行时,打印的部分日志:
Found Yarn properties file under /tmp/.yarn-properties-hadoop
#Generated YARN properties file
#Fri Dec 27 16:44:21 CST 2019
parallelism=1
dynamicPropertiesString=
applicationID=application_1577352810017_3044

提交flink job方式二

指定 --jobmanager方式提交任务

-m,–jobmanager            Address of the JobManager (master) to which
                                to connect. Use this flag to connect to a
                                different JobManager than the one specified
                                in the configuration.

在http://rm1:8088/cluster/app/application_1577352810017_3044

appattempt_1577352810017_3044_000001连接中,找到jobmanager的url,node1:59968。

执行命令:

/opt/flink/bin/flink run -m node1:59968 ./examples/batch/WordCount.jar \
--input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result333.txt

在Yarn上启动flink,只运行一个job

类似在yarn上运行spark任务。通过“-m yarn-cluster”指定要运行的yarn集群。“ yarn-cluster”是在yarn-site.xml里面配置的yarn.resourcemanager.cluster-id的值。

./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar \
--input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result444.txt

参考:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/yarn_setup.html