在 Flink 数据集中保存批量迭代的部分输出的可能性?

Possibility of saving partial outputs from bulk iteration in Flink Dataset?

我正在使用 flink 数据集进行迭代计算 API。
但每次迭代的结果都是我完整解决方案的一部分。
(如果需要更多详细信息:我在每次迭代中从上到下逐层计算网格节点,请参阅形式概念分析)

如果我使用 flink 数据集 API 进行批量迭代而不保存我的结果,代码将如下所示:

val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    (result,result)
}
end.count()

但是,如果我尝试在迭代 (_.writeAsText()) 或任何操作中写入部分结果,我将收到错误消息:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

没有批量迭代的替代方案似乎如下:

var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    count = start.count()
    all = all + count
}
println("total nodes: " + all)

但是这种方法在最小输入数据上特别慢,迭代版本需要 <30 秒,循环版本需要 >3 分钟。
我猜 flink 无法创建执行循环的最佳计划。

我应该尝试任何解决方法吗?是否可以对 flink 进行一些修改,以便能够在 hadoop 等上保存部分结果?

遗憾的是,目前无法从批量迭代中输出中间结果。只能在迭代结束时输出最终结果。

此外,正如您正确注意到的那样,Flink 无法有效地展开 while 循环或 for 循环,因此这也不起作用。

如果您的中间结果不是那么大,您可以尝试的一件事是将中间结果附加到部分解决方案中,然后在迭代结束时输出所有内容。在 TransitiveClosureNaive example 中实现了类似的方法,其中在迭代中发现的路径在下一个部分解决方案中累积。