PySpark 中的 mapPartitions 返回的空数组

Empty array returned by mapPartitions in PySpark

你好有人可以向我解释为什么 mapPartitions 对这两个函数的反应不同吗? (我看过这个,我认为我的问题不是因为我的可迭代对象是TraversableOnce,因为我创建它。

L=range(10)
J=range(5,15)
K=range(8,18)

data=J+K+L

def function_1(iter_listoflist):
    final_iterator=[]
    for sublist in iter_listoflist:
        final_iterator.append([x for x in sublist if x%9!=0])
    return iter(final_iterator)  

def function_2(iter_listoflist):
    final_iterator=[]
    listoflist=list(iter_listoflist)
    for i in range(len(listoflist)):
        for j in range(i+1,len(listoflist)):
            sublist=listoflist[i]+listoflist[j]
            final_iterator.append([x for x in sublist if x%9!=0])
            pass
        pass
    return iter(final_iterator)



sc.parallelize(data,3).glom().mapPartitions(function_1).collect()

returns

应该是什么
sc.parallelize(data,3).glom().mapPartitions(function_2).collect()

returns 一个空数组,我通过在末尾返回一个列表来检查代码,它做了我想要的。

感谢您的帮助

菲利普·C

其实很简单。 listoflist 的长度始终等于 1。要理解为什么会这样,您必须考虑调用 glom 时发生了什么。引用the docs吧returns:

an RDD created by coalescing all elements within each partition into a list.

意思是当你调用:

listoflist=list(iter_listoflist)

您将获得一个列表,其中包含该分区中的所有元素的单个元素列表。忽略所有细节:

(sc.parallelize(data, 3)
    .glom()
    .mapPartitionsWithIndex(lambda i, iter: [(i, list(iter))])
    .collect())

## [(0, [[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]]),
##     (1, [[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]]),
##     (2, [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])]

表示

  • range(len(listoflist)) 包含一个等于 0 的元素。
  • range(i+1,len(listoflist)) 替换为空 range(1, 1)

因此无事可做,你得到一个空的迭代器。

附带说明一下,所有这些 pass 语句以及 iter 调用都已完全过时。