除非我更改列名,否则不应用 DataFrame 用户定义函数
DataFrame user-defined function not applied unless I change column name
我想使用隐式函数定义转换我的 DataFrame 列。
我已经定义了我的 DataFrame 类型,它包含额外的功能:
class MyDF(df: DataFrame) {
def bytes2String(colName: String): DataFrame = df
.withColumn(colname + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colname)))
.drop(colname)
.withColumnRenamed(colname + "_tmp", colname)
}
然后我定义我的隐式转换class:
object NpDataFrameImplicits {
implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}
最后,这是我在小型 FunSuite 单元测试中所做的:
test("example: call to bytes2String") {
val df: DataFrame = ...
df.select("header.ID").show() // (1)
df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
df.bytes2String("header.ID").select("header.ID").show() // (3)
}
显示#1
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
显示#2
+------------------------------------+
|id |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
显示#3
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
正如您在这里看到的那样,第三个 show
(也就是没有重命名列)没有按预期工作,并向我们展示了一个未转换的 ID 列。有人知道为什么吗?
编辑:
df.select(col("header.ID") as "ID").bytes2String("ID").show()
的输出:
+------------------------------------+
|ID |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
让我用下面的例子解释一下你的转换函数发生了什么。
首先创建数据框:
val jsonString: String =
"""{
| "employee": {
| "id": 12345,
| "name": "krishnan"
| },
| "_id": 1
|}""".stripMargin
val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))
val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()
输出结构:
root
|-- _id: long (nullable = true)
|-- employee: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
转换函数与您的相似:
def myConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
场景一#
对 root
级别字段进行转换。
myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()
结果:
+----------------+---+
| employee|_id|
+----------------+---+
|[12345,krishnan]| 2|
|[12345,krishnan]| 2|
+----------------+---+
+---+
|_id|
+---+
| 2|
| 2|
+---+
场景 2# 为 employee.id
进行转换。在这里,当我们使用 employee.id
时,数据帧在 root
级别添加了新字段 id
。这是正确的行为。
myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()
结果:
+---+----------------+-----------+
|_id| employee|employee.id|
+---+----------------+-----------+
| 1|[12345,krishnan]| 12346|
| 1|[12345,krishnan]| 12346|
+---+----------------+-----------+
+-----+
| id|
+-----+
|12345|
|12345|
+-----+
场景3#Select将内层字段转为根级再进行转换
myConversion(df.select("employee.id"), "id").show()
结果:
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
我的新转换函数,获取结构类型字段并执行转换并将其存储到结构类型字段本身。在这里,传递 employee
字段并单独转换 id
字段,但在 root
级别完成字段 employee
的更改。
case class Employee(id: String, name: String)
def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
你的场景号3#使用我的转换函数
myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()
结果#
+---+----------------+
|_id| employee|
+---+----------------+
| 1|[12346,krishnan]|
| 1|[12346,krishnan]|
+---+----------------+
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
我想使用隐式函数定义转换我的 DataFrame 列。
我已经定义了我的 DataFrame 类型,它包含额外的功能:
class MyDF(df: DataFrame) {
def bytes2String(colName: String): DataFrame = df
.withColumn(colname + "_tmp", udf((x: Array[Byte]) => bytes2String(x)).apply(col(colname)))
.drop(colname)
.withColumnRenamed(colname + "_tmp", colname)
}
然后我定义我的隐式转换class:
object NpDataFrameImplicits {
implicit def toNpDataFrame(df: DataFrame): NpDataFrame = new NpDataFrame(df)
}
最后,这是我在小型 FunSuite 单元测试中所做的:
test("example: call to bytes2String") {
val df: DataFrame = ...
df.select("header.ID").show() // (1)
df.bytes2String("header.ID").withColumnRenamed("header.ID", "id").select("id").show() // (2)
df.bytes2String("header.ID").select("header.ID").show() // (3)
}
显示#1
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
显示#2
+------------------------------------+
|id |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
显示#3
+-------------------------------------------------+
|ID |
+-------------------------------------------------+
|[62 BF 58 0C 6C 59 48 9C 91 13 7B 97 E7 29 C0 2F]|
|[5C 54 49 07 00 24 40 F4 B3 0E E7 2C 03 B8 06 3C]|
|[5C 3E A2 21 01 D9 4C 1B 80 4E F9 92 1D 4A FE 26]|
|[08 C1 55 89 CE 0D 45 8C 87 0A 4A 04 90 2D 51 56]|
+-------------------------------------------------+
正如您在这里看到的那样,第三个 show
(也就是没有重命名列)没有按预期工作,并向我们展示了一个未转换的 ID 列。有人知道为什么吗?
编辑:
df.select(col("header.ID") as "ID").bytes2String("ID").show()
的输出:
+------------------------------------+
|ID |
+------------------------------------+
|62bf580c-6c59-489c-9113-7b97e729c02f|
|5c544907-0024-40f4-b30e-e72c03b8063c|
|5c3ea221-01d9-4c1b-804e-f9921d4afe26|
|08c15589-ce0d-458c-870a-4a04902d5156|
+------------------------------------+
让我用下面的例子解释一下你的转换函数发生了什么。 首先创建数据框:
val jsonString: String =
"""{
| "employee": {
| "id": 12345,
| "name": "krishnan"
| },
| "_id": 1
|}""".stripMargin
val jsonRDD: RDD[String] = sc.parallelize(Seq(jsonString, jsonString))
val df: DataFrame = sparkSession.read.json(jsonRDD)
df.printSchema()
输出结构:
root
|-- _id: long (nullable = true)
|-- employee: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
转换函数与您的相似:
def myConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((x: Long) => (x+1).toString).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
场景一#
对 root
级别字段进行转换。
myConversion(df, "_id").show()
myConversion(df, "_id").select("_id").show()
结果:
+----------------+---+
| employee|_id|
+----------------+---+
|[12345,krishnan]| 2|
|[12345,krishnan]| 2|
+----------------+---+
+---+
|_id|
+---+
| 2|
| 2|
+---+
场景 2# 为 employee.id
进行转换。在这里,当我们使用 employee.id
时,数据帧在 root
级别添加了新字段 id
。这是正确的行为。
myConversion(df, "employee.id").show()
myConversion(df, "employee.id").select("employee.id").show()
结果:
+---+----------------+-----------+
|_id| employee|employee.id|
+---+----------------+-----------+
| 1|[12345,krishnan]| 12346|
| 1|[12345,krishnan]| 12346|
+---+----------------+-----------+
+-----+
| id|
+-----+
|12345|
|12345|
+-----+
场景3#Select将内层字段转为根级再进行转换
myConversion(df.select("employee.id"), "id").show()
结果:
+-----+
| id|
+-----+
|12346|
|12346|
+-----+
我的新转换函数,获取结构类型字段并执行转换并将其存储到结构类型字段本身。在这里,传递 employee
字段并单独转换 id
字段,但在 root
级别完成字段 employee
的更改。
case class Employee(id: String, name: String)
def myNewConversion(myDf: DataFrame, colName: String): DataFrame = {
myDf.withColumn(colName + "_tmp", udf((row: Row) => Employee((row.getLong(0)+1).toString, row.getString(1))).apply(col(colName)))
.drop(colName)
.withColumnRenamed(colName + "_tmp", colName)
}
你的场景号3#使用我的转换函数
myNewConversion(df, "employee").show()
myNewConversion(df, "employee").select("employee.id").show()
结果#
+---+----------------+
|_id| employee|
+---+----------------+
| 1|[12346,krishnan]|
| 1|[12346,krishnan]|
+---+----------------+
+-----+
| id|
+-----+
|12346|
|12346|
+-----+