UDF 无法在 spark scala 中获取文件名
UDF is not working to get file name in spark scala
这就是我在 spark 数据框中使用 UDF 的方式..
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(4))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show()
但是当我 运行 这个我得到低于错误
<console>:545: error: not found: value get_cus_val
df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show()
但是如果我这样做的话,我可以获得具有完整路径的文件名..
df1result.withColumn("DataPartition", input_file_name).show()
知道我错过了什么吗?
这不起作用,因为您只注册了 SQL 函数。你可以试试
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(4))
或
df1result.selectExpr("*", "get_cus_val(input_file_name) as DataPartition").show()
你可以试试这个。它对我有用。
df.withColumn("file_name",callUDF("get_cus_val",input_file_name()))
这就是我在 spark 数据框中使用 UDF 的方式..
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(4))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show()
但是当我 运行 这个我得到低于错误
<console>:545: error: not found: value get_cus_val
df1result.withColumn("DataPartition", get_cus_val(input_file_name)).show()
但是如果我这样做的话,我可以获得具有完整路径的文件名..
df1result.withColumn("DataPartition", input_file_name).show()
知道我错过了什么吗?
这不起作用,因为您只注册了 SQL 函数。你可以试试
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\.")(4))
或
df1result.selectExpr("*", "get_cus_val(input_file_name) as DataPartition").show()
你可以试试这个。它对我有用。
df.withColumn("file_name",callUDF("get_cus_val",input_file_name()))