如何使用 Pyspark 合并两个 Dstreams(类似于普通 RDD 上的 .zip)
How to Combine two Dstreams using Pyspark (similar to .zip on normal RDD)
我知道我们可以在 pyspark 中组合(就像 R 中的 cbind)两个 RDD,如下所示:
rdd3 = rdd1.zip(rdd2)
我想在 pyspark 中对两个 Dstreams 执行相同的操作。有可能或有其他选择吗?
事实上,我正在使用 MLlib 随机森林模型来预测使用火花流。
最后想把feature Dstream & prediction Dstream结合起来做进一步的下游处理
提前致谢。
-奥拜德
最后,我在下面使用
诀窍是将 "native python map" 与 "spark spreaming transform" 一起使用。
可能不是一种优雅的方式,但它有效:)。
def predictScore(texts, modelRF):
predictions = texts.map( lambda txt : (txt , getFeatures(txt)) ).\
map(lambda (txt, features) : (txt ,(features.split(','))) ).\
map( lambda (txt, features) : (txt, ([float(i) for i in features])) ).\
transform( lambda rdd: sc.parallelize(\
map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
)\
)
# in the transform operation: x=text and y=features
# Return will be tuple of (score,'original text')
return predictions
希望,它能帮助面临同样问题的人。
如果有人有更好的想法,请post在这里。
-奥拜德
注意:我也在 spark 用户列表上提交了问题,post 我的答案也在那里。
我知道我们可以在 pyspark 中组合(就像 R 中的 cbind)两个 RDD,如下所示:
rdd3 = rdd1.zip(rdd2)
我想在 pyspark 中对两个 Dstreams 执行相同的操作。有可能或有其他选择吗?
事实上,我正在使用 MLlib 随机森林模型来预测使用火花流。 最后想把feature Dstream & prediction Dstream结合起来做进一步的下游处理
提前致谢。
-奥拜德
最后,我在下面使用
诀窍是将 "native python map" 与 "spark spreaming transform" 一起使用。 可能不是一种优雅的方式,但它有效:)。
def predictScore(texts, modelRF):
predictions = texts.map( lambda txt : (txt , getFeatures(txt)) ).\
map(lambda (txt, features) : (txt ,(features.split(','))) ).\
map( lambda (txt, features) : (txt, ([float(i) for i in features])) ).\
transform( lambda rdd: sc.parallelize(\
map( lambda x,y:(x,y), modelRF.predict(rdd.map(lambda (x,y):y)).collect(),rdd.map(lambda (x,y):x).collect() )\
)\
)
# in the transform operation: x=text and y=features
# Return will be tuple of (score,'original text')
return predictions
希望,它能帮助面临同样问题的人。 如果有人有更好的想法,请post在这里。
-奥拜德
注意:我也在 spark 用户列表上提交了问题,post 我的答案也在那里。