Spark Streaming:如何使用多个输入来处理工作?
Spark Streaming: How to process using multiple inputs to job?
输入 1: KV 数据流。
输入 2: 一些静态数据分区(用于处理输入 1 中的流)
问题可以建模为下图:
与 HDFS/RDD 分区共置: 我们如何确保流式任务 Map1、Map2 和 Map3 运行 在 HDFS/RDD 分区所在的机器上当前的?
图像描述: 假设 K 是流式密钥(不是元组)。 First Map 将其转换为元组(具有空值)并将其广播给 3 个映射器。每个映射器 运行ning 在不同的节点上,包含 RDD 的不同分区(或 HDFS 文件,这是第二个输入和静态数据)。每个 Mapper 使用 RDD 分区来计算键的值。最后我们要聚合键的值(使用 reduceByKey _+_)。
如果我理解正确的话:
K
是您通过 streaming
工作从 ***DStream
获得的 RDD
。我不知道您传入数据的来源。这个数据基本上是array/seq/list
个Keys.
- 您提到的静态数据是
PairedRDD
形式的 <K, Object>
。从 Object
中,您想为 incoming RDD
. 中的键提取 Val_n
- 您的目标是 avoid/minimize
shuffle
,而此 join
(或查找)过程。
为此,最好的策略是使用 Join
operation with incoming RDD
and Static RDD
with both RDDs partitioned
using the same Partitioner
. In case either of the data RDD is much smaller than the other one, you can explore broadcasting
the smaller one. I have recently tried this in my project and shared experience in the post: Random Partitioner behavior on the joined RDD
编辑:因为你想处理你的密钥,K
(假设K=Set{K1, K2...Kn}),使用StaticRDD
,代替分区,我建议采用如下方法。我没有检查语法和正确性,但你会明白意图。
val kRddBroadcastVar = .... // broadcasted variable
val keyValRDD = staticRDD.mapPartitions {
iter => transformKRddToTuple2Events(iter, kRddBroadcastVar )
}
def transformKRddToTuple2Events( iter: Iterator[Object], kRddBroadcastVar: List[KeyObjectType] ) : Iterator[(keyObjectType, valueObjectType )] {
val staticList = iter.toList
val toReturn = kRddBroadcastVar.map ( k => getKeyValue(k, staticList) )
toReturn.iterator
}
val outRdd = keyValRDD.reduceByKey( _ + _ )
如果这有意义,请将此答案标记为已接受。
您的静态 RDD 是否小到可以缓存。在这种情况下,Spark 将尝试 运行 在这些节点上流式处理任务。虽然不能保证。
此外,如果参考数据很小,为什么不广播该数据集。
我们一直在尝试解决与我们的数据存储 SnappyData ( http://www.snappydata.io/) 中的首选位置有关的类似问题,其中数据位置首先是 class 公民。
输入 1: KV 数据流。
输入 2: 一些静态数据分区(用于处理输入 1 中的流)
问题可以建模为下图:
与 HDFS/RDD 分区共置: 我们如何确保流式任务 Map1、Map2 和 Map3 运行 在 HDFS/RDD 分区所在的机器上当前的?
图像描述: 假设 K 是流式密钥(不是元组)。 First Map 将其转换为元组(具有空值)并将其广播给 3 个映射器。每个映射器 运行ning 在不同的节点上,包含 RDD 的不同分区(或 HDFS 文件,这是第二个输入和静态数据)。每个 Mapper 使用 RDD 分区来计算键的值。最后我们要聚合键的值(使用 reduceByKey _+_)。
如果我理解正确的话:
K
是您通过streaming
工作从***DStream
获得的RDD
。我不知道您传入数据的来源。这个数据基本上是array/seq/list
个Keys.- 您提到的静态数据是
PairedRDD
形式的<K, Object>
。从Object
中,您想为incoming RDD
. 中的键提取 - 您的目标是 avoid/minimize
shuffle
,而此join
(或查找)过程。
Val_n
为此,最好的策略是使用 Join
operation with incoming RDD
and Static RDD
with both RDDs partitioned
using the same Partitioner
. In case either of the data RDD is much smaller than the other one, you can explore broadcasting
the smaller one. I have recently tried this in my project and shared experience in the post: Random Partitioner behavior on the joined RDD
编辑:因为你想处理你的密钥,K
(假设K=Set{K1, K2...Kn}),使用StaticRDD
,代替分区,我建议采用如下方法。我没有检查语法和正确性,但你会明白意图。
val kRddBroadcastVar = .... // broadcasted variable
val keyValRDD = staticRDD.mapPartitions {
iter => transformKRddToTuple2Events(iter, kRddBroadcastVar )
}
def transformKRddToTuple2Events( iter: Iterator[Object], kRddBroadcastVar: List[KeyObjectType] ) : Iterator[(keyObjectType, valueObjectType )] {
val staticList = iter.toList
val toReturn = kRddBroadcastVar.map ( k => getKeyValue(k, staticList) )
toReturn.iterator
}
val outRdd = keyValRDD.reduceByKey( _ + _ )
如果这有意义,请将此答案标记为已接受。
您的静态 RDD 是否小到可以缓存。在这种情况下,Spark 将尝试 运行 在这些节点上流式处理任务。虽然不能保证。
此外,如果参考数据很小,为什么不广播该数据集。
我们一直在尝试解决与我们的数据存储 SnappyData ( http://www.snappydata.io/) 中的首选位置有关的类似问题,其中数据位置首先是 class 公民。