Flink命令行提交时参数的传递

小明 2025-05-06 16:41:26 4

对于Flink应���的参数传递问题,官方提供了一个简单的工具ParameterTool。当然也可以不使用该工具,而去使用Common CLI(https://commons.apache.org/proper/commons-cli/)或Argparse4j(https://argparse4j.github.io/)都是可以的。详见官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/application_parameters/

()

ParameterTool看其实现源码,还是很简单的,要注意读取配置文件时,出现无法读取到文件的情况,比如运行在yarn-application模式下的配置文件的读取问题。

1. ParameterTool的读取方式有3种:

1)从 .properties 配置文件读取 

()
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

2)从命令行参数读取

比如:--input hdfs:///mydata --elements 42

ParameterTool parameter = ParameterTool.fromArgs(args);

3)从系统属性读取

比如:-Dinput=hdfs:///mydata

ParameterTool parameter = ParameterTool.fromSystemProperties();

2. ParameterTool在程序中的使用

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters();
// .. there are more methods available.

也可以将其注册为全局参数

ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

3. 运行在yarn-application模式下,外部配置文件的读取

运行在 yarn-application模式时,因为外部配置文件在本地,会出现读取不到的情况,所以需要设置-Dyarn.ship-files=xxx.properties,xxx2.properties,来将配置文件上传到集群节点,代码里就可以直接通过ParameterTool.fromPropertiesFile(xxx.properties)来读取了。对于 yarn.ship-files的使用可以参见 YarnClusterDescriptor调用decodeFilesToShipToCluster再在startAppMaster中上传等操作。

The End
微信