Pyspark:如何 return 现有非空列的元组列表作为数据框中的列值之一

Pyspark: How to return a tuple list of existing non null columns as one of the column values in dataframe

我正在使用 pyspark 数据框,它是:

+----+----+---+---+---+----+
|   a|   b|  c|  d|  e|   f|
+----+----+---+---+---+----+
|   2|12.3|  5|5.6|  6|44.7|
|null|null|  9|9.3| 19|23.5|
|   8| 4.3|  7|0.5| 21| 8.2|
|   9| 3.8|  3|6.5| 45| 4.9|
|   3| 8.7|  2|2.8| 32| 2.9|
+----+----+---+---+---+----+

创建以上数据框:

rdd =  sc.parallelize([(2,12.3,5,5.6,6,44.7), 
                (None,None,9,9.3,19,23.5), 
                (8,4.3,7,0.5,21,8.2),
                 (9,3.8,3,6.5,45,4.9),
                  (3,8.7,2,2.8,32,2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df.show()

我想创建另一个列 'g',其值是基于现有 非空 列的元组列表。元组列表的形式为:

((a 列,b 列),(c 列,d 列),(e 列,f 列))

输出列要求: 1) 创建元组列表时只考虑非空列。 2) Return 元组列表。

所以带有 'g' 列的最终数据框将是:

+---+----+---+---+---+----+--------------------------+
|  a|   b|  c|  d|  e|   f|                   g      |
+---+----+---+---+---+----+--------------------------+
|  2|12.3|  5|5.6|  6|44.7|[[2,12.3],[5,5.6],[6,44.7]|
|nul|nul|  9 |9.3| 19|23.5|[[9,9.3],[19,23.5]        |
|  8| 4.3|  7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2] |
|  9| 3.8|  3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9] |
|  3| 8.7|  2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9] |
+---+----+---+---+---+----+--------------------------+

在列 "g" 中,第二行元组只有两对而不是三对,因为对于第二行,我们省略了列 'a' 和 'b' 值,因为它们是空值.

我不确定如何动态省略列中的空值并形成元组列表

我试图通过 udf 部分实现最后一列:

l1=['a','c','e']
l2=['b','d','f']
def func1(r1,r2):
    l=[]
    for i in range(len(l1)):
        l.append((r1[i],r2[i]))
    return l
func1_udf=udf(func1)
df=df.withColumn('g',func1_udf(array(l1),array(l2)))
df.show()

我尝试将 udf 声明为 ArrayType,但没有成功。任何帮助将非常感激。我正在使用 pyspark 1.6。谢谢!

您可以尝试这样的操作:

df.withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(), 
array(col("a"),col("b"))).otherwise(array(lit("")))).withColumn("h", 
when(col("c").isNotNull() & col("d").isNotNull(), 
array(col("c"),col("d"))).otherwise(array(lit ("")))).withColumn("i", 
when(col("e").isNotNull() & col("f").isNotNull(), 
array(col("e"),col("f"))).otherwise(array(lit("")))).withColumn("concat", 
array(col("g"),col("h"),col("i"))).drop('g','h','i').show(truncate=False)

结果 df:

+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|a   |b   |c  |d  |e  |f   |concat                                                                    
|
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0, 
5.6), WrappedArray(6.0, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3), 
WrappedArray(19.0, 23.5)]        |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5), 
WrappedArray(21.0, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5), 
WrappedArray(45.0, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8), 
WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+------------------------------------------------
--------------------------+

我认为 UDF 应该可以正常工作。

import pyspark.sql.functions as F
from pyspark.sql.types import *

rdd =  sc.parallelize([(2,12.3,5,5.6,6,44.7), 
            (None,None,9,9.3,19,23.5), 
            (8,4.3,7,0.5,21,8.2),
             (9,3.8,3,6.5,45,4.9),
              (3,8.7,2,2.8,32,2.9)])
df = sql.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))

def combine(a,b,c,d,e,f):

    combine_ = []
    if None not in [a,b]:
        combine_.append([a,b])
    if None not in [c,d]:
        combine_.append([c,d])
    if None not in [e,f]:
        combine_.append([e,f])
    return combine_

combine_udf = F.udf(combine,ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'),F.col('b'),F.col('c'),\
               F.col('d'),F.col('e'),F.col('f')))
df.show()

另一个使用udf的解决方案,

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

>>> arr_udf = F.udf(lambda row : [x for x in [row[0:2],row[2:4],row[4:6]] if all(x)],ArrayType(ArrayType(StringType())))
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a   |b   |c  |d  |e  |f   |g                                                                   |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2   |12.3|5  |5.6|6  |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9  |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)]                      |
|8   |4.3 |7  |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9   |3.8 |3  |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3   |8.7 |2  |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+