flink :重叠阅读
flink : overlapping reading
我想用以下访问模式实现一个算法(类似于有限差分算法):
在本例中,dataset_1的第一个值用于计算dataset_2的第一个和第二个值。所以,对于这个值我应该有 2 个不同的键。
因此,dataset_1 的某些值 必须多次读取(2 或 3 次)。
我想我必须使用 groupBy(key).reduce(Algorithm)
转换,但我不知道如何定义键。
这听起来像是滑动 window 计算。您应该使用 DataStream
而不是 DataSet
并应用大小为 3 和步长为 1 的 window。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream dataset_2 = env.readTextFile(textPathTo-Dataset_1).window(Count.of(3)).every(Count.of(1)).WINDOW_FUNCTION(...).flatten();
有多个 WINDOW_FUNCTION
可用(例如,mxn、min、sum 或通用 mapWindow、foldWindow、reduceWindow)。请查看适合您的用例的函数的文档:https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html
除非您在单线程中处理 Flink 数据集,即并行度为 1,否则 Flink 数据集是无序的。您可以如何为数据添加顺序索引并将该索引用作键。
根据您的示例,我假设 dataset_2 的索引 4 的值是根据 dataset_1 的值 3、4 和 5 计算的,即 dataset_2 的每个值从 dataset_1.
的三个(或两个)值导出
有多种方法可以做你想做的事,有些很容易实现,有些则更有效率。
一种简单的方法是在 dataset_1 上应用 FlatMapFunction
,它使用三个键 i-1
发出三次索引为 i
的每个值, i
,和 i+1
。之后,您根据新键对结果数据集进行分组,并使用 GroupReduce 函数计算新值。这种方法使 dataset_1 的数据量增加了三倍,但可以轻松并行化。
另一种选择是手动进行范围分区,这与第一种方法类似,但更通用一些。我再次假设 dataset_1 的值具有顺序 idx
属性。使用 FlatMapFunction 将 partitionIds
分配给值,即,对于 100 个元素的分区大小,执行类似 partitionId = idx / 100
的操作。分区的第一个和最后一个元素需要发出两次。例如,partitionId 1 的 idx
100 和 199 的元素(值 100 到 199)需要通过两次发出这些值分别复制到分区 0 和 2。分配 partitionIds
后,您可以对分区的所有元素进行 groupBy(partitionId)
、sortGroup(idx)
和 groupReduce
。分区的大小是可配置的。
我想用以下访问模式实现一个算法(类似于有限差分算法):
在本例中,dataset_1的第一个值用于计算dataset_2的第一个和第二个值。所以,对于这个值我应该有 2 个不同的键。 因此,dataset_1 的某些值 必须多次读取(2 或 3 次)。
我想我必须使用 groupBy(key).reduce(Algorithm)
转换,但我不知道如何定义键。
这听起来像是滑动 window 计算。您应该使用 DataStream
而不是 DataSet
并应用大小为 3 和步长为 1 的 window。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream dataset_2 = env.readTextFile(textPathTo-Dataset_1).window(Count.of(3)).every(Count.of(1)).WINDOW_FUNCTION(...).flatten();
有多个 WINDOW_FUNCTION
可用(例如,mxn、min、sum 或通用 mapWindow、foldWindow、reduceWindow)。请查看适合您的用例的函数的文档:https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html
除非您在单线程中处理 Flink 数据集,即并行度为 1,否则 Flink 数据集是无序的。您可以如何为数据添加顺序索引并将该索引用作键。
根据您的示例,我假设 dataset_2 的索引 4 的值是根据 dataset_1 的值 3、4 和 5 计算的,即 dataset_2 的每个值从 dataset_1.
的三个(或两个)值导出有多种方法可以做你想做的事,有些很容易实现,有些则更有效率。
一种简单的方法是在 dataset_1 上应用 FlatMapFunction
,它使用三个键 i-1
发出三次索引为 i
的每个值, i
,和 i+1
。之后,您根据新键对结果数据集进行分组,并使用 GroupReduce 函数计算新值。这种方法使 dataset_1 的数据量增加了三倍,但可以轻松并行化。
另一种选择是手动进行范围分区,这与第一种方法类似,但更通用一些。我再次假设 dataset_1 的值具有顺序 idx
属性。使用 FlatMapFunction 将 partitionIds
分配给值,即,对于 100 个元素的分区大小,执行类似 partitionId = idx / 100
的操作。分区的第一个和最后一个元素需要发出两次。例如,partitionId 1 的 idx
100 和 199 的元素(值 100 到 199)需要通过两次发出这些值分别复制到分区 0 和 2。分配 partitionIds
后,您可以对分区的所有元素进行 groupBy(partitionId)
、sortGroup(idx)
和 groupReduce
。分区的大小是可配置的。