无法在pyspark中的可迭代RDD上使用过滤器
Unable to use filter on an iterable RDD in pyspark
我正在尝试应用一个函数,该函数根据另一个数据集中的数据范围过滤掉数据集中的某些值。我已经执行了一些 groupBys 和连接,所以我传递给函数的参数格式有两个 Iterables,如下所示:
g1 = g0.map(lambda x: timefilter(x[0]))
其中 x[0] 是 <pyspark.resultiterable.ResultIterable object at 0x23b6610>, <pyspark.resultiterable.ResultIterable object at 0x23b6310>)
当我输入函数 timefilter
时,我现在需要能够根据 x[0] 中的值过滤掉 x[1] 中的值。但是当我尝试以下操作时(在 twoList
和 twoRDD
上,尽管我在这里只显示 twoList):
def timefilter(RDDList):
oneList = list(RDDList[0])
twoList = list(RDDList[1])
twoRDD = RDDList[1]
test = twoList.filter(lambda x: x[4]=='helloworld')
return test
它给我以下错误:AttributeError: 'ResultIterable' object has no attribute 'filter'
然后是一堆错误。
似乎我不能对任何格式的可迭代对象使用过滤器,但感觉我遗漏了一些非常简单的东西。函数中是否缺少转换?
事实证明,在可迭代的 RDD 上进行过滤是不可能的,所以我只使用了 python 内置的过滤函数。大致如下:filter(lambda x: x[1] in oneList, twoList)
.
我正在尝试应用一个函数,该函数根据另一个数据集中的数据范围过滤掉数据集中的某些值。我已经执行了一些 groupBys 和连接,所以我传递给函数的参数格式有两个 Iterables,如下所示:
g1 = g0.map(lambda x: timefilter(x[0]))
其中 x[0] 是 <pyspark.resultiterable.ResultIterable object at 0x23b6610>, <pyspark.resultiterable.ResultIterable object at 0x23b6310>)
当我输入函数 timefilter
时,我现在需要能够根据 x[0] 中的值过滤掉 x[1] 中的值。但是当我尝试以下操作时(在 twoList
和 twoRDD
上,尽管我在这里只显示 twoList):
def timefilter(RDDList):
oneList = list(RDDList[0])
twoList = list(RDDList[1])
twoRDD = RDDList[1]
test = twoList.filter(lambda x: x[4]=='helloworld')
return test
它给我以下错误:AttributeError: 'ResultIterable' object has no attribute 'filter'
然后是一堆错误。
似乎我不能对任何格式的可迭代对象使用过滤器,但感觉我遗漏了一些非常简单的东西。函数中是否缺少转换?
事实证明,在可迭代的 RDD 上进行过滤是不可能的,所以我只使用了 python 内置的过滤函数。大致如下:filter(lambda x: x[1] in oneList, twoList)
.