Why does a Spark Dataset lose all its schema and just return byte[]?
I create my SparkSession and register the Kryo classes this way:
val sparkConf = new SparkConf()
  .setAppName("bd-dq-spark")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.driver.host", "127.0.0.1")
  .registerKryoClasses(Array(classOf[HeatSensorEvent], Class.forName("scala.Enumeration$Val"), Class.forName("cs.spark_implicits.Model$EventType$")))

val spark: SparkSession =
  SparkSession.builder()
    .master("local[*]")
    .config(sparkConf)
    .getOrCreate()
I define my case class like this:
object Model {
  type Timestamp = Long

  case class HeatSensorEvent(
    eventId: String,
    sensorId: String,
    deviceId: String,
    eventType: EventType,
    timestamp: Timestamp,
    temperature: Double
  )

  object EventType extends Enumeration {
    final type EventType = Value
    val TEMPERATURE_CHANGE: EventType.Value = Value
  }
}
I prepare some fake data like this:
val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
  HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
  HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
  HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)
My main method is:
def main(args: Array[String]): Unit = {
  implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
  implicit val eventTypeEncoder: Encoder[EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]

  val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
    .createDataset(heatSensorEventData)
    .as[HeatSensorEvent]

  heatSensorEventDs.show()
  heatSensorEventDs.printSchema()
}
But all I get is this:
+--------------------+
| value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+
root
|-- value: binary (nullable = true)
My question is: why do I lose the whole schema and cannot display the data normally? How can I fix this?
When you use a Kryo encoder for an object, the whole object is collapsed into a single binary column, which makes it impossible to inspect the values with dataset.show().
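The difference is easy to see with a tiny, self-contained sketch (not the original poster's code; Point is a hypothetical case class used only for this comparison, and a SparkSession named spark is assumed as above): Encoders.kryo always yields a single value: binary column, while the implicit product encoder keeps one column per case-class field.

import org.apache.spark.sql.Encoders
import spark.implicits._

case class Point(x: Int, y: Int) // hypothetical case class, used only for this comparison

// Kryo encoder: the whole object is serialized into one opaque binary column.
spark.createDataset(Seq(Point(1, 2)))(Encoders.kryo[Point]).printSchema()
// root
//  |-- value: binary (nullable = true)

// Implicit product encoder: one column per field, so show() stays readable.
spark.createDataset(Seq(Point(1, 2))).printSchema()
// root
//  |-- x: integer (nullable = false)
//  |-- y: integer (nullable = false)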
See below for an approach to solving this, which originated from this post (unfortunately, it is an http link).
Define your classes:
type Timestamp = Long

object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
}

case class HeatSensorEvent(
  eventId: String,
  sensorId: String,
  deviceId: String,
  eventType: Events.EventType,
  timestamp: Timestamp,
  temperature: Double
)
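Why this works (a minimal sketch, not part of the original answer): case classes extend Product, so Spark can derive an encoder for HeatSensorEvent, including the nested Events.EventType, and the schema keeps one column per field.

import org.apache.spark.sql.{Encoder, Encoders}

// Derive the product encoder for the case class defined above and inspect its schema;
// eventType appears as a struct with a single string field instead of an opaque binary blob.
val enc: Encoder[HeatSensorEvent] = Encoders.product[HeatSensorEvent]
println(enc.schema.treeString)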
Create your data:
val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
  HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
  HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
  HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)
Now you can see your dataset:
import spark.implicits._  // needed so toDS() can pick up the derived product encoder

val ds = heatSensorEventData.toDS()
ds.show()
Output:
+-------+--------+--------+--------------------+----------+-----------+
|eventId|sensorId|deviceId| eventType| timestamp|temperature|
+-------+--------+--------+--------------------+----------+-----------+
| 123| s1| d1|[TEMPERATURE_CHANGE]|1619555389| 85.41|
| 234| s1| d1|[TEMPERATURE_CHANGE]|1619555419| 60.41|
| 345| s1| d1|[TEMPERATURE_CHANGE]|1619556389| 60.41|
| 567| s1| d1|[TEMPERATURE_CHANGE]|1619557389| 50.41|
+-------+--------+--------+--------------------+----------+-----------+
ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]
Support for enumerations in Spark has been requested, and the issue was closed without a fix. The advantage of this approach is that you don't need a custom encoder.
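If you would rather see eventType rendered as a plain string instead of the [TEMPERATURE_CHANGE] struct, one possible follow-up (a sketch, not part of the original answer, reusing the Dataset ds from above) is to select the nested value field:

import org.apache.spark.sql.functions.col

// Flatten the nested struct column: eventType becomes a plain string column.
val flat = ds.withColumn("eventType", col("eventType.value"))
flat.show()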