How to convert messages from a socket streaming source to a custom domain object?
I'm new to Spark Streaming. I have a Spark Standalone 2.2 cluster running with one worker. I'm using the socket source and trying to read the incoming stream into an object called MicroserviceMessage.
val message = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

val df = message.as[MicroserviceMessage]
  .flatMap(microserviceMessage =>
    microserviceMessage.DataPoints.map(datapoint =>
      (datapoint, microserviceMessage.ServiceProperties, datapoint.EpochUTC)))
  .toDF("datapoint", "properties", "timestamp")
I was expecting this to be a DataFrame with "datapoint", "properties", and "timestamp" columns.
The data I paste into my netcat terminal looks like this (this is the MicroserviceMessage I'm trying to read in):
{
  "SystemType": "mytype",
  "SystemGuid": "6c84fb90-12c4-11e1-840d-7b25c5ee775a",
  "TagType": "Raw Tags",
  "ServiceType": "FILTER",
  "DataPoints": [
    {
      "TagName": "013FIC003.PV",
      "EpochUTC": 1505247956001,
      "ItemValue": 25.47177,
      "ItemValueStr": "NORMAL",
      "Quality": "Good",
      "TimeOffset": "P0000"
    },
    {
      "TagName": "013FIC003.PV",
      "EpochUTC": 1505247956010,
      "ItemValue": 26.47177,
      "ItemValueStr": "NORMAL",
      "Quality": "Good",
      "TimeOffset": "P0000"
    }
  ],
  "ServiceProperties": [
    {
      "Key": "OutputTagName",
      "Value": "FI12102.PV_CL"
    },
    {
      "Key": "OutputTagType",
      "Value": "Cleansing Flow Tags"
    }
  ]
}
Instead, what I see is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`SystemType`' given input columns: [value];
The MicroserviceMessage case class looks like this:
case class DataPoints
(
TagName: String,
EpochUTC: Double,
ItemValue: Double,
ItemValueStr: String,
Quality: String,
TimeOffset: String
)
case class ServiceProperties
(
Key: String,
Value: String
)
case class MicroserviceMessage
(
SystemType: String,
SystemGuid: String,
TagType: String,
ServiceType: String,
DataPoints: List[DataPoints],
ServiceProperties: List[ServiceProperties]
)
Edit:
After reading this post, I was able to get things working with:
val messageEncoder = Encoders.bean(classOf[MicroserviceMessage])
val df = message.select($"value").as(messageEncoder).map(
  msmg => (msmg.ServiceType, msmg.SystemGuid)
).toDF("service", "guid")
But this causes a problem as soon as I start sending data:
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize
Full stacktrace
This:
message.as[MicroserviceMessage]
is incorrect, as the error message says:
cannot resolve '`SystemType`' given input columns: [value];
The data coming from a SocketStream is just strings (or strings plus a timestamp). To make it usable for a strongly typed Dataset, you have to parse it first, e.g. with org.apache.spark.sql.functions.from_json.
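A minimal sketch of that parsing step (assuming the case classes above and that spark.implicits._ is already in scope):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.from_json

// Derive the JSON schema from the case class instead of writing it by hand.
val schema = Encoders.product[MicroserviceMessage].schema

val typed = message
  .select(from_json($"value", schema).as("msg")) // parse the raw string column
  .select("msg.*")                               // flatten the struct into top-level columns
  .as[MicroserviceMessage]                       // now SystemType etc. can be resolved

Note that the socket source produces one row per input line, so each JSON message has to arrive as a single line for from_json to see a complete document.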
The reason for the exception
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize
is that you compiled the Spark Structured Streaming application with Scala 2.12.4 (or any other release in the 2.12 line), which Spark 2.2 does not support.
From the scaladoc of scala.runtime.LambdaDeserializer:
This class is only intended to be called by synthetic $deserializeLambda$ method that the Scala 2.12 compiler will add to classes hosting lambdas.
Spark 2.2 supports Scala up to and including 2.11.12, with 2.11.8 being the most "blessed" version.
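If you build with sbt, a minimal build.sbt sketch (version numbers are illustrative) that pins the project to a Scala release Spark 2.2 supports:

// build.sbt
scalaVersion := "2.11.8"

// "provided" because the standalone cluster already ships Spark
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"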