Spark UDF 将列值拆分为多列
Spark UDF to split a column value to multiple columns
我有一个名为 'description' 的数据框列,其值的格式如下
ABC XXXXXXXXXXXX STORE NAME ABC TYPE1
我想将其解析为不同的 3 列,如下所示
| mode | type | store | description |
|------------------------------------------------------------------------|
| ABC | TYPE1 | STORE NAME | ABC XXXXXXXXXXXX STORE NAME ABC TYPE1 |
我尝试了类似问题中建议的方法。它适用于简单的 UDF 函数,但不适用于我编写的函数。难点在于 store 的值可能包含 2 个以上的单词,单词数量并不固定。
/** Splits a description string into a `(mode, store, type)` tuple.
  *
  * The text between the first and the last occurrence of the marker "ABC" is
  * tokenised on single spaces: its first token becomes `mode`, the remaining
  * tokens (re-joined with spaces) become `store`. `type` is the last
  * space-separated token of the whole description.
  *
  * Malformed input no longer throws: when the description is null, or the
  * marker is missing or occurs only once (so no text lies between two
  * markers), `mode` and `store` are returned as empty strings.
  *
  * NOTE(review): the example table in the question shows mode = "ABC"; this
  * function returns the token that follows the first marker — confirm intent.
  *
  * @return a function from the description to `(mode, store, type)`
  */
def myFunc1: (String => (String, String, String)) = { description =>
  val marker = "ABC"
  val text = Option(description).getOrElse("")
  // `type` is a reserved word in Scala, so the last token is bound to `kind`.
  val kind = text.split(" ").lastOption.getOrElse("")
  val first = text.indexOf(marker)
  val last = text.lastIndexOf(marker)
  // Skip the marker itself plus the single space that follows it.
  val from = first + marker.length + 1
  if (first < 0 || last < from) {
    // Without two distinct markers substring(from, last) would throw
    // StringIndexOutOfBoundsException, so fall back to empty fields.
    ("", "", kind)
  } else {
    val tokens = text.substring(from, last).split(" ")
    val mode = tokens.headOption.getOrElse("")
    val store = tokens.drop(1).mkString(" ").trim
    (mode, store, kind)
  }
}
// Struct type returned by the UDF: three nullable string fields.
val schema = StructType(Array(
  StructField("mode", StringType, true),
  StructField("store", StringType, true),
  StructField("type", StringType, true)
))
// Wrap myFunc1 as a UDF whose result column has the struct type above.
val myUDF = udf(myFunc1, schema)
val test = pos.withColumn("test", myUDF(col("description")))
test.printSchema()
// The struct's fields carry the names declared in `schema` (mode/store/type),
// not the tuple accessors _1/_2/_3, so extract them by those names.
val a = test.withColumn("mode", col("test").getField("mode"))
  .withColumn("store", col("test").getField("store"))
  .withColumn("type", col("test").getField("type"))
  .drop(col("test"))
