在 Spark Streaming 中寻找中位数
Finding median in Spark Streaming
我正在尝试编写最简单的代码示例:
from numpy import median
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 30)
qs = ssc.queueStream([[1,2,3],[4,5],[6,7,8,9,9]])
output = qs.foreachRDD(median)
output.pprint()
ssc.start(); ssc.awaitTermination()
我想为流中的每个 rdd 生成中值。我的流每 30 秒出现一次。
为了测试我的代码,我制作了一个 queueStream
当我查看输出类型时,我得到以下信息:
type(output)
<type 'NoneType'>
为什么会这样?当我尝试使用 map
将 median
应用于我的流时,它一次将中值函数应用于列表的每个成员。我想将中值函数作为聚合应用于整个 RDD,因此 map
函数是不可能的。
如何在 Spark Streaming 中计算流的中值?
原因是 foreachRDD
没有 return 任何东西。它只是为了执行一些操作。所以,你回来了一个空白。您可能想看看使用某些 window 操作。
扩展@Justin 的回答:发生了什么:
median()
是 分别应用于每个 DStream。然而结果没有被任何人使用.. 为什么? foreachRdd() 是一个 动作 而不是转换。
您应该查看 DStream 转换:例如map():这里是尚未 100% 调试的代码 - 但它提供了一个结构:
from pyspark.streaming import *
ssc = StreamingContext(sc, 30)
dataRdd = [sc.parallelize(d, 1) for d in [[1,2,3],[4,5],[6,7,8,9,9]]]
qs = ssc.queueStream(dataRdd)
def list_median((med,mylist),newval):
mylist = [newval] if not mylist else mylist.append(newval)
mylist = sorted(mylist)
return (mylist[int(len(mylist)/2)], mylist)
medians = qs.reduce(list_median).map(lambda (med,list): med)
def printRec(rdd):
import sys
rdd.foreach(lambda rec: sys.stderr.write(repr(rec)))
medians.foreachRDD(printRec)
ssc.start(); ssc.awaitTermination()
我正在尝试编写最简单的代码示例:
from numpy import median
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 30)
qs = ssc.queueStream([[1,2,3],[4,5],[6,7,8,9,9]])
output = qs.foreachRDD(median)
output.pprint()
ssc.start(); ssc.awaitTermination()
我想为流中的每个 rdd 生成中值。我的流每 30 秒出现一次。 为了测试我的代码,我制作了一个 queueStream
当我查看输出类型时,我得到以下信息:
type(output)
<type 'NoneType'>
为什么会这样?当我尝试使用 map
将 median
应用于我的流时,它一次将中值函数应用于列表的每个成员。我想将中值函数作为聚合应用于整个 RDD,因此 map
函数是不可能的。
如何在 Spark Streaming 中计算流的中值?
原因是 foreachRDD
没有 return 任何东西。它只是为了执行一些操作。所以,你回来了一个空白。您可能想看看使用某些 window 操作。
扩展@Justin 的回答:发生了什么:
median()
是 分别应用于每个 DStream。然而结果没有被任何人使用.. 为什么? foreachRdd() 是一个 动作 而不是转换。
您应该查看 DStream 转换:例如map():这里是尚未 100% 调试的代码 - 但它提供了一个结构:
from pyspark.streaming import *
ssc = StreamingContext(sc, 30)
dataRdd = [sc.parallelize(d, 1) for d in [[1,2,3],[4,5],[6,7,8,9,9]]]
qs = ssc.queueStream(dataRdd)
def list_median((med,mylist),newval):
mylist = [newval] if not mylist else mylist.append(newval)
mylist = sorted(mylist)
return (mylist[int(len(mylist)/2)], mylist)
medians = qs.reduce(list_median).map(lambda (med,list): med)
def printRec(rdd):
import sys
rdd.foreach(lambda rec: sys.stderr.write(repr(rec)))
medians.foreachRDD(printRec)
ssc.start(); ssc.awaitTermination()