在 pyspark 中使用嵌套元素从 RDD 获取平面 RDD
Get a flat RDD from RDD with nested elements in pyspark
我在 Pyspark 中有两个 RDD,嵌套元素如下:
a = sc.parallelize(( (1,2), 3,(4,(6,7,(8,9,(11),10)),5,12)))
b = sc.parallelize(1,2,(3,4))
嵌套可以有任意深度。
我想合并它们然后找到任意深度的最大元素,所以我尝试将它转换为 RDD 而没有像这样的嵌套值 (1,2,3,4,6,7,8,9 ,11,10,5,12,1,2,3,4) 并使用其中任何一个(map、reduce、filter、flatmap、lamda 函数)获得最大值。谁能告诉我如何转换或获取最大元素。
我有一个解决方案,但它只适用于像
这样的两个深度级别
a = sc.parallelize(( (1,2), 3,(4,5)))
b = sc.parallelize((2,(4,6,7),8))
def maxReduce(tup):
return int(functools.reduce(lambda a,b : a if a>b else b, tup))
maxFunc = lambda x: maxReduce(x) if type(x) == tuple else x
a.union(b).map(lambda x: maxFunc(x)).reduce(lambda a,b : a if a>b else b)
以上代码仅适用于深度二,我需要在任何给定深度下使用它 (1,(2,3,(4,5,(6,(7,(8))))))
。
听起来递归函数的一个很好的用例:
from collections import Iterable
a = sc.parallelize(((1, 2), 3, (4, (6, 7, (8, 9, (11), 10)), 5, 12)))
b = sc.parallelize((1, 2, (3, 4)))
def maxIterOrNum(ele):
"""
this method finds the maximum value in an iterable otherwise return the value itself
:param ele: An iterable of numeric values or a numeric value
:return: a numeric value
"""
res = -float('inf')
if isinstance(ele, Iterable):
for x in ele:
res = max(res, maxIterOrNum(x))
return res
else:
return ele
a.union(b).reduce(lambda x, y: max(maxIterOrNum(x), maxIterOrNum(y)))
我在 Pyspark 中有两个 RDD,嵌套元素如下:
a = sc.parallelize(( (1,2), 3,(4,(6,7,(8,9,(11),10)),5,12)))
b = sc.parallelize(1,2,(3,4))
嵌套可以有任意深度。
我想合并它们然后找到任意深度的最大元素,所以我尝试将它转换为 RDD 而没有像这样的嵌套值 (1,2,3,4,6,7,8,9 ,11,10,5,12,1,2,3,4) 并使用其中任何一个(map、reduce、filter、flatmap、lamda 函数)获得最大值。谁能告诉我如何转换或获取最大元素。
我有一个解决方案,但它只适用于像
这样的两个深度级别a = sc.parallelize(( (1,2), 3,(4,5)))
b = sc.parallelize((2,(4,6,7),8))
def maxReduce(tup):
return int(functools.reduce(lambda a,b : a if a>b else b, tup))
maxFunc = lambda x: maxReduce(x) if type(x) == tuple else x
a.union(b).map(lambda x: maxFunc(x)).reduce(lambda a,b : a if a>b else b)
以上代码仅适用于深度二,我需要在任何给定深度下使用它 (1,(2,3,(4,5,(6,(7,(8))))))
。
听起来递归函数的一个很好的用例:
from collections import Iterable
a = sc.parallelize(((1, 2), 3, (4, (6, 7, (8, 9, (11), 10)), 5, 12)))
b = sc.parallelize((1, 2, (3, 4)))
def maxIterOrNum(ele):
"""
this method finds the maximum value in an iterable otherwise return the value itself
:param ele: An iterable of numeric values or a numeric value
:return: a numeric value
"""
res = -float('inf')
if isinstance(ele, Iterable):
for x in ele:
res = max(res, maxIterOrNum(x))
return res
else:
return ele
a.union(b).reduce(lambda x, y: max(maxIterOrNum(x), maxIterOrNum(y)))