Flink 可以通过 java 代码将多个作业附加到 Web Ui 的 Stream 本地环境吗?
Can Flink attach multiple jobs to Stream local envirnoment with Web Ui by java code?
Flink 可以通过 java 代码将多个作业附加到 Web Ui 的 Stream 本地环境吗?
我的代码是这样的
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.addSource(...);
env.addSink(...);
env.execute("stream job1");
我想做的是遍历这个并将 job2 job3 附加到与 web ui 管理相同的环境。如何在 java 代码中完成此操作?
我尝试使用 env2 = StreamExecutionEnvironment.getExecutionEnvironment(),
但似乎新的 job2 没有附加到我之前创建的环境中。
请帮助我提前谢谢
您应该能够重新使用已创建的 StreamExecutionEnvironment
,您已将其存储在 env
。
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI
不会设置相应的上下文环境工厂。因此,StreamExecutionEnvironment.getExecutionEnvironment
不会 return 创建的启用了 Web UI 的本地环境。
似乎可行(使用单个 LocalStreamEnvironment
)一个做多个
DataStream stream = env.addSource(...);
...
stream.addSink(...);
最后
env.execute();
然而,这似乎是一个具有多个 source->sink 流的作业,而不是多个作业。
好的。这里的技巧是将作业提交到本地集群。
在主线程中,使用一些配置启动本地集群
Configuration configuration = new Configuration();
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 100);
// start cluster
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
exec.start();
// sleep or wait for all job finishes
Thread.sleep(Long.MAX_VALUE);
然后在其他线程中,提交作业(主线程exec
)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream stream = env.addSource(...);
...
stream.addSink(...);
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName(name);
JobGraph jobGraph = streamGraph.getJobGraph();
exec.submitJobAndWait(jobGraph, true);
注意StreamExecutionEnvironment
仅用于生成JobGraph
。 LocalFlinkMiniCluster
是执行的地方。
最后一行可以选择是否等待作业(submitJobDetached
)。
Flink 可以通过 java 代码将多个作业附加到 Web Ui 的 Stream 本地环境吗?
我的代码是这样的
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.addSource(...);
env.addSink(...);
env.execute("stream job1");
我想做的是遍历这个并将 job2 job3 附加到与 web ui 管理相同的环境。如何在 java 代码中完成此操作?
我尝试使用 env2 = StreamExecutionEnvironment.getExecutionEnvironment(),
但似乎新的 job2 没有附加到我之前创建的环境中。
请帮助我提前谢谢
您应该能够重新使用已创建的 StreamExecutionEnvironment
,您已将其存储在 env
。
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI
不会设置相应的上下文环境工厂。因此,StreamExecutionEnvironment.getExecutionEnvironment
不会 return 创建的启用了 Web UI 的本地环境。
似乎可行(使用单个 LocalStreamEnvironment
)一个做多个
DataStream stream = env.addSource(...);
...
stream.addSink(...);
最后
env.execute();
然而,这似乎是一个具有多个 source->sink 流的作业,而不是多个作业。
好的。这里的技巧是将作业提交到本地集群。
在主线程中,使用一些配置启动本地集群
Configuration configuration = new Configuration();
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 100);
// start cluster
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
exec.start();
// sleep or wait for all job finishes
Thread.sleep(Long.MAX_VALUE);
然后在其他线程中,提交作业(主线程exec
)
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream stream = env.addSource(...);
...
stream.addSink(...);
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName(name);
JobGraph jobGraph = streamGraph.getJobGraph();
exec.submitJobAndWait(jobGraph, true);
注意StreamExecutionEnvironment
仅用于生成JobGraph
。 LocalFlinkMiniCluster
是执行的地方。
最后一行可以选择是否等待作业(submitJobDetached
)。