Apache Flink - 是否可以平均分配插槽共享组?
Apache Flink - is it possible to evenly distribute slot sharing groups?
我们有一个带有操作的管道,分为 2 个工作负载 - Source -> Transform
在第一组并且是 CPU 密集型工作负载,它们被放入同一个槽共享组,假设source
。 Sink
,RAM 密集型工作负载,因为它使用批量上传并将大量数据保存在内存中。已发送至 sink
插槽共享组。
此外,我们有不同的并行级别 Source -> Transform
工作负载和 Sink
工作负载,因为第一个受源并行性限制。因此,例如,我们的 Source -> Transform
并行度为 50,同时 Sink
并行度等于 78。我们有 8 个 TM,每个有 16 个核心(因此有插槽)。
在这种情况下,我们理想的插槽分配策略似乎是在每个 TM 上为 Source -> Transform
分配 6-7 个插槽,其余的 - Sink
领先 CPU -RAM 工作负载大致均匀分布在所有 TM 上。
所以,我想知道是否有一些配置设置可以告诉我们均匀分配插槽共享组?
我只找到了 cluster.evenly-spread-out-slots 配置参数,但我不确定它是否真的均匀分布了插槽共享组,而不仅仅是插槽 - 例如,我同时获得了 10 个 Source -> Transform
任务的 TM我希望有 6 或 7 个。
那么,问题来了,能否让Flink在集群中均匀分配slot sharing groups?或者可能还有其他可能性吗?
在 taskmanagers 之间均匀分布一个 Flink operator 似乎有点类似于我的问题,但我主要问的是 slot sharing groups distribution。本主题还仅包含使用 cluster.evenly-spread-out-slots 的建议,但此后可能发生了一些变化。
我试过一次实现这个,但问题是 Flink 没有提供启用运算符放置的功能。我能得到的关闭是使用 .map(...).slotSharingGroup("name");
。正如关于“Set slot sharing group”的文档所说:
Set the slot sharing group of an operation. Flink will put operations
with the same slot sharing group into the same slot while keeping
operations that don't have the slot sharing group in other slots. This
can be used to isolate slots. The slot sharing group is inherited from
input operations if all input operations are in the same slot sharing
group. The name of the default slot sharing group is "default",
operations can explicitly be put into this group by calling
slotSharingGroup("default").
someStream.filter(...).slotSharingGroup("name");
因此,我根据我拥有的任务槽数以及并行度定义了不同的组。
我找到了一种解决方法来均匀分配插槽共享组。
从flink 1.9.2开始,引入了even tasks分发特性,可以通过flink-conf.yaml
中的cluster.evenly-spread-out-slots: true
开启:FLINK-12122 Spread out tasks evenly across all available registered TaskManagers. I tried to enable it and it didn't work. After digging a bit, I managed to find the developer's comment which stated that this feature works only in standalone mode as it requires resources to be preliminary pre-allocated - https://issues.apache.org/jira/browse/FLINK-12122?focusedCommentId=17013089&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17013089":
the feature only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling. Hence, when you are using the active Yarn mode and submit the first job, then there won't be any TMs registered. Consequently, Flink will allocate the first container, fill it up and then only allocate a new container. However, if you start Flink in standalone mode or after your first job finishes on Yarn there are still some TMs registered, then the next job would be spread out.
所以,想法是开始 a detached yarn session 增加空闲容器超时设置,首先提交一些短暂的假工作,它将简单地从 YARN 获取所需数量的资源并完成,然后开始立即将主管道分配给已经分配的容器,在这种情况下,cluster.evenly-spread-out-slots: true
可以做到这一点并平均分配所有插槽共享组。
因此,总而言之,完成了以下操作以获得作业中均匀分布的槽共享组:
resourcemanager.taskmanager-timeout
已增加,以允许在为空闲任务管理器释放容器之前提交主要作业。我把它增加到 1 分钟,这就足够了。
- 启动了一个
yarn-session
并向其动态提交了作业。
- 调整了主要工作,首先调用一个简单分配资源的假工作。在我的例子中,这个简单的代码在配置主管道之前起到了作用:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val job = env
.fromElements(0)
.map { x =>
x * 2
}
.setParallelism(parallelismMax)
.print()
val jobResult = env.execute("Resources pre-allocation job")
println(jobResult)
print("Done. Starting main job!")
我们有一个带有操作的管道,分为 2 个工作负载 - Source -> Transform
在第一组并且是 CPU 密集型工作负载,它们被放入同一个槽共享组,假设source
。 Sink
,RAM 密集型工作负载,因为它使用批量上传并将大量数据保存在内存中。已发送至 sink
插槽共享组。
此外,我们有不同的并行级别 Source -> Transform
工作负载和 Sink
工作负载,因为第一个受源并行性限制。因此,例如,我们的 Source -> Transform
并行度为 50,同时 Sink
并行度等于 78。我们有 8 个 TM,每个有 16 个核心(因此有插槽)。
在这种情况下,我们理想的插槽分配策略似乎是在每个 TM 上为 Source -> Transform
分配 6-7 个插槽,其余的 - Sink
领先 CPU -RAM 工作负载大致均匀分布在所有 TM 上。
所以,我想知道是否有一些配置设置可以告诉我们均匀分配插槽共享组?
我只找到了 cluster.evenly-spread-out-slots 配置参数,但我不确定它是否真的均匀分布了插槽共享组,而不仅仅是插槽 - 例如,我同时获得了 10 个 Source -> Transform
任务的 TM我希望有 6 或 7 个。
那么,问题来了,能否让Flink在集群中均匀分配slot sharing groups?或者可能还有其他可能性吗?
在 taskmanagers 之间均匀分布一个 Flink operator 似乎有点类似于我的问题,但我主要问的是 slot sharing groups distribution。本主题还仅包含使用 cluster.evenly-spread-out-slots 的建议,但此后可能发生了一些变化。
我试过一次实现这个,但问题是 Flink 没有提供启用运算符放置的功能。我能得到的关闭是使用 .map(...).slotSharingGroup("name");
。正如关于“Set slot sharing group”的文档所说:
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").
someStream.filter(...).slotSharingGroup("name");
因此,我根据我拥有的任务槽数以及并行度定义了不同的组。
我找到了一种解决方法来均匀分配插槽共享组。
从flink 1.9.2开始,引入了even tasks分发特性,可以通过flink-conf.yaml
中的cluster.evenly-spread-out-slots: true
开启:FLINK-12122 Spread out tasks evenly across all available registered TaskManagers. I tried to enable it and it didn't work. After digging a bit, I managed to find the developer's comment which stated that this feature works only in standalone mode as it requires resources to be preliminary pre-allocated - https://issues.apache.org/jira/browse/FLINK-12122?focusedCommentId=17013089&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17013089":
the feature only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling. Hence, when you are using the active Yarn mode and submit the first job, then there won't be any TMs registered. Consequently, Flink will allocate the first container, fill it up and then only allocate a new container. However, if you start Flink in standalone mode or after your first job finishes on Yarn there are still some TMs registered, then the next job would be spread out.
所以,想法是开始 a detached yarn session 增加空闲容器超时设置,首先提交一些短暂的假工作,它将简单地从 YARN 获取所需数量的资源并完成,然后开始立即将主管道分配给已经分配的容器,在这种情况下,cluster.evenly-spread-out-slots: true
可以做到这一点并平均分配所有插槽共享组。
因此,总而言之,完成了以下操作以获得作业中均匀分布的槽共享组:
resourcemanager.taskmanager-timeout
已增加,以允许在为空闲任务管理器释放容器之前提交主要作业。我把它增加到 1 分钟,这就足够了。- 启动了一个
yarn-session
并向其动态提交了作业。 - 调整了主要工作,首先调用一个简单分配资源的假工作。在我的例子中,这个简单的代码在配置主管道之前起到了作用:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val job = env
.fromElements(0)
.map { x =>
x * 2
}
.setParallelism(parallelismMax)
.print()
val jobResult = env.execute("Resources pre-allocation job")
println(jobResult)
print("Done. Starting main job!")