Flink命令行提交时参数的传递
对于Flink应���的参数传递问题,官方提供了一个简单的工具ParameterTool。当然也可以不使用该工具,而去使用Common CLI(https://commons.apache.org/proper/commons-cli/)或Argparse4j(https://argparse4j.github.io/)都是可以的。详见官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/application_parameters/
()ParameterTool看其实现源码,还是很简单的,要注意读取配置文件时,出现无法读取到文件的情况,比如运行在yarn-application模式下的配置文件的读取问题。
1. ParameterTool的读取方式有3种:
1)从 .properties 配置文件读取
()String propertiesFilePath = "/home/sam/flink/myjob.properties"; ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath); File propertiesFile = new File(propertiesFilePath); ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile); InputStream propertiesFileInputStream = new FileInputStream(file); ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
2)从命令行参数读取
比如:--input hdfs:///mydata --elements 42
ParameterTool parameter = ParameterTool.fromArgs(args);
3)从系统属性读取
比如:-Dinput=hdfs:///mydata
ParameterTool parameter = ParameterTool.fromSystemProperties();
2. ParameterTool在程序中的使用
ParameterTool parameters = // ... parameter.getRequired("input"); parameter.get("output", "myDefaultValue"); parameter.getLong("expectedCount", -1L); parameter.getNumberOfParameters(); // .. there are more methods available.
也可以将其注册为全局参数
ParameterTool parameters = ParameterTool.fromArgs(args); // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameters);
3. 运行在yarn-application模式下,外部配置文件的读取
运行在 yarn-application模式时,因为外部配置文件在本地,会出现读取不到的情况,所以需要设置-Dyarn.ship-files=xxx.properties,xxx2.properties,来将配置文件上传到集群节点,代码里就可以直接通过ParameterTool.fromPropertiesFile(xxx.properties)来读取了。对于 yarn.ship-files的使用可以参见 YarnClusterDescriptor调用decodeFilesToShipToCluster再在startAppMaster中上传等操作。