如何在 python spark 中正确使用具有两个输入和三个预期输出的枚举
How to correctly use enumerate with two inputs and three expected outputs in python spark
我一直在尝试复制 http://www.data-intuitive.com/2015/01/transposing-a-spark-rdd/ 中的代码以在 pyspark 中转置 RDD。我能够正确地加载我的 RDD 并将 zipWithIndex 方法应用到它,如下所示:
m1.rdd.zipWithIndex().collect()
[(Row(c1_1=1, c1_2=2, c1_3=3), 0),
(Row(c1_1=4, c1_2=5, c1_3=6), 1),
(Row(c1_1=7, c1_2=8, c1_3=9), 2)]
但是,当我想将它应用到带有 lambda 枚举该数组的 flatMap 时,语法无效:
m1.rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)]).take(1)
或者,位置参数 i 显示为缺失:
m1.rdd.zipWithIndex().flatMap(lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]).take(1)
当我运行 python中的lambda时,它需要一个额外的索引参数来捕获函数。
aa = m1.rdd.zipWithIndex().collect()
g = lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]
g(aa,3) #extra parameter
这在我看来是不必要的,因为之前已经计算了指数。
我在 python 和 spark 方面相当业余,我想知道索引有什么问题以及为什么 spark 和 python 都没有捕捉到它们。谢谢你。
首先让我们看一下 RDD.flatMap
的签名(为清楚起见删除了 preservesPartitioning
参数):
flatMap(self: RDD[T], f: Callable[[T], Iterable[U]]) -> RDD[U]: ...
如您所见,flatMap
需要一个 unary function。
回到你的代码:
lambda x, i: ...
是一个 binary function,所以显然它不会起作用。
lambda (x, i): ...
曾经是具有 tuple argument unpacking 的一元函数的语法。它使用结构匹配来解构(在 Python 命名法中解包)单个输入参数(此处为 Tuple[Any, Any]
)。此语法很脆弱,已在 Python 3 中删除。在 Python 3 中实现相同结果的正确方法是索引:
lambda xi: ((x[1], j, e) for e, j in enumerate(x[0]))
如果您更喜欢结构匹配,只需使用标准函数:
def flatten(xsi):
xs, i = xsi
for j, x in enumerate(xs):
yield i, j, x
rdd.flatMap(flatten)
我一直在尝试复制 http://www.data-intuitive.com/2015/01/transposing-a-spark-rdd/ 中的代码以在 pyspark 中转置 RDD。我能够正确地加载我的 RDD 并将 zipWithIndex 方法应用到它,如下所示:
m1.rdd.zipWithIndex().collect()
[(Row(c1_1=1, c1_2=2, c1_3=3), 0),
(Row(c1_1=4, c1_2=5, c1_3=6), 1),
(Row(c1_1=7, c1_2=8, c1_3=9), 2)]
但是,当我想将它应用到带有 lambda 枚举该数组的 flatMap 时,语法无效:
m1.rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)]).take(1)
或者,位置参数 i 显示为缺失:
m1.rdd.zipWithIndex().flatMap(lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]).take(1)
当我运行 python中的lambda时,它需要一个额外的索引参数来捕获函数。
aa = m1.rdd.zipWithIndex().collect()
g = lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]
g(aa,3) #extra parameter
这在我看来是不必要的,因为之前已经计算了指数。
我在 python 和 spark 方面相当业余,我想知道索引有什么问题以及为什么 spark 和 python 都没有捕捉到它们。谢谢你。
首先让我们看一下 RDD.flatMap
的签名(为清楚起见删除了 preservesPartitioning
参数):
flatMap(self: RDD[T], f: Callable[[T], Iterable[U]]) -> RDD[U]: ...
如您所见,flatMap
需要一个 unary function。
回到你的代码:
lambda x, i: ...
是一个 binary function,所以显然它不会起作用。lambda (x, i): ...
曾经是具有 tuple argument unpacking 的一元函数的语法。它使用结构匹配来解构(在 Python 命名法中解包)单个输入参数(此处为Tuple[Any, Any]
)。此语法很脆弱,已在 Python 3 中删除。在 Python 3 中实现相同结果的正确方法是索引:lambda xi: ((x[1], j, e) for e, j in enumerate(x[0]))
如果您更喜欢结构匹配,只需使用标准函数:
def flatten(xsi): xs, i = xsi for j, x in enumerate(xs): yield i, j, x rdd.flatMap(flatten)