Spark Streaming:foreachRDD 更新我的 mongo RDD

Spark Streaming: foreachRDD update my mongo RDD

我想在每次进入 foreachRDD 时创建一个 new mongodb RDD。但是我有序列化问题:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })

这会给我一个错误:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)

有什么想法吗?

根据我的理解,如果你有一个 "not serializable" 对象,你必须添加,你需要通过 foreachPartition 传递它,这样你就可以在 [=15= 之前在每个节点上建立数据库连接] 你的处理。

mydstream.foreachRDD(rdd => {
        rdd.foreachPartition{
          val mongoClient = MongoClient("localhost", 27017)
          val db = mongoClient(mongoDatabase)
          val coll = db(mongoCollection)
          // ssc is my StreamingContext
          val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})

您可以尝试使用 rdd.context returns SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)。

mydstream foreachRDD { rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })

其实RDD好像也有.sparkContext方法。老实说,我不知道区别,也许它们是别名(?)。