Why does my Spark Dataset lose its schema and just return byte[]?

I create my SparkSession and register the Kryo classes this way:

val sparkConf = new SparkConf()
    .setAppName("bd-dq-spark")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.driver.host", "127.0.0.1")
    .registerKryoClasses(Array(
      classOf[HeatSensorEvent],
      Class.forName("scala.Enumeration$Val"),
      Class.forName("cs.spark_implicits.Model$EventType$")))
val spark: SparkSession = 
   SparkSession.builder()
     .master("local[*]")
     .config(sparkConf)
     .getOrCreate()

I define my case class like this:

object Model {
  type Timestamp = Long
  case class HeatSensorEvent(
                              eventId: String,
                              sensorId: String,
                              deviceId: String,
                              eventType: EventType,
                              timestamp: Timestamp,
                              temperature: Double
                            )
  object EventType extends Enumeration {
    final type EventType = Value
    val TEMPERATURE_CHANGE: EventType.Value = Value
  }
}

I prepare some fake data like this:

  val heatSensorEventData = Seq(
    HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
    HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
    HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
    HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
  )

My main method is:

def main(args: Array[String]): Unit = {
    implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
    implicit val eventTypeEncoder: Encoder[EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]
    val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
      .createDataset(heatSensorEventData).as[HeatSensorEvent]
    heatSensorEventDs.show
    heatSensorEventDs.printSchema()
}

But all I get is this:

+--------------------+
|               value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+

root
 |-- value: binary (nullable = true)

My question is: why do I lose the whole schema, so that I cannot show the data normally? How can I fix this?

When an object is stored with the Kryo encoder, all of its columns are collapsed into a single binary column, which makes it impossible to inspect the values with `dataset.show()`.

Below is an approach to solve this, which originated from this post (unfortunately, it is an http link).
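You can see this collapse without even starting a SparkSession: an `Encoder`'s `schema` shows which columns the Dataset will have. A minimal sketch (assuming `spark-sql` is on the classpath; `Reading` is a made-up case class standing in for `HeatSensorEvent`):

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Made-up case class for illustration.
case class Reading(id: String, temperature: Double)

object EncoderSchemas extends App {
  // Kryo encoder: the whole object becomes one opaque binary column.
  val kryoEnc: Encoder[Reading] = Encoders.kryo[Reading]
  println(kryoEnc.schema.simpleString)    // a single "value" binary column

  // Product encoder (what spark.implicits._ derives): one column per field.
  val productEnc: Encoder[Reading] = Encoders.product[Reading]
  println(productEnc.schema.simpleString) // one column per case-class field
}
```

This is why `show()` prints `[27 01 01 64 B1 0...]`: with the Kryo encoder, Spark only knows about the serialized bytes, not the fields inside them.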

Define your classes:

type Timestamp = Long  
object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
}

case class HeatSensorEvent(
                            eventId: String,
                            sensorId: String,
                            deviceId: String,
                            eventType: Events.EventType,
                            timestamp: Timestamp,
                            temperature: Double
                          )
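Since `Events` here is plain Scala (not `scala.Enumeration`), mapping between the string value and the object needs no Spark machinery at all. A small sketch with a hypothetical `fromString` helper (not part of the classes above):

```scala
object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)

  // Hypothetical helper: resolve an EventType from its string value.
  def fromString(s: String): Option[EventType] = values.find(_.value == s)
}

object Demo extends App {
  println(Events.fromString("TEMPERATURE_CHANGE")) // Some(EventType(TEMPERATURE_CHANGE))
  println(Events.fromString("UNKNOWN"))            // None
}
```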

Create your data:

val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
  HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
  HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
  HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)

Now you can see your dataset:

// toDS() needs the implicits from your SparkSession in scope
import spark.implicits._

val ds = heatSensorEventData.toDS()
ds.show()

Output:

+-------+--------+--------+--------------------+----------+-----------+
|eventId|sensorId|deviceId|           eventType| timestamp|temperature|
+-------+--------+--------+--------------------+----------+-----------+
|    123|      s1|      d1|[TEMPERATURE_CHANGE]|1619555389|      85.41|
|    234|      s1|      d1|[TEMPERATURE_CHANGE]|1619555419|      60.41|
|    345|      s1|      d1|[TEMPERATURE_CHANGE]|1619556389|      60.41|
|    567|      s1|      d1|[TEMPERATURE_CHANGE]|1619557389|      50.41|
+-------+--------+--------+--------------------+----------+-----------+
ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]

Enumeration support in Spark has been requested before, and the issue was closed without a fix. The advantage of this approach is that you don't need a custom encoder.