Explode multiple columns in Spark SQL table
There is an earlier question related to this one.
Suppose that we have extra columns as below:
| userId | someString | varA | varB | varC | varD |
|--------|------------|------|------|------|------|
| 1 | "example1" | [0,2,5] | [1,2,9] | [a,b,c] | [red,green,yellow] |
| 2 | "example2" | [1,20,5] | [9,null,6] | [d,e,f] | [white,black,cyan] |
To summarize, the desired output is:
| userId | someString | varA | varB | varC | varD |
|--------|------------|------|------|------|------|
| 1 | "example1" | 0 | 1 | a | red |
| 1 | "example1" | 2 | 2 | b | green |
| 1 | "example1" | 5 | 9 | c | yellow |
| 2 | "example2" | 1 | 9 | d | white |
| 2 | "example2" | 20 | null | e | black |
| 2 | "example2" | 5 | 6 | f | Cyan |
The answer there was to define a udf as:
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
and to use it with "withColumn":
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB")).show
If we need to extend the above answer to more columns, what is the easiest way to modify the code? Any help would be appreciated.
I assume that varA, varB, varC and varD all have the same size, as in your example.
scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input
scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result
scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)
scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)
scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]
scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString| varA| varB| varC| varD|
+-------+----------+----------+------------+---------+--------------------+
| 1| example1| [0, 2, 5]| [1, 2, 9]|[a, b, c]|[red, green, yellow]|
| 2| example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+
scala> def getResult(row : Input) : Iterable[Result] = {
| val user_id = row.user_id
| val someString = row.someString
| val varA = row.varA
| val varB = row.varB
| val varC = row.varC
| val varD = row.varD
| val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
| seq.toSeq
| }
getResult: (row: Input)Iterable[Result]
scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]
scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC| varD|
+-------+----------+----+----+----+------+
| 1| example1| 0| 1| a| red|
| 1| example1| 2| 2| b| green|
| 1| example1| 5| 9| c|yellow|
| 2| example2| 1| 9| d| white|
| 2| example2| 20|null| e| black|
| 2| example2| 5| 6| f| cyan|
+-------+----------+----+----+----+------+
If the columns varA, varB, varC or varD can have different sizes, those cases need to be handled.
You could iterate up to the maximum size and, by handling the out-of-range access, output null whenever a value is missing from one of the columns, as sketched below.
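A minimal sketch of that idea, reusing the Input and Result case classes defined above (the helper at and the function getResultPadded are illustrative names, not part of the original answer):

```scala
// Return the i-th element, or null when the array is too short.
def at[T >: Null](arr: Array[T], i: Int): T =
  if (i < arr.length) arr(i) else null

// Variant of getResult that iterates up to the longest array and
// pads missing values with null instead of throwing an exception.
def getResultPadded(row: Input): Iterable[Result] = {
  val maxSize = Seq(row.varA.length, row.varB.length,
                    row.varC.length, row.varD.length).max
  (0 until maxSize).map { i =>
    Result(row.user_id, row.someString,
      at(row.varA, i), at(row.varB, i),
      at(row.varC, i), at(row.varD, i))
  }
}
```

It would be used exactly like getResult above: val resdf = input_df.flatMap(getResultPadded).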
The approach with the zip udf looks fine, but you need to extend it for more collections. Unfortunately there is no really nice way to zip four Seqs, but this should work:
def assertSameSize(arrs: Seq[_]*) = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
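The answer stops at defining zip4. A plausible way to apply it, mirroring the two-column example from the question (assuming the same df and column names), would be:

```scala
// Explode the zipped 4-tuples and pull the tuple fields back out as columns.
df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD")))
  .select(
    $"userId", $"someString",
    $"vars._1".alias("varA"), $"vars._2".alias("varB"),
    $"vars._3".alias("varC"), $"vars._4".alias("varD"))
  .show
```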
If you want to extend the udf to more columns, proceed as follows:
val zip = udf((xs: Seq[String], ys: Seq[String], zs: Seq[String]) =>
  for (((x, y), z) <- xs zip ys zip zs) yield (x, y, z))

df.withColumn("vars", explode(zip($"varA", $"varB", $"varC"))).select(
  $"userId", $"someString", $"vars._1".alias("varA"),
  $"vars._2".alias("varB"), $"vars._3".alias("varC")).show
This logic can be applied to as many columns as needed.
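As a side note, if you are on Spark 2.4 or later (an assumption, the question does not state a version), the built-in arrays_zip function avoids writing a new udf for every arity: it zips the array columns into an array of structs whose fields are named after the input columns, padding shorter arrays with null, and the result can be exploded directly:

```scala
import org.apache.spark.sql.functions.{arrays_zip, explode}

// Sketch only, assuming Spark 2.4+ and the same df as above.
df.withColumn("vars", explode(arrays_zip($"varA", $"varB", $"varC", $"varD")))
  .select(
    $"userId", $"someString",
    $"vars.varA", $"vars.varB", $"vars.varC", $"vars.varD")
  .show
```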