如何在 python spark 中正确使用具有两个输入和三个预期输出的枚举

How to correctly use enumerate with two inputs and three expected outputs in python spark

我一直在尝试复制 http://www.data-intuitive.com/2015/01/transposing-a-spark-rdd/ 中的代码以在 pyspark 中转置 RDD。我能够正确地加载我的 RDD 并将 zipWithIndex 方法应用到它,如下所示:

m1.rdd.zipWithIndex().collect()
[(Row(c1_1=1, c1_2=2, c1_3=3), 0),
(Row(c1_1=4, c1_2=5, c1_3=6), 1),
(Row(c1_1=7, c1_2=8, c1_3=9), 2)]

但是,当我想将它应用到带有 lambda 枚举该数组的 flatMap 时,语法无效:

m1.rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)]).take(1)

或者,位置参数 i 显示为缺失:

m1.rdd.zipWithIndex().flatMap(lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]).take(1)

当我运行 python中的lambda时,它需要一个额外的索引参数来捕获函数。

aa = m1.rdd.zipWithIndex().collect()
g = lambda x,i: [(i,j,e) for (j,e) in enumerate(x)]
g(aa,3) #extra parameter

这在我看来是不必要的,因为之前已经计算了指数。

我在 python 和 spark 方面相当业余,我想知道索引有什么问题以及为什么 spark 和 python 都没有捕捉到它们。谢谢你。

首先让我们看一下 RDD.flatMap 的签名(为清楚起见删除了 preservesPartitioning 参数):

flatMap(self: RDD[T], f: Callable[[T], Iterable[U]]) -> RDD[U]: ...

如您所见,flatMap 需要一个 unary function

回到你的代码:

  • lambda x, i: ... 是一个 binary function,所以显然它不会起作用。
  • lambda (x, i): ... 曾经是具有 tuple argument unpacking 的一元函数的语法。它使用结构匹配来解构(在 Python 命名法中解包)单个输入参数(此处为 Tuple[Any, Any])。此语法很脆弱,已在 Python 3 中删除。在 Python 3 中实现相同结果的正确方法是索引:

    lambda xi: ((x[1], j, e) for e, j in enumerate(x[0]))
    

    如果您更喜欢结构匹配,只需使用标准函数:

    def flatten(xsi):
        xs, i = xsi
        for j, x in enumerate(xs):
            yield i, j, x
    
    rdd.flatMap(flatten)