Pyspark:如何 return 现有非空列的元组列表作为数据框中的列值之一
Pyspark: How to return a tuple list of existing non null columns as one of the column values in dataframe
我正在使用 pyspark 数据框,它是:
+----+----+---+---+---+----+
| a| b| c| d| e| f|
+----+----+---+---+---+----+
| 2|12.3| 5|5.6| 6|44.7|
|null|null| 9|9.3| 19|23.5|
| 8| 4.3| 7|0.5| 21| 8.2|
| 9| 3.8| 3|6.5| 45| 4.9|
| 3| 8.7| 2|2.8| 32| 2.9|
+----+----+---+---+---+----+
创建以上数据框:
rdd = sc.parallelize([(2,12.3,5,5.6,6,44.7),
(None,None,9,9.3,19,23.5),
(8,4.3,7,0.5,21,8.2),
(9,3.8,3,6.5,45,4.9),
(3,8.7,2,2.8,32,2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df.show()
我想创建另一个列 'g',其值是基于现有 非空 列的元组列表。元组列表的形式为:
((a 列,b 列),(c 列,d 列),(e 列,f 列))
输出列要求:
1) 创建元组列表时只考虑非空列。
2) Return 元组列表。
所以带有 'g' 列的最终数据框将是:
+---+----+---+---+---+----+--------------------------+
| a| b| c| d| e| f| g |
+---+----+---+---+---+----+--------------------------+
| 2|12.3| 5|5.6| 6|44.7|[[2,12.3],[5,5.6],[6,44.7]|
|nul|nul| 9 |9.3| 19|23.5|[[9,9.3],[19,23.5] |
| 8| 4.3| 7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2] |
| 9| 3.8| 3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9] |
| 3| 8.7| 2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9] |
+---+----+---+---+---+----+--------------------------+
在列 "g" 中,第二行元组只有两对而不是三对,因为对于第二行,我们省略了列 'a' 和 'b' 值,因为它们是空值.
我不确定如何动态省略列中的空值并形成元组列表
我试图通过 udf 部分实现最后一列:
l1=['a','c','e']
l2=['b','d','f']
def func1(r1,r2):
l=[]
for i in range(len(l1)):
l.append((r1[i],r2[i]))
return l
func1_udf=udf(func1)
df=df.withColumn('g',func1_udf(array(l1),array(l2)))
df.show()
我尝试将 udf 声明为 ArrayType,但没有成功。任何帮助将非常感激。我正在使用 pyspark 1.6。谢谢!
您可以尝试这样的操作:
df.withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(),
array(col("a"),col("b"))).otherwise(array(lit("")))).withColumn("h",
when(col("c").isNotNull() & col("d").isNotNull(),
array(col("c"),col("d"))).otherwise(array(lit ("")))).withColumn("i",
when(col("e").isNotNull() & col("f").isNotNull(),
array(col("e"),col("f"))).otherwise(array(lit("")))).withColumn("concat",
array(col("g"),col("h"),col("i"))).drop('g','h','i').show(truncate=False)
结果 df:
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|a |b |c |d |e |f |concat
|
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|2 |12.3|5 |5.6|6 |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0,
5.6), WrappedArray(6.0, 44.7)]|
|null|null|9 |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3),
WrappedArray(19.0, 23.5)] |
|8 |4.3 |7 |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5),
WrappedArray(21.0, 8.2)] |
|9 |3.8 |3 |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5),
WrappedArray(45.0, 4.9)] |
|3 |8.7 |2 |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8),
WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
我认为 UDF 应该可以正常工作。
import pyspark.sql.functions as F
from pyspark.sql.types import *
rdd = sc.parallelize([(2,12.3,5,5.6,6,44.7),
(None,None,9,9.3,19,23.5),
(8,4.3,7,0.5,21,8.2),
(9,3.8,3,6.5,45,4.9),
(3,8.7,2,2.8,32,2.9)])
df = sql.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))
def combine(a,b,c,d,e,f):
combine_ = []
if None not in [a,b]:
combine_.append([a,b])
if None not in [c,d]:
combine_.append([c,d])
if None not in [e,f]:
combine_.append([e,f])
return combine_
combine_udf = F.udf(combine,ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'),F.col('b'),F.col('c'),\
F.col('d'),F.col('e'),F.col('f')))
df.show()
另一个使用udf的解决方案,
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *
>>> arr_udf = F.udf(lambda row : [x for x in [row[0:2],row[2:4],row[4:6]] if all(x)],ArrayType(ArrayType(StringType())))
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a |b |c |d |e |f |g |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2 |12.3|5 |5.6|6 |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9 |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)] |
|8 |4.3 |7 |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9 |3.8 |3 |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3 |8.7 |2 |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+
我正在使用 pyspark 数据框,它是:
+----+----+---+---+---+----+
| a| b| c| d| e| f|
+----+----+---+---+---+----+
| 2|12.3| 5|5.6| 6|44.7|
|null|null| 9|9.3| 19|23.5|
| 8| 4.3| 7|0.5| 21| 8.2|
| 9| 3.8| 3|6.5| 45| 4.9|
| 3| 8.7| 2|2.8| 32| 2.9|
+----+----+---+---+---+----+
创建以上数据框:
rdd = sc.parallelize([(2,12.3,5,5.6,6,44.7),
(None,None,9,9.3,19,23.5),
(8,4.3,7,0.5,21,8.2),
(9,3.8,3,6.5,45,4.9),
(3,8.7,2,2.8,32,2.9)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df.show()
我想创建另一个列 'g',其值是基于现有 非空 列的元组列表。元组列表的形式为:
((a 列,b 列),(c 列,d 列),(e 列,f 列))
输出列要求: 1) 创建元组列表时只考虑非空列。 2) Return 元组列表。
所以带有 'g' 列的最终数据框将是:
+---+----+---+---+---+----+--------------------------+
| a| b| c| d| e| f| g |
+---+----+---+---+---+----+--------------------------+
| 2|12.3| 5|5.6| 6|44.7|[[2,12.3],[5,5.6],[6,44.7]|
|nul|nul| 9 |9.3| 19|23.5|[[9,9.3],[19,23.5] |
| 8| 4.3| 7|0.5| 21| 8.2|[[8,4.3],[7,0.5],[21,8.2] |
| 9| 3.8| 3|6.5| 45| 4.9|[[9,3.8],[3,6.5],[45,4.9] |
| 3| 8.7| 2|2.8| 32| 2.9|[[3,8.7],[2,2.8],[32,2.9] |
+---+----+---+---+---+----+--------------------------+
在列 "g" 中,第二行元组只有两对而不是三对,因为对于第二行,我们省略了列 'a' 和 'b' 值,因为它们是空值.
我不确定如何动态省略列中的空值并形成元组列表
我试图通过 udf 部分实现最后一列:
l1=['a','c','e']
l2=['b','d','f']
def func1(r1,r2):
l=[]
for i in range(len(l1)):
l.append((r1[i],r2[i]))
return l
func1_udf=udf(func1)
df=df.withColumn('g',func1_udf(array(l1),array(l2)))
df.show()
我尝试将 udf 声明为 ArrayType,但没有成功。任何帮助将非常感激。我正在使用 pyspark 1.6。谢谢!
您可以尝试这样的操作:
df.withColumn("g", when(col("a").isNotNull() & col("b").isNotNull(),
array(col("a"),col("b"))).otherwise(array(lit("")))).withColumn("h",
when(col("c").isNotNull() & col("d").isNotNull(),
array(col("c"),col("d"))).otherwise(array(lit ("")))).withColumn("i",
when(col("e").isNotNull() & col("f").isNotNull(),
array(col("e"),col("f"))).otherwise(array(lit("")))).withColumn("concat",
array(col("g"),col("h"),col("i"))).drop('g','h','i').show(truncate=False)
结果 df:
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|a |b |c |d |e |f |concat
|
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
|2 |12.3|5 |5.6|6 |44.7|[WrappedArray(2.0, 12.3), WrappedArray(5.0,
5.6), WrappedArray(6.0, 44.7)]|
|null|null|9 |9.3|19 |23.5|[WrappedArray(), WrappedArray(9.0, 9.3),
WrappedArray(19.0, 23.5)] |
|8 |4.3 |7 |0.5|21 |8.2 |[WrappedArray(8.0, 4.3), WrappedArray(7.0, 0.5),
WrappedArray(21.0, 8.2)] |
|9 |3.8 |3 |6.5|45 |4.9 |[WrappedArray(9.0, 3.8), WrappedArray(3.0, 6.5),
WrappedArray(45.0, 4.9)] |
|3 |8.7 |2 |2.8|32 |2.9 |[WrappedArray(3.0, 8.7), WrappedArray(2.0, 2.8),
WrappedArray(32.0, 2.9)] |
+----+----+---+---+---+----+------------------------------------------------
--------------------------+
我认为 UDF 应该可以正常工作。
import pyspark.sql.functions as F
from pyspark.sql.types import *
rdd = sc.parallelize([(2,12.3,5,5.6,6,44.7),
(None,None,9,9.3,19,23.5),
(8,4.3,7,0.5,21,8.2),
(9,3.8,3,6.5,45,4.9),
(3,8.7,2,2.8,32,2.9)])
df = sql.createDataFrame(rdd, ('a', 'b','c','d','e','f'))
df = df.select(*(F.col(c).cast("float").alias(c) for c in df.columns))
def combine(a,b,c,d,e,f):
combine_ = []
if None not in [a,b]:
combine_.append([a,b])
if None not in [c,d]:
combine_.append([c,d])
if None not in [e,f]:
combine_.append([e,f])
return combine_
combine_udf = F.udf(combine,ArrayType(ArrayType(FloatType())))
df = df.withColumn('combined', combine_udf(F.col('a'),F.col('b'),F.col('c'),\
F.col('d'),F.col('e'),F.col('f')))
df.show()
另一个使用udf的解决方案,
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *
>>> arr_udf = F.udf(lambda row : [x for x in [row[0:2],row[2:4],row[4:6]] if all(x)],ArrayType(ArrayType(StringType())))
>>> df.select("*",arr_udf(F.struct([df[x] for x in df.columns])).alias('g')).show(truncate=False)
+----+----+---+---+---+----+--------------------------------------------------------------------+
|a |b |c |d |e |f |g |
+----+----+---+---+---+----+--------------------------------------------------------------------+
|2 |12.3|5 |5.6|6 |44.7|[WrappedArray(2, 12.3), WrappedArray(5, 5.6), WrappedArray(6, 44.7)]|
|null|null|9 |9.3|19 |23.5|[WrappedArray(9, 9.3), WrappedArray(19, 23.5)] |
|8 |4.3 |7 |0.5|21 |8.2 |[WrappedArray(8, 4.3), WrappedArray(7, 0.5), WrappedArray(21, 8.2)] |
|9 |3.8 |3 |6.5|45 |4.9 |[WrappedArray(9, 3.8), WrappedArray(3, 6.5), WrappedArray(45, 4.9)] |
|3 |8.7 |2 |2.8|32 |2.9 |[WrappedArray(3, 8.7), WrappedArray(2, 2.8), WrappedArray(32, 2.9)] |
+----+----+---+---+---+----+--------------------------------------------------------------------+