Job Executor : job manager 或 taskmanager
Job Executor : job manager or taskmanager
public static void main(String[] args) 抛出异常 {
最终 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
ParameterTool parameters = ParameterTool.fromArgs(args);
String ftpUri = "ftp://data-injest:12345@ftp-server:21/input/" + parameters.get("input-file-name");
String fileUri = parameters.get("ftp").toUpperCase(Locale.ROOT).equals("TRUE")?ftpUri:localUri;
MapFunction<String,Tuple2<Long,Collection<Some>>> mapFunction = { some code };
SomeSink sink = new SomeSink();
env.readTextFile(fileUri,"UTF-8")
.map(mapFunction)
.keyBy(tuple2 -> tuple2.f0)
.reduce((tuple2, t1) -> {
some-logic-including-loggers
}).addSink(sink);
env.execute("OPIS-PRICE-FEED-with-" + parameters.get("input-file-name"));
}
哪个节点执行逻辑,例如上面的 ftpUri 定义。
- 我尝试使用断点将调试器附加到作业管理器和任务管理器,但我没有看到这些行已启用。
- 如果在同一部分中添加了记录器语句,则哪个节点记录器将包含它。
该设置代码在客户端中执行,而不是在作业管理器或任务管理器中执行。
public static void main(String[] args) 抛出异常 {
最终 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
ParameterTool parameters = ParameterTool.fromArgs(args);
String ftpUri = "ftp://data-injest:12345@ftp-server:21/input/" + parameters.get("input-file-name");
String fileUri = parameters.get("ftp").toUpperCase(Locale.ROOT).equals("TRUE")?ftpUri:localUri;
MapFunction<String,Tuple2<Long,Collection<Some>>> mapFunction = { some code };
SomeSink sink = new SomeSink();
env.readTextFile(fileUri,"UTF-8")
.map(mapFunction)
.keyBy(tuple2 -> tuple2.f0)
.reduce((tuple2, t1) -> {
some-logic-including-loggers
}).addSink(sink);
env.execute("OPIS-PRICE-FEED-with-" + parameters.get("input-file-name"));
}
哪个节点执行逻辑,例如上面的 ftpUri 定义。
- 我尝试使用断点将调试器附加到作业管理器和任务管理器,但我没有看到这些行已启用。
- 如果在同一部分中添加了记录器语句,则哪个节点记录器将包含它。
该设置代码在客户端中执行,而不是在作业管理器或任务管理器中执行。