pyspark 中转换后的 DStream 在调用 pprint 时出错
Transformed DStream in pyspark gives error when pprint called on it
我正在通过 PySpark 探索 Spark Streaming,当我尝试将 transform
函数与 take
结合使用时遇到错误。
我可以通过 transform
和 pprint
结果成功地对 DStream
使用 sortBy
。
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
但是如果我按照相同的模式使用 take
并尝试 pprint
它:
top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
作业失败
Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
您可以在 the notebook here.
中查看完整代码和输出
我做错了什么?
传递给 transform
的函数应该从 RDD
转换为 RDD
。如果你使用一个动作,比如 take
,你必须将结果转换回 RDD
:
sc: SparkContext = ...
author_counts_sorted_dstream.transform(
lambda rdd: sc.parallelize(rdd.take(5))
)
相比之下RDD.sortBy
使用的是一个转换(returns一个RDD)所以不需要进一步的并行化。
旁注以下函数:
lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)
没有多大意义。请记住,Spark 按随机排序,因此它不稳定。如果你想按多个字段排序,你应该使用像这样的组合键:
lambda x: (x[0].lower(), -x[1])
我正在通过 PySpark 探索 Spark Streaming,当我尝试将 transform
函数与 take
结合使用时遇到错误。
我可以通过 transform
和 pprint
结果成功地对 DStream
使用 sortBy
。
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
但是如果我按照相同的模式使用 take
并尝试 pprint
它:
top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
作业失败
Py4JJavaError: An error occurred while calling o25.awaitTermination. : org.apache.spark.SparkException: An exception was raised by Python: Traceback (most recent call last): File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call return r._jrdd AttributeError: 'list' object has no attribute '_jrdd'
您可以在 the notebook here.
中查看完整代码和输出我做错了什么?
传递给 transform
的函数应该从 RDD
转换为 RDD
。如果你使用一个动作,比如 take
,你必须将结果转换回 RDD
:
sc: SparkContext = ...
author_counts_sorted_dstream.transform(
lambda rdd: sc.parallelize(rdd.take(5))
)
相比之下RDD.sortBy
使用的是一个转换(returns一个RDD)所以不需要进一步的并行化。
旁注以下函数:
lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)
没有多大意义。请记住,Spark 按随机排序,因此它不稳定。如果你想按多个字段排序,你应该使用像这样的组合键:
lambda x: (x[0].lower(), -x[1])