PySpark groupByKey 返回 pyspark.resultiterable.ResultIterable

PySpark groupByKey returning pyspark.resultiterable.ResultIterable

我想弄清楚为什么我的 groupByKey 返回以下内容:

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]

我有如下所示的 flatMapped 值:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]

我只是在做一个简单的:

groupRDD = columnRDD.groupByKey()

您返回的是一个允许您迭代结果的对象。您可以通过对值调用 list() 将 groupByKey 的结果转换为列表,例如

example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')])

example.groupByKey().collect()
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......]

example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])]

你也可以使用

example.groupByKey().mapValues(list)

我建议您使用 cogroup(),而不是使用 groupByKey()。你可以参考下面的例子。

[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]

示例:

>>> x = sc.parallelize([("foo", 1), ("bar", 4)])
>>> y = sc.parallelize([("foo", -1)])
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
>>> print(z)

你应该得到想要的输出...

示例:

r1 = sc.parallelize([('a',1),('b',2)])
r2 = sc.parallelize([('b',1),('d',2)])
r1.cogroup(r2).mapValues(lambda x:tuple(reduce(add,__builtin__.map(list,x))))

结果:

[('d', (2,)), ('b', (2, 1)), ('a', (1,))]

除了上述答案之外,如果您想要唯一项的排序列表,请使用以下内容:

不同值和排序值列表

example.groupByKey().mapValues(set).mapValues(sorted)

仅排序值列表

example.groupByKey().mapValues(sorted)

上面的替代方法

# List of distinct sorted items
example.groupByKey().map(lambda x: (x[0], sorted(set(x[1]))))

# just sorted list of items
example.groupByKey().map(lambda x: (x[0], sorted(x[1])))

说你的代码是..

ex2 = ex1.groupByKey()

然后你运行..

ex2.take(5)

您将看到一个可迭代对象。如果您要对这些数据执行某些操作,这没关系,您可以继续前进。但是,如果您只想 print/see 在继续之前先设置值,这里有一些 hack..

ex2.toDF().show(20, False)

或者只是

ex2.toDF().show()

这将显示数据的值。您不应该使用 collect(),因为这会将 return 数据发送给驱动程序,如果您正在处理大量数据,那将会对您造成打击。现在,如果 ex2 = ex1.groupByKey() 是您的最后一步,并且您想要这些结果 returned,那么可以使用 collect(),但请确保您知道正在 returned 的数据量很小.

print(ex2.collect())

这是关于在 RDD 上使用 collect() 的另一个很好的 post

View RDD contents in Python Spark?