pyspark 中转换后的 DStream 在调用 pprint 时出错

Transformed DStream in pyspark gives error when pprint called on it

我正在通过 PySpark 探索 Spark Streaming,当我尝试将 transform 函数与 take 结合使用时遇到错误。

我可以通过 transformpprint 结果成功地对 DStream 使用 sortBy

author_counts_sorted_dstream = author_counts_dstream.transform\
  (lambda foo:foo\
   .sortBy(lambda x:x[0].lower())\
   .sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()

但是如果我按照相同的模式使用 take 并尝试 pprint 它:

top_five = author_counts_sorted_dstream.transform\
  (lambda rdd:rdd.take(5))
top_five.pprint()

作业失败

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'

您可以在 the notebook here.

中查看完整代码和输出

我做错了什么?

传递给 transform 的函数应该从 RDD 转换为 RDD。如果你使用一个动作,比如 take,你必须将结果转换回 RDD:

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
  lambda rdd: sc.parallelize(rdd.take(5))
)

相比之下RDD.sortBy使用的是一个转换(returns一个RDD)所以不需要进一步的并行化。

旁注以下函数:

lambda foo: foo \
    .sortBy(lambda x:x[0].lower()) \
    .sortBy(lambda x:x[1], ascending=False)

没有多大意义。请记住,Spark 按随机排序,因此它不稳定。如果你想按多个字段排序,你应该使用像这样的组合键:

lambda x: (x[0].lower(), -x[1])