PySpark groupByKey 返回 pyspark.resultiterable.ResultIterable
PySpark groupByKey returning pyspark.resultiterable.ResultIterable
我想弄清楚为什么我的 groupByKey 返回以下内容:
[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
我有如下所示的 flatMapped 值:
[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
我只是在做一个简单的:
groupRDD = columnRDD.groupByKey()
您返回的是一个允许您迭代结果的对象。您可以通过对值调用 list() 将 groupByKey 的结果转换为列表,例如
example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')])
example.groupByKey().collect()
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......]
example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])]
你也可以使用
example.groupByKey().mapValues(list)
我建议您使用 cogroup(),而不是使用 groupByKey()。你可以参考下面的例子。
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
示例:
>>> x = sc.parallelize([("foo", 1), ("bar", 4)])
>>> y = sc.parallelize([("foo", -1)])
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
>>> print(z)
你应该得到想要的输出...
示例:
r1 = sc.parallelize([('a',1),('b',2)])
r2 = sc.parallelize([('b',1),('d',2)])
r1.cogroup(r2).mapValues(lambda x:tuple(reduce(add,__builtin__.map(list,x))))
结果:
[('d', (2,)), ('b', (2, 1)), ('a', (1,))]
除了上述答案之外,如果您想要唯一项的排序列表,请使用以下内容:
不同值和排序值列表
example.groupByKey().mapValues(set).mapValues(sorted)
仅排序值列表
example.groupByKey().mapValues(sorted)
上面的替代方法
# List of distinct sorted items
example.groupByKey().map(lambda x: (x[0], sorted(set(x[1]))))
# just sorted list of items
example.groupByKey().map(lambda x: (x[0], sorted(x[1])))
说你的代码是..
ex2 = ex1.groupByKey()
然后你运行..
ex2.take(5)
您将看到一个可迭代对象。如果您要对这些数据执行某些操作,这没关系,您可以继续前进。但是,如果您只想 print/see 在继续之前先设置值,这里有一些 hack..
ex2.toDF().show(20, False)
或者只是
ex2.toDF().show()
这将显示数据的值。您不应该使用 collect()
,因为这会将 return 数据发送给驱动程序,如果您正在处理大量数据,那将会对您造成打击。现在,如果 ex2 = ex1.groupByKey()
是您的最后一步,并且您想要这些结果 returned,那么可以使用 collect()
,但请确保您知道正在 returned 的数据量很小.
print(ex2.collect())
这是关于在 RDD 上使用 collect() 的另一个很好的 post
View RDD contents in Python Spark?
我想弄清楚为什么我的 groupByKey 返回以下内容:
[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
我有如下所示的 flatMapped 值:
[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
我只是在做一个简单的:
groupRDD = columnRDD.groupByKey()
您返回的是一个允许您迭代结果的对象。您可以通过对值调用 list() 将 groupByKey 的结果转换为列表,例如
example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')])
example.groupByKey().collect()
# Gives [(0, <pyspark.resultiterable.ResultIterable object ......]
example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
# Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])]
你也可以使用
example.groupByKey().mapValues(list)
我建议您使用 cogroup(),而不是使用 groupByKey()。你可以参考下面的例子。
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
示例:
>>> x = sc.parallelize([("foo", 1), ("bar", 4)])
>>> y = sc.parallelize([("foo", -1)])
>>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
>>> print(z)
你应该得到想要的输出...
示例:
r1 = sc.parallelize([('a',1),('b',2)])
r2 = sc.parallelize([('b',1),('d',2)])
r1.cogroup(r2).mapValues(lambda x:tuple(reduce(add,__builtin__.map(list,x))))
结果:
[('d', (2,)), ('b', (2, 1)), ('a', (1,))]
除了上述答案之外,如果您想要唯一项的排序列表,请使用以下内容:
不同值和排序值列表
example.groupByKey().mapValues(set).mapValues(sorted)
仅排序值列表
example.groupByKey().mapValues(sorted)
上面的替代方法
# List of distinct sorted items
example.groupByKey().map(lambda x: (x[0], sorted(set(x[1]))))
# just sorted list of items
example.groupByKey().map(lambda x: (x[0], sorted(x[1])))
说你的代码是..
ex2 = ex1.groupByKey()
然后你运行..
ex2.take(5)
您将看到一个可迭代对象。如果您要对这些数据执行某些操作,这没关系,您可以继续前进。但是,如果您只想 print/see 在继续之前先设置值,这里有一些 hack..
ex2.toDF().show(20, False)
或者只是
ex2.toDF().show()
这将显示数据的值。您不应该使用 collect()
,因为这会将 return 数据发送给驱动程序,如果您正在处理大量数据,那将会对您造成打击。现在,如果 ex2 = ex1.groupByKey()
是您的最后一步,并且您想要这些结果 returned,那么可以使用 collect()
,但请确保您知道正在 returned 的数据量很小.
print(ex2.collect())
这是关于在 RDD 上使用 collect() 的另一个很好的 post
View RDD contents in Python Spark?