如何在作业提交前获取 Flink taskmanager 编号?
How to get Flink taskmanager number before job submitted?
我有一个由
启动的 Flink Datastream 作业
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(taskmanagernumber * x) // set env parallelism this line
env.addSource...map...addSink...
env.execute()
我想控制与taskmanager number相关的env并行度,如上面的代码。
有办法吗?或设置与任务管理器编号相关的并行度的任何解决方法?
您可以使用 reactive scheduler,它会根据集群提供的任何内容自动调整并行性。
您不必在作业本身中设置并行度。您可以在启动作业时在命令行中设置它:
flink run -p <parallelism> <jar-file> <arguments>
如果您不知道集群有多少插槽可用,您可以从 REST API 中获取信息。 /overview
returns 像这样:
{
taskmanagers: 2,
slots-total: 2,
slots-available: 2,
jobs-running: 0,
jobs-finished: 0,
jobs-cancelled: 0,
jobs-failed: 0,
flink-version: "1.13.1",
flink-commit: "a7f3192"
}
slots-available
就是您要找的。所以你可以做一些像
flink run -p `curl -s http://localhost:8081/overview | jq '.["slots-available"]'` ...
我有一个由
启动的 Flink Datastream 作业val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(taskmanagernumber * x) // set env parallelism this line
env.addSource...map...addSink...
env.execute()
我想控制与taskmanager number相关的env并行度,如上面的代码。
有办法吗?或设置与任务管理器编号相关的并行度的任何解决方法?
您可以使用 reactive scheduler,它会根据集群提供的任何内容自动调整并行性。
您不必在作业本身中设置并行度。您可以在启动作业时在命令行中设置它:
flink run -p <parallelism> <jar-file> <arguments>
如果您不知道集群有多少插槽可用,您可以从 REST API 中获取信息。 /overview
returns 像这样:
{
taskmanagers: 2,
slots-total: 2,
slots-available: 2,
jobs-running: 0,
jobs-finished: 0,
jobs-cancelled: 0,
jobs-failed: 0,
flink-version: "1.13.1",
flink-commit: "a7f3192"
}
slots-available
就是您要找的。所以你可以做一些像
flink run -p `curl -s http://localhost:8081/overview | jq '.["slots-available"]'` ...