Azure DataBricks Stream foreach 因 NotSerializableException 而失败
Azure DataBricks Stream foreach fails with NotSerializableException
我想不断细化数据集流的行(最初由 Kafka 发起):基于我想更新 Radis 哈希的条件。这是我的代码片段(lastContacts
是前一个命令的结果,它是这种类型的流:org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]
。这扩展为 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
):
class MyStreamProcessor extends ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}
override def close(errorOrNull: Throwable): Unit = {}
}
val query = lastContacts
.writeStream
.foreach(new MyStreamProcessor())
.start()
query.awaitTermination()
我收到一个巨大的堆栈跟踪,相关部分(我认为)是这样的:java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
谁能解释一下为什么会出现这种异常,以及如何避免?谢谢!
这个问题与以下两个问题有关:
Spark 上下文不可序列化。
ForeachWriter 的任何实现都必须是可序列化的,因为每个任务都将获得所提供对象的新的序列化-反序列化副本。因此,强烈建议在调用 open(...) 方法后完成写入数据的任何初始化(例如打开连接或启动事务),这表示任务已准备好生成数据。
在您的代码中,您试图在 process 方法中使用 spark 上下文,
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
*sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
}
要向redis发送数据,需要自己创建连接并在open方法中打开,然后在process方法中使用。
看看如何创建redis连接池。 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
我想不断细化数据集流的行(最初由 Kafka 发起):基于我想更新 Radis 哈希的条件。这是我的代码片段(lastContacts
是前一个命令的结果,它是这种类型的流:org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]
。这扩展为 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
):
class MyStreamProcessor extends ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}
override def close(errorOrNull: Throwable): Unit = {}
}
val query = lastContacts
.writeStream
.foreach(new MyStreamProcessor())
.start()
query.awaitTermination()
我收到一个巨大的堆栈跟踪,相关部分(我认为)是这样的:java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
谁能解释一下为什么会出现这种异常,以及如何避免?谢谢!
这个问题与以下两个问题有关:
Spark 上下文不可序列化。
ForeachWriter 的任何实现都必须是可序列化的,因为每个任务都将获得所提供对象的新的序列化-反序列化副本。因此,强烈建议在调用 open(...) 方法后完成写入数据的任何初始化(例如打开连接或启动事务),这表示任务已准备好生成数据。
在您的代码中,您试图在 process 方法中使用 spark 上下文,
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
*sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
}
要向redis发送数据,需要自己创建连接并在open方法中打开,然后在process方法中使用。
看看如何创建redis连接池。 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala