Why does query throw ClassCastException "SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset" with MQTT Source?
I am getting the following exception when running my Spark Structured Streaming code:
18/12/05 15:00:38 ERROR StreamExecution: Query [id = 48ec92a0-811a-4d57-a65d-c0b9c754e093, runId = 5e2adff4-855e-46c6-8592-05e3557544c6] terminated with error
java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset
    at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:152)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$$anonfun$apply.apply(StreamExecution.scala:614)
The exception occurs every time I start the query. It does work when I start it after deleting the checkpoint.
The Spark Structured Streaming code is below; basically I just read from an MQTT queue and write to an Elasticsearch index.
import java.sql.Timestamp
import spark.implicits._

spark
  .readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "password")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000")
  .as[(String, Timestamp)]
  .writeStream
  .outputMode("append")
  .format("es")
  .option("es.resource", "spark/employee")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()
Below are the dependencies used. I am on the MapR distribution.
"org.apache.spark" %% "spark-core" % "2.2.1-mapr-1803",
"org.apache.spark" %% "spark-sql" % "2.2.1-mapr-1803",
"org.apache.spark" %% "spark-streaming" % "2.2.1-mapr-1803",
"org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.2.1",
"org.apache.bahir" %% "spark-streaming-mqtt" % "2.2.1",
"org.elasticsearch" %% "elasticsearch-spark-20" % "6.3.2"
Spark-submit command:
/opt/mapr/spark/spark-2.2.1/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class <MAIN_CLASS> \
  --jars spark-sql-streaming-mqtt_2.11-2.2.1.jar,org.eclipse.paho.client.mqttv3-1.1.0.jar,elasticsearch-spark-20_2.11-6.3.2.jar,mail-1.4.7.jar \
  myjar_2.11-0.1.jar
Any help would be appreciated.
This looks like a bug in Apache Bahir. MQTTTextStreamSource.getBatch (MQTTStreamSource.scala:152 in your stack trace) casts the start and end offsets straight to LongOffset. On a fresh start the offsets really are LongOffset instances, but when the query is restarted from an existing checkpoint they are recovered as SerializedOffset, so the cast fails with the ClassCastException above. That is also why the query runs fine once you delete the checkpoint directory.
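For illustration, here is a minimal Scala sketch of the kind of guard that avoids the cast, assuming you patch the connector source (or write your own source) rather than cast unconditionally; offsetToLong is a hypothetical helper name of mine, not Bahir's API. Spark 2.2 also ships LongOffset.convert, which does the same conversion and returns an Option.

import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}

// Hypothetical helper, not part of Bahir: normalise whatever offset Spark hands
// back to a plain Long. On the first run the engine passes LongOffset directly;
// after a restart it passes the checkpointed value as SerializedOffset (JSON),
// which is what breaks the unconditional cast in MQTTTextStreamSource.getBatch.
def offsetToLong(offset: Offset): Long = offset match {
  case lo: LongOffset       => lo.offset
  case so: SerializedOffset => LongOffset(so).offset // re-parse the serialized JSON value
  case other                => sys.error(s"Unexpected offset type: ${other.getClass.getName}")
}

Until you are running a connector build that handles SerializedOffset along these lines, the practical workaround with spark-sql-streaming-mqtt 2.2.1 is the one you already found: clear the checkpoint directory before restarting (at the cost of the recorded progress), or keep a locally patched copy of the source on your classpath.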