PySpark 中的 mapPartitions 返回的空数组
Empty array returned by mapPartitions in PySpark
你好有人可以向我解释为什么 mapPartitions
对这两个函数的反应不同吗? (我看过这个,我认为我的问题不是因为我的可迭代对象是TraversableOnce
,因为我创建它。
L=range(10)
J=range(5,15)
K=range(8,18)
data=J+K+L
def function_1(iter_listoflist):
final_iterator=[]
for sublist in iter_listoflist:
final_iterator.append([x for x in sublist if x%9!=0])
return iter(final_iterator)
def function_2(iter_listoflist):
final_iterator=[]
listoflist=list(iter_listoflist)
for i in range(len(listoflist)):
for j in range(i+1,len(listoflist)):
sublist=listoflist[i]+listoflist[j]
final_iterator.append([x for x in sublist if x%9!=0])
pass
pass
return iter(final_iterator)
sc.parallelize(data,3).glom().mapPartitions(function_1).collect()
returns
应该是什么
sc.parallelize(data,3).glom().mapPartitions(function_2).collect()
returns 一个空数组,我通过在末尾返回一个列表来检查代码,它做了我想要的。
感谢您的帮助
菲利普·C
其实很简单。 listoflist
的长度始终等于 1。要理解为什么会这样,您必须考虑调用 glom
时发生了什么。引用the docs吧returns:
an RDD created by coalescing all elements within each partition
into a list.
意思是当你调用:
listoflist=list(iter_listoflist)
您将获得一个列表,其中包含该分区中的所有元素的单个元素列表。忽略所有细节:
(sc.parallelize(data, 3)
.glom()
.mapPartitionsWithIndex(lambda i, iter: [(i, list(iter))])
.collect())
## [(0, [[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]]),
## (1, [[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]]),
## (2, [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])]
表示
range(len(listoflist))
包含一个等于 0 的元素。
range(i+1,len(listoflist))
替换为空 range(1, 1)
因此无事可做,你得到一个空的迭代器。
附带说明一下,所有这些 pass
语句以及 iter
调用都已完全过时。
你好有人可以向我解释为什么 mapPartitions
对这两个函数的反应不同吗? (我看过这个TraversableOnce
,因为我创建它。
L=range(10)
J=range(5,15)
K=range(8,18)
data=J+K+L
def function_1(iter_listoflist):
final_iterator=[]
for sublist in iter_listoflist:
final_iterator.append([x for x in sublist if x%9!=0])
return iter(final_iterator)
def function_2(iter_listoflist):
final_iterator=[]
listoflist=list(iter_listoflist)
for i in range(len(listoflist)):
for j in range(i+1,len(listoflist)):
sublist=listoflist[i]+listoflist[j]
final_iterator.append([x for x in sublist if x%9!=0])
pass
pass
return iter(final_iterator)
sc.parallelize(data,3).glom().mapPartitions(function_1).collect()
returns
应该是什么sc.parallelize(data,3).glom().mapPartitions(function_2).collect()
returns 一个空数组,我通过在末尾返回一个列表来检查代码,它做了我想要的。
感谢您的帮助
菲利普·C
其实很简单。 listoflist
的长度始终等于 1。要理解为什么会这样,您必须考虑调用 glom
时发生了什么。引用the docs吧returns:
an RDD created by coalescing all elements within each partition into a list.
意思是当你调用:
listoflist=list(iter_listoflist)
您将获得一个列表,其中包含该分区中的所有元素的单个元素列表。忽略所有细节:
(sc.parallelize(data, 3)
.glom()
.mapPartitionsWithIndex(lambda i, iter: [(i, list(iter))])
.collect())
## [(0, [[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]]),
## (1, [[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]]),
## (2, [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])]
表示
range(len(listoflist))
包含一个等于 0 的元素。range(i+1,len(listoflist))
替换为空range(1, 1)
因此无事可做,你得到一个空的迭代器。
附带说明一下,所有这些 pass
语句以及 iter
调用都已完全过时。