How to convert messages from socket streaming source to custom domain object?

I'm fairly new to Spark Streaming. I have Spark Standalone 2.2 running with one worker. I'm using a socket source and trying to read the incoming stream into an object called MicroserviceMessage:

val message = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

val df = message.as[MicroserviceMessage].flatMap(microserviceMessage =>
    microserviceMessage.DataPoints.map(datapoint => (datapoint, microserviceMessage.ServiceProperties, datapoint.EpochUTC)))
  .toDF("datapoint", "properties", "timestamp")

I expected this to be a DataFrame with the columns "datapoint", "properties" and "timestamp".

The data I paste into my netcat terminal looks like this (this is the MicroserviceMessage I'm trying to read in):

{
  "SystemType": "mytype",
  "SystemGuid": "6c84fb90-12c4-11e1-840d-7b25c5ee775a",
  "TagType": "Raw Tags",
  "ServiceType": "FILTER",
  "DataPoints": [
    {
      "TagName": "013FIC003.PV",
      "EpochUTC": 1505247956001,
      "ItemValue": 25.47177,
      "ItemValueStr": "NORMAL",
      "Quality": "Good",
      "TimeOffset": "P0000"
    },
    {
      "TagName": "013FIC003.PV",
      "EpochUTC": 1505247956010,
      "ItemValue": 26.47177,
      "ItemValueStr": "NORMAL",
      "Quality": "Good",
      "TimeOffset": "P0000"
    }
  ],
  "ServiceProperties": [
    {
      "Key": "OutputTagName",
      "Value": "FI12102.PV_CL"
    },
    {
      "Key": "OutputTagType",
      "Value": "Cleansing Flow Tags"
    }
  ]
}

Instead, what I see is:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`SystemType`' given input columns: [value];

The MicroserviceMessage case class looks like this:

case class DataPoints
(
  TagName: String,
  EpochUTC: Double,
  ItemValue: Double,
  ItemValueStr: String,
  Quality: String,
  TimeOffset: String
)

case class ServiceProperties
(
  Key: String,
  Value: String
)

case class MicroserviceMessage
(
  SystemType: String,
  SystemGuid: String,
  TagType: String,
  ServiceType: String,
  DataPoints: List[DataPoints],
  ServiceProperties: List[ServiceProperties]
)

Edit: After reading this post, I was able to get things working with:

val messageEncoder = Encoders.bean(classOf[MicroserviceMessage])

val df = message.select($"value").as(messageEncoder).map(
  msmg => (msmg.ServiceType, msmg.SystemGuid)
).toDF("service", "guid")

But this causes problems as soon as I start sending data:

Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize

Full stacktrace

This:

message.as[MicroserviceMessage]

is incorrect, as the error message says:

cannot resolve 'SystemType' given input columns: [value];

The data coming from a SocketStream is just strings (or strings with timestamps). To make it usable as a strongly typed Dataset, you have to parse it first, e.g. with org.apache.spark.sql.functions.from_json.
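A minimal sketch of that parsing step, reusing the `spark` session, the `message` stream, and the `MicroserviceMessage` case class from the question (the schema is derived from the case class via `Encoders.product` rather than written by hand):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json
import spark.implicits._ // for $"..." and .as[...]

// Derive the StructType schema from the case class instead of writing it manually.
val schema = Encoders.product[MicroserviceMessage].schema

// Each socket line arrives in the single "value" column; parse it as JSON,
// flatten the resulting struct, and only then convert to the typed Dataset.
val parsed = message
  .select(from_json($"value", schema) as "msg")
  .select("msg.*")
  .as[MicroserviceMessage]
```

From here the original flatMap over DataPoints should resolve, since the columns now match the case class fields instead of the single `value` column.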

The reason for the exception:

Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize

is that you compiled your Spark Structured Streaming application with Scala 2.12.4 (or any other version in the 2.12 stream), which Spark 2.2 does not support.

From the scaladoc of scala.runtime.LambdaDeserializer:

This class is only intended to be called by synthetic $deserializeLambda$ method that the Scala 2.12 compiler will add to classes hosting lambdas.

Spark 2.2 supports Scala up to and including 2.11.12, with 2.11.8 being the most "blessed" version.
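A sketch of the corresponding build fix, assuming an sbt build (the exact Spark artifact version is illustrative):

```scala
// build.sbt (sketch): pin Scala to a 2.11.x release so the compiled
// lambdas are compatible with the Scala runtime that Spark 2.2 ships.
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"
```

After changing scalaVersion, do a clean rebuild of the application jar so no classes compiled with 2.12 remain.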