Spark streaming example calls updateStateByKey with additional parameters
I'm wondering why the StatefulNetworkWordCount.scala example calls the infamous updateStateByKey() function, which is supposed to take only a function as a parameter, like this instead:
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
Why is it necessary (and how is it handled, given that this is not in updateStateByKey()'s signature?) to pass a partitioner, a boolean, and an RDD?
Thanks,
Matt
It's because:
- You are looking at a different Spark release branch: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 this example called updateStateByKey with just a single function as its argument, while in 1.3 it was optimized.
- Different versions of updateStateByKey exist in 1.2 and 1.3. The four-argument version does not exist in 1.2; it was only introduced in 1.3 (a sketch of the simpler one-argument call follows below): https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
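For contrast, here is a minimal sketch of the one-argument overload that has been available since 1.2; the word-count update logic here is illustrative, not quoted from the example:

// The simpler overload takes only a per-key update function:
//   updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S])
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(values.sum + state.getOrElse(0)) // add this batch's counts to the running total
}
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)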
The code of the four-argument version is as follows:
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @param initialRDD initial state value of each key.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean,
    initialRDD: RDD[(K, S)]
  ): DStream[(K, S)] = {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
    rememberPartitioner, Some(initialRDD))
}
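For completeness, here is a sketch of how the example wires this up, following the pattern in StatefulNetworkWordCount.scala on branch-1.3 (the seed values in initialRDD are illustrative): a plain per-key update function is adapted to the iterator form the overload requires, and the boolean tells the generated RDDs to remember the HashPartitioner.

// Seed the state with initial counts for some keys (values here are illustrative).
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

// Plain per-key update function: add this batch's counts to the previous total.
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(values.sum + state.getOrElse(0))
}

// Adapt it to the Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] shape
// that the four-argument overload expects.
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
  iterator.flatMap { case (word, counts, state) =>
    updateFunc(counts, state).map(count => (word, count))
  }
}

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)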