打破字符串后 Scala return UDF 中的多列
Scala return multiple columns in an UDF after breaking a string
我正在尝试打断一个字符串(从技术上讲,字符串是从数据框中的列传递的)并且 return 这些打断的字符串作为数据框的列表。斯卡拉版本 2.11。我更喜欢带有 udf 的 scala 或 pyspark 解决方案——因为 udf 内部发生了很多事情。
假设我有一个数据框:
val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")
我想要的结果(在udf中,因为里面发生了很多事情;Scala 2.11版)--
A B
123 ((a, b, c),
(d, e, f),
(x, y, z))
124 ((g, h, i),
(j, k, l),
(m, n, o))
写一个 udf 来打破这个和 return 列表 - 但是,我不知道如何定义或传递模式以将结果作为三列返回到数据框中。
def testUdf = udf( (s: String) => {
val a = s.split("\*").take(3).toList
val b = s.split("\*").drop(3).take(3).toList
val c = s.split("\*").drop(6).take(3).toList
val abc = (a, b, c).zipped.toList.asInstanceOf[List[String]]
// println (abc) // This does not work
} )
val df2 = df.select($"A", testUdf($"B").as("B")) // does not work because of type mismatch.
我尝试这样做 - 但是,我不知道如何将模式传递给上面的 Udf:
val schema = StructType(List(
StructField("C1", StringType),
StructField("C2", StringType),
StructField("C3", StringType)
))
此外,在此之后,我希望按照 中概述的过程来分解数据框。
不胜感激。
你定义的udf是一个from String to Unit - 去掉最后一行的abc
到return它
另请注意, asInstanceOf[] 不会更改类型 - 您仍然有一个元组
下面将为您提供列表列表
def testUdf = udf( (s: String) => {
val a = s.split("\*").take(3).toList
val b = s.split("\*").drop(3).take(3).toList
val c = s.split("\*").drop(6).take(3).toList
(a, b, c).zipped.toList.map(t=>List(t._1,t._2,t._3))
} )
问题是您的 UDF returns Unit
(最后一个语句是 return 值)。我会建议以下程序:
val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")
def testUdf = udf((s: String) => {
val Array(s1, s2, s3, s4, s5, s6, s7, s8, s9) = s.split(s"\*")
Seq(
(s1, s2, s3),
(s4, s5, s6),
(s7, s8, s9)
)
})
val df2 = df.select($"A", explode(testUdf($"B")).as("B"))
df2.show()
+---+-------+
| A| B|
+---+-------+
|123|[a,b,c]|
|123|[d,e,f]|
|123|[x,y,z]|
|124|[g,h,i]|
|124|[j,k,l]|
|124|[m,n,o]|
+---+-------+
您在 zipped
之前生成数组的方式无法正确呈现元素。以所需顺序生成元素的一种方法是在应用 zipped
.
之前使用二维数组预先转置元素
以下 UDF 将 1) 将一个字符串列拆分为一个数组,该数组被转置为一个二维数组,2) 将二维数组的行压缩为元组数组,以及 3) 转换该数组元组的元组到元组的元组(即结构的列类型结构):
val df = Seq(
("123", "a*b*c*d*e*f*x*y*z"),
("124", "g*h*i*j*k*l*m*n*o")
).toDF("A", "B")
import org.apache.spark.sql.functions._
def splitUdf = udf( (s: String) => {
val arr = s.split("\*")
val arr2d = Array.ofDim[String](3, 3)
for {
r <- 0 until 3
c <- 0 until 3
} arr2d(r)(c) = arr(c * 3 + r)
val arrTup = (arr2d(0), arr2d(1), arr2d(2)).zipped.toArray
(arrTup(0), arrTup(1), arrTup(2))
} )
val df2 = df.select($"A", splitUdf($"B").as("B"))
df2.show(false)
// +---+-------------------------+
// |A |B |
// +---+-------------------------+
// |123|[[a,b,c],[d,e,f],[x,y,z]]|
// |124|[[g,h,i],[j,k,l],[m,n,o]]|
// +---+-------------------------+
我正在尝试打断一个字符串(从技术上讲,字符串是从数据框中的列传递的)并且 return 这些打断的字符串作为数据框的列表。斯卡拉版本 2.11。我更喜欢带有 udf 的 scala 或 pyspark 解决方案——因为 udf 内部发生了很多事情。
假设我有一个数据框:
val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")
我想要的结果(在udf中,因为里面发生了很多事情;Scala 2.11版)--
A B
123 ((a, b, c),
(d, e, f),
(x, y, z))
124 ((g, h, i),
(j, k, l),
(m, n, o))
写一个 udf 来打破这个和 return 列表 - 但是,我不知道如何定义或传递模式以将结果作为三列返回到数据框中。
def testUdf = udf( (s: String) => {
val a = s.split("\*").take(3).toList
val b = s.split("\*").drop(3).take(3).toList
val c = s.split("\*").drop(6).take(3).toList
val abc = (a, b, c).zipped.toList.asInstanceOf[List[String]]
// println (abc) // This does not work
} )
val df2 = df.select($"A", testUdf($"B").as("B")) // does not work because of type mismatch.
我尝试这样做 - 但是,我不知道如何将模式传递给上面的 Udf:
val schema = StructType(List(
StructField("C1", StringType),
StructField("C2", StringType),
StructField("C3", StringType)
))
此外,在此之后,我希望按照
不胜感激。
你定义的udf是一个from String to Unit - 去掉最后一行的abc
到return它
另请注意, asInstanceOf[] 不会更改类型 - 您仍然有一个元组
下面将为您提供列表列表
def testUdf = udf( (s: String) => {
val a = s.split("\*").take(3).toList
val b = s.split("\*").drop(3).take(3).toList
val c = s.split("\*").drop(6).take(3).toList
(a, b, c).zipped.toList.map(t=>List(t._1,t._2,t._3))
} )
问题是您的 UDF returns Unit
(最后一个语句是 return 值)。我会建议以下程序:
val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")
def testUdf = udf((s: String) => {
val Array(s1, s2, s3, s4, s5, s6, s7, s8, s9) = s.split(s"\*")
Seq(
(s1, s2, s3),
(s4, s5, s6),
(s7, s8, s9)
)
})
val df2 = df.select($"A", explode(testUdf($"B")).as("B"))
df2.show()
+---+-------+
| A| B|
+---+-------+
|123|[a,b,c]|
|123|[d,e,f]|
|123|[x,y,z]|
|124|[g,h,i]|
|124|[j,k,l]|
|124|[m,n,o]|
+---+-------+
您在 zipped
之前生成数组的方式无法正确呈现元素。以所需顺序生成元素的一种方法是在应用 zipped
.
以下 UDF 将 1) 将一个字符串列拆分为一个数组,该数组被转置为一个二维数组,2) 将二维数组的行压缩为元组数组,以及 3) 转换该数组元组的元组到元组的元组(即结构的列类型结构):
val df = Seq(
("123", "a*b*c*d*e*f*x*y*z"),
("124", "g*h*i*j*k*l*m*n*o")
).toDF("A", "B")
import org.apache.spark.sql.functions._
def splitUdf = udf( (s: String) => {
val arr = s.split("\*")
val arr2d = Array.ofDim[String](3, 3)
for {
r <- 0 until 3
c <- 0 until 3
} arr2d(r)(c) = arr(c * 3 + r)
val arrTup = (arr2d(0), arr2d(1), arr2d(2)).zipped.toArray
(arrTup(0), arrTup(1), arrTup(2))
} )
val df2 = df.select($"A", splitUdf($"B").as("B"))
df2.show(false)
// +---+-------------------------+
// |A |B |
// +---+-------------------------+
// |123|[[a,b,c],[d,e,f],[x,y,z]]|
// |124|[[g,h,i],[j,k,l],[m,n,o]]|
// +---+-------------------------+