UDF to generate JSON string behaving inconsistently
I am trying to generate a JSON string so I can store a variable number of history records in a single STRING column. The code works for all of my small tests, but fails when run against the actual data (no errors, just no data). Here is what I have:
import java.time.LocalDate
import org.json4s.jackson.Serialization  // or org.json4s.native.Serialization

class HistoryDetail(
  var date: String,
  var val1: Int,
  var val2: Int,
  var changeCode: String
)

class HistoryHeader(
  var numDetailRecords: Int,
  var calcDate: String,
  var historyRecords: List[HistoryDetail]
)

def getJSON = (val1: Int, val2: Int) => {
  implicit val formats = org.json4s.DefaultFormats
  val today = LocalDate.now
  val hdl = List(new HistoryDetail(today.toString, val1, val2, "D"))
  val hh: HistoryHeader = new HistoryHeader(1, today.toString, hdl)
  Serialization.write(hh)
}
A very simple test calling the Scala function directly works fine:
val strJson = getJSON(1000,1000)
strJson: String = {"numDetailRecords":1,"calcDate":"2018-04-23","historyRecords":[{"date":"2018-04-23","val1":1000,"val2":1000,"changeCode":"D"}]}
Creating the UDF and applying it to a small DataFrame also works well:
spark.udf.register("getJSONUDF", getJSON)
val smallDF = Seq((100, 100), (101, 101), (102, 102))
  .toDF("int_col1", "int_col2")
  .withColumn("json_col", callUDF("getJSONUDF", $"int_col1", $"int_col2"))
smallDF.show(false)
+--------+--------+------------------------------------------------------------------------------------------------------------------------------+
|int_col1|int_col2|json_col |
+--------+--------+------------------------------------------------------------------------------------------------------------------------------+
|100 |100 |{"numDetailRecords":1,"calcDate":"2018-04-23","historyRecords":[{"date":"2018-04-23","val1":100,"val2":100,"changeCode":"D"}]}|
|101 |101 |{"numDetailRecords":1,"calcDate":"2018-04-23","historyRecords":[{"date":"2018-04-23","val1":101,"val2":101,"changeCode":"D"}]}|
|102 |102 |{"numDetailRecords":1,"calcDate":"2018-04-23","historyRecords":[{"date":"2018-04-23","val1":102,"val2":102,"changeCode":"D"}]}|
+--------+--------+------------------------------------------------------------------------------------------------------------------------------+
Running it against the actual data fails (again, no errors, just no data):
val bigDF = spark.read.table("table_name")
.select($"int_col1",$"int_col2")
.withColumn("json_col",callUDF("getJSONUDF", $"int_col1", $"int_col2"))
bigDF.show(false)
+--------+--------+--------+
|int_col1|int_col2|json_col|
+--------+--------+--------+
|18995 |12702 |{} |
|14989 |46998 |{} |
|25588 |25051 |{} |
|18750 |52282 |{} |
|19963 |25745 |{} |
|17500 |21587 |{} |
|21999 |20379 |{} |
|25975 |5988 |{} |
|26382 |5988 |{} |
|7049 |101907 |{} |
|45997 |47472 |{} |
|45997 |47472 |{} |
|13950 |158957 |{} |
|18999 |123689 |{} |
|33842 |69623 |{} |
|64000 |13362 |{} |
|64000 |13362 |{} |
|64000 |13362 |{} |
|64000 |13362 |{} |
|64000 |13362 |{} |
+--------+--------+--------+
only showing top 20 rows
(Versions: Java 1.8.0_60, Spark 2.2.0, Scala 2.11.8)
Any ideas why I get an empty JSON object with the larger DataFrame?
TBH I don't know exactly what is going wrong, since I would have expected a Task not serializable error at some point. My best guess is that getJSON is not thread-safe and keeps shared state somewhere in the object being encoded. Then again, that may be completely wrong.
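One way to probe that guess (a minimal sketch, not from the original post; it assumes int_col1 and int_col2 really are Int columns, as in the small test) is to collect a handful of rows from the real table and call getJSON directly on the driver, single-threaded. If the driver-side output is correct while the UDF still produces {}, the problem lies in how the function behaves on the executors rather than in the data itself.

// Minimal single-threaded check on the driver (assumes Int columns)
val sampleRows = spark.read.table("table_name")
  .select($"int_col1", $"int_col2")
  .limit(20)
  .collect()

sampleRows.foreach { row =>
  // Call the same function the UDF wraps, outside any executor thread
  println(s"${row.getInt(0)}, ${row.getInt(1)} -> ${getJSON(row.getInt(0), row.getInt(1))}")
}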
I think your approach could be improved by using Spark's to_json function.
import org.apache.spark.sql.functions.{to_json, udf}

def getHistoryReader = udf((val1: Int, val2: Int) => {
  val today = LocalDate.now
  val hdl = List(new HistoryDetail(today.toString, val1, val2, "D"))
  new HistoryHeader(1, today.toString, hdl)
})

val bigDF = spark.read.table("table_name")
  .select($"int_col1", $"int_col2")
  .withColumn("json_col", to_json(getHistoryReader($"int_col1", $"int_col2")))

bigDF.show(false)
Looks cleaner to me and leaves the JSON serialization to Spark. This should work.
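One caveat worth spelling out (my addition, not part of the answer above): for udf to return a custom type like HistoryHeader, Spark has to derive a schema for it, which for user-defined classes in Spark 2.x essentially means case classes (Products). A minimal sketch of the model under that assumption:

import java.time.LocalDate
import org.apache.spark.sql.functions.{to_json, udf}

// Same fields as the question's classes, but as case classes so that
// Spark can derive an encoder/schema for the UDF's return type.
case class HistoryDetail(
  date: String,
  val1: Int,
  val2: Int,
  changeCode: String
)

case class HistoryHeader(
  numDetailRecords: Int,
  calcDate: String,
  historyRecords: List[HistoryDetail]
)

def getHistoryReader = udf((val1: Int, val2: Int) => {
  val today = LocalDate.now
  HistoryHeader(1, today.toString, List(HistoryDetail(today.toString, val1, val2, "D")))
})

With case classes the new keyword is no longer needed, and to_json serializes the resulting struct column without json4s being involved at all.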