a.printSchema()
a.show(5, false)
执行时出现以下错误
18/10/06 21:38:02 ERROR Executor: Exception in task 0.0 in stage 5.0
(TID 5) org.apache.spark.SparkException: Failed to execute user
defined function($anonfun$myFunc1: (string) =>
struct(mode:string,store:string,type:string)) at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source) at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at
org.apache.spark.scheduler.Task.run(Task.scala:108) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.StringIndexOutOfBoundsException: String index out of range:
-4 at java.lang.String.substring(String.java:1967) at com.hasif.bank.track.trasaction.TransactionParser$$anonfun$myFunc1.apply(TransactionParser.scala:26)
at
com.hasif.bank.track.trasaction.TransactionParser$$anonfun$myFunc1.apply(TransactionParser.scala:22)
... 16 more
如有任何建议,我们将不胜感激。
看看这个。
scala> val df = Seq("ABC XXXXXXXXXXXX STORE NAME ABC TYPE1").toDF("desc")
df: org.apache.spark.sql.DataFrame = [desc: string]
scala> df.withColumn("mode",split('desc," ")(0)).withColumn("type",split('desc," ")(5)).withColumn("store",concat(split('desc," ")(2), lit(" "), split('desc," ")(3))).show(false)
+-------------------------------------+----+-----+----------+
|desc |mode|type |store |
+-------------------------------------+----+-----+----------+
|ABC XXXXXXXXXXXX STORE NAME ABC TYPE1|ABC |TYPE1|STORE NAME|
+-------------------------------------+----+-----+----------+
scala>
更新1:
scala> def splitStore(x:String):String=
| return x.split(" ").drop(2).init.init.mkString(" ")
splitStore: (x: String)String
scala> val mysplitstore = udf(splitStore(_:String):String)
mysplitstore: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> val df2 = Seq("ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1").toDF("desc")
df2: org.apache.spark.sql.DataFrame = [desc: string]
scala> val df3 = df2.withColumn("length",split('desc," "))
df3: org.apache.spark.sql.DataFrame = [desc: string, length: array<string>]
scala> val df4 = df3.withColumn("mode",split('desc," ")(size('length)-2)).withColumn("type",split('desc," ")(size('length)-1)).withColumn("store",mysplitstore('desc))
df4: org.apache.spark.sql.DataFrame = [desc: string, length: array<string> ... 3 more fields]
scala> df4.drop('length).show(false)
+-----------------------------------------+----+-----+--------------+
|desc |mode|type |store |
+-----------------------------------------+----+-----+--------------+
|ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1|ABC |TYPE1|STORE NAME XYZ|
+-----------------------------------------+----+-----+--------------+
scala>
我有一个名为 'description' 的数据框列,其值的格式如下
ABC XXXXXXXXXXXX STORE NAME ABC TYPE1
我想将其解析为不同的 3 列,如下所示
| mode | type | store | description |
|------------------------------------------------------------------------|
| ABC | TYPE1 | STORE NAME | ABC XXXXXXXXXXXX STORE NAME ABC TYPE1 |
我尝试了类似问题中建议的方法,代码如下:
/** Splits a description string into a `(mode, store, type)` tuple.
  *
  * The text between the first and the last occurrence of the marker "ABC" is
  * tokenised on single spaces: its first token becomes `mode`, the remaining
  * tokens (re-joined with spaces) become `store`. `type` is the last
  * space-separated token of the whole description.
  *
  * Malformed input no longer throws: when the description is null, or the
  * marker is missing or occurs only once (so no text lies between two
  * markers), `mode` and `store` are returned as empty strings.
  *
  * NOTE(review): the example table in the question shows mode = "ABC"; this
  * function returns the token that follows the first marker — confirm intent.
  *
  * @return a function from the description to `(mode, store, type)`
  */
def myFunc1: (String => (String, String, String)) = { description =>
  val marker = "ABC"
  val text = Option(description).getOrElse("")
  // `type` is a reserved word in Scala, so the last token is bound to `kind`.
  val kind = text.split(" ").lastOption.getOrElse("")
  val first = text.indexOf(marker)
  val last = text.lastIndexOf(marker)
  // Skip the marker itself plus the single space that follows it.
  val from = first + marker.length + 1
  if (first < 0 || last < from) {
    // Without two distinct markers substring(from, last) would throw
    // StringIndexOutOfBoundsException, so fall back to empty fields.
    ("", "", kind)
  } else {
    val tokens = text.substring(from, last).split(" ")
    val mode = tokens.headOption.getOrElse("")
    val store = tokens.drop(1).mkString(" ").trim
    (mode, store, kind)
  }
}
// Struct type returned by the UDF: three nullable string fields.
val schema = StructType(Array(
  StructField("mode", StringType, true),
  StructField("store", StringType, true),
  StructField("type", StringType, true)
))
// Wrap myFunc1 as a UDF whose result column has the struct type above.
val myUDF = udf(myFunc1, schema)
val test = pos.withColumn("test", myUDF(col("description")))
test.printSchema()
// The struct's fields carry the names declared in `schema` (mode/store/type),
// not the tuple accessors _1/_2/_3, so extract them by those names.
val a = test.withColumn("mode", col("test").getField("mode"))
  .withColumn("store", col("test").getField("store"))
  .withColumn("type", col("test").getField("type"))
  .drop(col("test"))
a.printSchema()
a.show(5, false)
执行时出现以下错误
18/10/06 21:38:02 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5) org.apache.spark.SparkException: Failed to execute user defined function($anonfun$myFunc1: (string) => struct(mode:string,store:string,type:string)) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395) at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234) at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -4 at java.lang.String.substring(String.java:1967) at com.hasif.bank.track.trasaction.TransactionParser$$anonfun$myFunc1.apply(TransactionParser.scala:26) at com.hasif.bank.track.trasaction.TransactionParser$$anonfun$myFunc1.apply(TransactionParser.scala:22) ... 16 more
如有任何建议,我们将不胜感激。
看看这个。
scala> val df = Seq("ABC XXXXXXXXXXXX STORE NAME ABC TYPE1").toDF("desc")
df: org.apache.spark.sql.DataFrame = [desc: string]
scala> df.withColumn("mode",split('desc," ")(0)).withColumn("type",split('desc," ")(5)).withColumn("store",concat(split('desc," ")(2), lit(" "), split('desc," ")(3))).show(false)
+-------------------------------------+----+-----+----------+
|desc |mode|type |store |
+-------------------------------------+----+-----+----------+
|ABC XXXXXXXXXXXX STORE NAME ABC TYPE1|ABC |TYPE1|STORE NAME|
+-------------------------------------+----+-----+----------+
scala>
更新1:
scala> def splitStore(x:String):String=
| return x.split(" ").drop(2).init.init.mkString(" ")
splitStore: (x: String)String
scala> val mysplitstore = udf(splitStore(_:String):String)
mysplitstore: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> val df2 = Seq("ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1").toDF("desc")
df2: org.apache.spark.sql.DataFrame = [desc: string]
scala> val df3 = df2.withColumn("length",split('desc," "))
df3: org.apache.spark.sql.DataFrame = [desc: string, length: array<string>]
scala> val df4 = df3.withColumn("mode",split('desc," ")(size('length)-2)).withColumn("type",split('desc," ")(size('length)-1)).withColumn("store",mysplitstore('desc))
df4: org.apache.spark.sql.DataFrame = [desc: string, length: array<string> ... 3 more fields]
scala> df4.drop('length).show(false)
+-----------------------------------------+----+-----+--------------+
|desc |mode|type |store |
+-----------------------------------------+----+-----+--------------+
|ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1|ABC |TYPE1|STORE NAME XYZ|
+-----------------------------------------+----+-----+--------------+
scala>