如何在作业提交前获取 Flink taskmanager 编号?

How to get Flink taskmanager number before job submitted?

我有一个由

启动的 Flink Datastream 作业
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(taskmanagernumber * x) // set env parallelism this line

env.addSource...map...addSink...
env.execute()

我想控制与taskmanager number相关的env并行度,如上面的代码。

有办法吗?或设置与任务管理器编号相关的并行度的任何解决方法?

您可以使用 reactive scheduler,它会根据集群提供的任何内容自动调整并行性。

您不必在作业本身中设置并行度。您可以在启动作业时在命令行中设置它:

flink run -p <parallelism> <jar-file> <arguments>

如果您不知道集群有多少插槽可用,您可以从 REST API 中获取信息。 /overview returns 像这样:

{
  taskmanagers: 2,
  slots-total: 2,
  slots-available: 2,
  jobs-running: 0,
  jobs-finished: 0,
  jobs-cancelled: 0,
  jobs-failed: 0,
  flink-version: "1.13.1",
  flink-commit: "a7f3192"
}

slots-available 就是您要找的。所以你可以做一些像

flink run -p `curl -s http://localhost:8081/overview | jq '.["slots-available"]'` ...