在 Flink 管道中集成具有高内存需求的不可并行化任务

Integration of non-parallelizable task with high memory demands in Flink pipeline

我在 Yarn 集群中使用 Flink 来处理使用各种来源和接收器的数据。在拓扑中的某个点,有一个操作无法并行化,而且需要访问大量内存。事实上,我在这一步中使用的 API 需要以数组形式输入。现在,我已经实现了类似

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Pojo> input = ...
List<Pojo> inputList = input.collect();
Pojo[] inputArray = inputList.toArray();
Pojo[] resultArray = costlyOperation(inputArray);
List<Pojo> resultList = Arrays.asList(resultArray);
DataSet<Pojo> result = env.fromCollection(resultList);
result.otherStuff()
  1. 这个解决方案看起来很不自然。是否有直接的方法将此任务合并到我的 Flink 管道中?
  2. 我在 中读到 collect() 函数不应该用于大型数据集。我相信将数据集收集到一个列表然后一个数组不会并行发生的事实不是我现在最大的问题,但是你仍然愿意将我上面所说的 input 写入一个文件并构建一个数组吗从那?
  3. 我也看过options在flink中配置托管内存。原则上,可以通过某种方式对其进行调整,以便为昂贵的操作留下足够的堆。另一方面,我担心拓扑中所有其他运算符的性能可能会受到影响。您对此有何看法?

您可以将 "collect->array->costlyOperation->array->fromCollection" 步骤替换为具有代理键的无键归约操作,该代理键对所有元组都具有唯一值,这样您就只能得到一个分区。这会像 Flink。

在作为 GroupReduceFunction 实现的高成本操作本身中,您将获得一个数据迭代器。如果您不需要访问所有数据 "at once",您也可以使用安全堆 space,因为您不需要将所有数据保存在内存中 reduce 中(但这当然取决于您的昂贵操作的计算结果) .

作为替代方案,您也可以调用 reduce() ,而无需 之前的 groupBy()。但是,您没有迭代器或输出收集器,只能计算部分聚合。 (参见 https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations 中的 "Reduce")

使用 Flink 风格的操作有一个好处,那就是数据保存在集群中。如果你这样做 collect() 结果被传输到客户端,成本高昂的操作在客户端执行,结果被传输回集群。此外,如果输入量很大,Flink 会自动为你把中间结果 spill 到 disc。