如何分析 Spark Dataset 与 UDF 中出现的类型不匹配错误
How to analyse type mismatch error with Spark Dataset and UDF
我正在处理 2 个 CSV 文件：先对其进行连接（join），再使用 json4s 库生成 JSON 负载（payload）。在使用 UDF 对 Spark Dataset 的行进行映射（map）时遇到了问题。
我尝试创建了一个简单的 UDF，它接收一行（Row）并返回硬编码的值，但问题依旧。
// Load both CSVs with a header row and inferSchema disabled (so columns are read as strings),
// dropping the columns that are not needed downstream.
val station_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/station_data.csv").drop("lat").drop("long").drop("dockcount").drop("installation")
val trip_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/trip_data.csv").drop("Start Date").drop("End Date").drop("Subscriber Type").drop("Zip Code")
// UDF that joins two strings with a comma; used below when the start and end landmarks differ.
val getConcatenated = udf((first: String, second: String) => {
first + "," + second
})
// First join ("Start Terminal" == "station_id"): take the station name as "Start Station"
// and keep its landmark as "StartStationlandmark".
val StatStationData = trip_data.join(station_data, col("Start Terminal") === col("station_id"), "inner").withColumn("Start Station", col("name")).withColumn("StartStationlandmark", col("landmark")).drop("name").drop("Start Terminal").drop("station_id").drop("landmark")
// Second join ("End Terminal" == "station_id"): take the station name as "End Station".
// "Final landmark" is the single landmark when both ends share it, otherwise "<end landmark>,<start landmark>".
val FinalData = StatStationData.join(station_data, col("End Terminal") === col("station_id"), "inner").withColumn("End Station", col("name")).withColumn("Final landmark", when(col("landmark") === col("StartStationlandmark"), col("landmark")).otherwise(getConcatenated($"landmark", $"StartStationlandmark"))).drop("name").drop(("End Terminal")).drop("station_id").drop("landmark").drop("StartStationlandmark")
// Rename columns to the field names used by FinalDataStruct; "landmark" is produced by split(...),
// so it is an array column rather than a plain string.
// NOTE(review): "\," is not a valid escape sequence in a Scala string literal. The comma needs no
// regex escaping, so "," (or "\\,") is presumably what was intended.
val FinalDataDf = FinalData.withColumn("TripID", col("Trip ID")).withColumn("EndStation", col("End Station")).withColumn("landmark", split(col("Final landmark"), "\,")).withColumn("Bike", col("Bike #")).withColumn("StartStation", col("Start Station")).drop("Trip ID").drop("End Station").drop("Final landmark").drop("Bike #").drop("Start Station")
FinalDataDf.show(false)
// Typed view of FinalDataDf: the field names are meant to line up with FinalDataDf's column names.
// NOTE(review): "landmark" is built with split(...), i.e. an array column, while this field is a
// String. Confirm that .as(encoder) accepts this on the Spark version in use; Seq[String] would
// match the column type.
case class FinalDataStruct(TripID: String, Duration: String, Bike: String, StartStation: String, EndStation: String, landmark: String)
val encoder = org.apache.spark.sql.Encoders.product[FinalDataStruct]
// Dataset[FinalDataStruct]: map on this Dataset expects functions of type FinalDataStruct => ?
val FinalDataDS = FinalDataDf.as(encoder)
FinalDataDS.show(false)
import spark.sqlContext.implicits._
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
// Builds a (key, compact JSON string) pair from a generic Row using the json4s DSL.
// Its type is Row => (String, String). It therefore fits map on a DataFrame (Dataset[Row]),
// but not map on Dataset[FinalDataStruct], which expects FinalDataStruct => ? instead.
// NOTE(review): Row indices are 0-based. These lookups use positions 1..6, while FinalDataStruct
// has six fields, so row(6) would be past the last column of a six-column row. Verify the
// positions against the real column order; name-based access such as row.getAs[String]("Bike")
// avoids depending on it.
def convertRowToJSON(row: Row) = {
val json =
("bike" -> row(3).toString) ~
("start_station" -> row(4).toString) ~
("end_station" -> row(5).toString) ~
("landmarks" -> row(6).toString) ~
("total_duration" -> row(2).toString)
(row(1).toString, compact(render(json)).toString)
}
// Does not compile: FinalDataDS is a Dataset[FinalDataStruct], so map needs a function taking
// FinalDataStruct, but convertRowToJSON takes a Row.
val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
// To Test: a minimal function that ignores its input and returns a hard-coded pair.
// It still takes a Row, so the map call below fails with the same kind of type mismatch.
// The problem is the parameter type, not the function body.
def convertRowToJSONTtry(row: Row) = {
(11, "Hello".toString)
}
val JsonPlayloadDataTest1 = FinalDataDS.map(convertRowToJSONTtry)
我得到的错误如下：
scala> val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
<console>:42: error: type mismatch;
found : org.apache.spark.sql.Row => (String, String)
required: FinalDataStruct => ?
val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
错误消息已经告诉了您几乎所有需要了解的信息：您定义的函数类型是 Row => (String, String)，而您映射的是 Dataset[FinalDataStruct]（这里用的并不是 UDF），它要求的函数类型是 FinalDataStruct => ?。
如果想把这个函数应用到 DataFrame 上，可以这样写：
// A DataFrame is a Dataset[Row], so its map accepts the Row => (String, String) function directly.
FinalDataDf.map(convertRowToJSON)
如果要在 Dataset[FinalDataStruct] 上使用，请这样写：
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
// Map each typed FinalDataStruct record to (TripID, JSON of the whole record).
// The function takes FinalDataStruct, which is what map on Dataset[FinalDataStruct] requires.
// The (String, String) result needs an Encoder, presumably supplied by the spark implicits import
// used in this session.
FinalDataDS.map { x =>
  // json4s `write` needs an implicit Formats. The implicit is given an explicit type, as is
  // recommended for implicit definitions (and needed when migrating to Scala 3).
  implicit val formats: Formats = DefaultFormats
  (x.TripID, write(x))
}
不过在实践中，更好的做法是用 to_json 调用来替换这里的 map。
另外请注意，Row 的索引是从 0 开始的，而不是从 1 开始。
我正在处理 2 个 CSV 文件：先对其进行连接（join），再使用 json4s 库生成 JSON 负载（payload）。在使用 UDF 对 Spark Dataset 的行进行映射（map）时遇到了问题。
我尝试创建了一个简单的 UDF，它接收一行（Row）并返回硬编码的值，但问题依旧。
// Load both CSVs with a header row and inferSchema disabled (so columns are read as strings),
// dropping the columns that are not needed downstream.
val station_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/station_data.csv").drop("lat").drop("long").drop("dockcount").drop("installation")
val trip_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/trip_data.csv").drop("Start Date").drop("End Date").drop("Subscriber Type").drop("Zip Code")
// UDF that joins two strings with a comma; used below when the start and end landmarks differ.
val getConcatenated = udf((first: String, second: String) => {
first + "," + second
})
// First join ("Start Terminal" == "station_id"): take the station name as "Start Station"
// and keep its landmark as "StartStationlandmark".
val StatStationData = trip_data.join(station_data, col("Start Terminal") === col("station_id"), "inner").withColumn("Start Station", col("name")).withColumn("StartStationlandmark", col("landmark")).drop("name").drop("Start Terminal").drop("station_id").drop("landmark")
// Second join ("End Terminal" == "station_id"): take the station name as "End Station".
// "Final landmark" is the single landmark when both ends share it, otherwise "<end landmark>,<start landmark>".
val FinalData = StatStationData.join(station_data, col("End Terminal") === col("station_id"), "inner").withColumn("End Station", col("name")).withColumn("Final landmark", when(col("landmark") === col("StartStationlandmark"), col("landmark")).otherwise(getConcatenated($"landmark", $"StartStationlandmark"))).drop("name").drop(("End Terminal")).drop("station_id").drop("landmark").drop("StartStationlandmark")
// Rename columns to the field names used by FinalDataStruct; "landmark" is produced by split(...),
// so it is an array column rather than a plain string.
// NOTE(review): "\," is not a valid escape sequence in a Scala string literal. The comma needs no
// regex escaping, so "," (or "\\,") is presumably what was intended.
val FinalDataDf = FinalData.withColumn("TripID", col("Trip ID")).withColumn("EndStation", col("End Station")).withColumn("landmark", split(col("Final landmark"), "\,")).withColumn("Bike", col("Bike #")).withColumn("StartStation", col("Start Station")).drop("Trip ID").drop("End Station").drop("Final landmark").drop("Bike #").drop("Start Station")
FinalDataDf.show(false)
// Typed view of FinalDataDf: the field names are meant to line up with FinalDataDf's column names.
// NOTE(review): "landmark" is built with split(...), i.e. an array column, while this field is a
// String. Confirm that .as(encoder) accepts this on the Spark version in use; Seq[String] would
// match the column type.
case class FinalDataStruct(TripID: String, Duration: String, Bike: String, StartStation: String, EndStation: String, landmark: String)
val encoder = org.apache.spark.sql.Encoders.product[FinalDataStruct]
// Dataset[FinalDataStruct]: map on this Dataset expects functions of type FinalDataStruct => ?
val FinalDataDS = FinalDataDf.as(encoder)
FinalDataDS.show(false)
import spark.sqlContext.implicits._
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
// Builds a (key, compact JSON string) pair from a generic Row using the json4s DSL.
// Its type is Row => (String, String). It therefore fits map on a DataFrame (Dataset[Row]),
// but not map on Dataset[FinalDataStruct], which expects FinalDataStruct => ? instead.
// NOTE(review): Row indices are 0-based. These lookups use positions 1..6, while FinalDataStruct
// has six fields, so row(6) would be past the last column of a six-column row. Verify the
// positions against the real column order; name-based access such as row.getAs[String]("Bike")
// avoids depending on it.
def convertRowToJSON(row: Row) = {
val json =
("bike" -> row(3).toString) ~
("start_station" -> row(4).toString) ~
("end_station" -> row(5).toString) ~
("landmarks" -> row(6).toString) ~
("total_duration" -> row(2).toString)
(row(1).toString, compact(render(json)).toString)
}
// Does not compile: FinalDataDS is a Dataset[FinalDataStruct], so map needs a function taking
// FinalDataStruct, but convertRowToJSON takes a Row.
val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
// To Test: a minimal function that ignores its input and returns a hard-coded pair.
// It still takes a Row, so the map call below fails with the same kind of type mismatch.
// The problem is the parameter type, not the function body.
def convertRowToJSONTtry(row: Row) = {
(11, "Hello".toString)
}
val JsonPlayloadDataTest1 = FinalDataDS.map(convertRowToJSONTtry)
我得到的错误如下：
scala> val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
<console>:42: error: type mismatch;
found : org.apache.spark.sql.Row => (String, String)
required: FinalDataStruct => ?
val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
错误消息已经告诉了您几乎所有需要了解的信息：您定义的函数类型是 Row => (String, String)，而您映射的是 Dataset[FinalDataStruct]（这里用的并不是 UDF），它要求的函数类型是 FinalDataStruct => ?。
如果想把这个函数应用到 DataFrame 上，可以这样写：
// A DataFrame is a Dataset[Row], so its map accepts the Row => (String, String) function directly.
FinalDataDf.map(convertRowToJSON)
如果要在 Dataset[FinalDataStruct] 上使用，请这样写：
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
// Map each typed FinalDataStruct record to (TripID, JSON of the whole record).
// The function takes FinalDataStruct, which is what map on Dataset[FinalDataStruct] requires.
// The (String, String) result needs an Encoder, presumably supplied by the spark implicits import
// used in this session.
FinalDataDS.map { x =>
  // json4s `write` needs an implicit Formats. The implicit is given an explicit type, as is
  // recommended for implicit definitions (and needed when migrating to Scala 3).
  implicit val formats: Formats = DefaultFormats
  (x.TripID, write(x))
}
不过在实践中，更好的做法是用 to_json 调用来替换这里的 map。
另外请注意，Row 的索引是从 0 开始的，而不是从 1 开始。