在 Flink 数据集中保存批量迭代的部分输出的可能性?
Possibility of saving partial outputs from bulk iteration in Flink Dataset?
我正在使用 flink 数据集进行迭代计算 API。
但每次迭代的结果都是我完整解决方案的一部分。
(如果需要更多详细信息:我在每次迭代中从上到下逐层计算网格节点,请参阅形式概念分析)
如果我使用 flink 数据集 API 进行批量迭代而不保存我的结果,代码将如下所示:
val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
(result,result)
}
end.count()
但是,如果我尝试在迭代 (_.writeAsText()) 或任何操作中写入部分结果,我将收到错误消息:
org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
没有批量迭代的替代方案似乎如下:
var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
count = start.count()
all = all + count
}
println("total nodes: " + all)
但是这种方法在最小输入数据上特别慢,迭代版本需要 <30 秒,循环版本需要 >3 分钟。
我猜 flink 无法创建执行循环的最佳计划。
我应该尝试任何解决方法吗?是否可以对 flink 进行一些修改,以便能够在 hadoop 等上保存部分结果?
遗憾的是,目前无法从批量迭代中输出中间结果。只能在迭代结束时输出最终结果。
此外,正如您正确注意到的那样,Flink 无法有效地展开 while 循环或 for 循环,因此这也不起作用。
如果您的中间结果不是那么大,您可以尝试的一件事是将中间结果附加到部分解决方案中,然后在迭代结束时输出所有内容。在 TransitiveClosureNaive example 中实现了类似的方法,其中在迭代中发现的路径在下一个部分解决方案中累积。
我正在使用 flink 数据集进行迭代计算 API。
但每次迭代的结果都是我完整解决方案的一部分。
(如果需要更多详细信息:我在每次迭代中从上到下逐层计算网格节点,请参阅形式概念分析)
如果我使用 flink 数据集 API 进行批量迭代而不保存我的结果,代码将如下所示:
val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
(result,result)
}
end.count()
但是,如果我尝试在迭代 (_.writeAsText()) 或任何操作中写入部分结果,我将收到错误消息:
org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?
没有批量迭代的替代方案似乎如下:
var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
count = start.count()
all = all + count
}
println("total nodes: " + all)
但是这种方法在最小输入数据上特别慢,迭代版本需要 <30 秒,循环版本需要 >3 分钟。
我猜 flink 无法创建执行循环的最佳计划。
我应该尝试任何解决方法吗?是否可以对 flink 进行一些修改,以便能够在 hadoop 等上保存部分结果?
遗憾的是,目前无法从批量迭代中输出中间结果。只能在迭代结束时输出最终结果。
此外,正如您正确注意到的那样,Flink 无法有效地展开 while 循环或 for 循环,因此这也不起作用。
如果您的中间结果不是那么大,您可以尝试的一件事是将中间结果附加到部分解决方案中,然后在迭代结束时输出所有内容。在 TransitiveClosureNaive example 中实现了类似的方法,其中在迭代中发现的路径在下一个部分解决方案中累积。