Spark Streaming:foreachRDD 更新我的 mongo RDD
Spark Streaming: foreachRDD update my mongo RDD
我想在每次进入 foreachRDD
时创建一个 new mongodb RDD。但是我有序列化问题:
mydstream
.foreachRDD(rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
这会给我一个错误:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
有什么想法吗?
根据我的理解,如果你有一个 "not serializable" 对象,你必须添加,你需要通过 foreachPartition
传递它,这样你就可以在 [=15= 之前在每个节点上建立数据库连接] 你的处理。
mydstream.foreachRDD(rdd => {
rdd.foreachPartition{
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})
您可以尝试使用 rdd.context returns SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)。
mydstream foreachRDD { rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })
其实RDD好像也有.sparkContext
方法。老实说,我不知道区别,也许它们是别名(?)。
我想在每次进入 foreachRDD
时创建一个 new mongodb RDD。但是我有序列化问题:
mydstream
.foreachRDD(rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
这会给我一个错误:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
有什么想法吗?
根据我的理解,如果你有一个 "not serializable" 对象,你必须添加,你需要通过 foreachPartition
传递它,这样你就可以在 [=15= 之前在每个节点上建立数据库连接] 你的处理。
mydstream.foreachRDD(rdd => {
rdd.foreachPartition{
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})
您可以尝试使用 rdd.context returns SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)。
mydstream foreachRDD { rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })
其实RDD好像也有.sparkContext
方法。老实说,我不知道区别,也许它们是别名(?)。