Can't access broadcast variable in transformation

I'm having trouble accessing a variable from inside a transformation function. Could someone help me out? Here are the relevant class and functions.

import java.util.UUID

import com.datastax.spark.connector._    // spark-cassandra-connector, for sparkContext.cassandraTable
import com.typesafe.scalalogging.Logger  // assuming scalalogging
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

@SerialVersionUID(889949215L)
object MyCache extends Serializable {
    @transient lazy val logger = Logger(getClass.getName)
    @volatile var cache: Broadcast[Map[UUID, Definition]] = null

    def getInstance(sparkContext: SparkContext): Broadcast[Map[UUID, Definition]] = {
        if (cache == null) {
            synchronized {
                if (cache == null) {  // re-check inside the lock so only one thread builds the broadcast
                    // pull the whole table to the driver, then broadcast it as a lookup map
                    val map = sparkContext.cassandraTable("keyspace", "table")
                        .collect()
                        .map(m => m.getUUID("id") ->
                             Definition(m.getString("c1"), m.getString("c2"), m.getString("c3"),
                                 m.getString("c4"))).toMap
                    cache = sparkContext.broadcast(map)
                }
            }
        }
        cache
    }
}

And in a different file:

import java.util.UUID

import scala.reflect.ClassTag

import com.typesafe.scalalogging.Logger  // assuming scalalogging, as above
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object Processor extends Serializable {
    @transient lazy val logger = Logger(getClass.getName)

    def processData[T: ClassTag](rawStream: DStream[(String, String)], ssc: StreamingContext,
                                 processor: (String, Broadcast[Map[UUID, Definition]]) => T): DStream[T] = {
        MyCache.getInstance(ssc.sparkContext)
        var newCacheValues = Map[UUID, Definition]()
        rawStream.cache()
        rawStream
            .transform(rdd => {
                // collect() brings the batch to the driver, so this body runs driver-side
                val array = rdd.collect()
                array.foreach(r => {
                    val value = getNewCacheValue(r._2, rdd.context)
                    if (value.isDefined) {
                        newCacheValues = newCacheValues + value.get
                    }
                })
                rdd
            })
        if (newCacheValues.nonEmpty) {
            logger.info(s"Rebroadcasting.  There are ${newCacheValues.size} new values")
            logger.info("Destroying old cache")
            MyCache.cache.destroy()
            // this is probably wrong here, destroying the object but then referencing it.  But I haven't gotten to this part yet.
            MyCache.cache = ssc.sparkContext.broadcast(MyCache.cache.value ++ newCacheValues)
        }
        rawStream
            .map(r => {
                println("######################")
                println(MyCache.cache.value)
                r
            })
            .map(r => processor(r._2, MyCache.cache.value))
            .filter(r => null != r)
    }
}

Every time I run this and try to access cache.value, I get:

SparkException: Failed to get broadcast_1_piece0 of broadcast_1

When I add a println(MyCache.cache.value) right after .getInstance I'm able to access the broadcast variable, but when I deploy it to the Mesos cluster I can no longer access the broadcast value, and instead get a null pointer exception.

Update:

The error I'm seeing is at the println(MyCache.cache.value). I shouldn't have included that if statement containing the destroy, since my tests never hit it.

The gist of my application is: I have a table in Cassandra that doesn't get updated very often, but I need to do some validation against streaming data, so I want to pull all the data from that rarely-updated table into memory. getInstance pulls the whole table in at startup, and then I check all my streaming data to see whether I need to pull from Cassandra again (which I will rarely have to do). The transform and collect are where I check whether I need to pull in new data. But since there's a chance my table will get updated, I will occasionally need to refresh the broadcast, and my idea was to destroy it and then re-broadcast. I'll update that once I get everything else working.
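To illustrate, the refresh I have in mind would look roughly like this (a driver-side sketch only; the refreshCache name is made up, and unlike the snippet above it reads the old value before destroying it):

def refreshCache(sc: SparkContext, newValues: Map[UUID, Definition]): Unit = {
    val merged = MyCache.cache.value ++ newValues  // read the old broadcast before destroying it
    MyCache.cache.destroy()                        // release the old broadcast's blocks
    MyCache.cache = sc.broadcast(merged)           // publish a fresh broadcast with the merged map
}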

If I comment out the destroy and re-broadcast, I get the same error.

Another update:

I need to access the broadcast variable at this processor line: .map(r => processor(r._2, MyCache.cache.value)).

I'm able to access the broadcast variable in the transform: if I do println(MyCache.cache.value) inside the transform, all my tests pass, and I'm then able to access the broadcast in processor.

Update:

rawStream
    .map(r => {
      println("$$$$$$$$$$$$$$$$$$$")
      println(metrics.value)
      r
    })

This is the stack trace I get when it hits that line.

    ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 135.0 (TID 114)
    java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData.apply(StreamProcessors.scala:160)
        at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData.apply(StreamProcessors.scala:158)
        at scala.collection.Iterator$$anon.next(Iterator.scala:370)
        at scala.collection.Iterator$$anon.next(Iterator.scala:370)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:414)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply$mcVI$sp(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:175)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
        ... 24 more

[Updated answer]

You're getting the error because the code inside rawStream.map, namely MyCache.cache.value, is being executed on one of the executors, where MyCache.cache is still null.

When you execute MyCache.getInstance, it creates MyCache.cache on the driver and broadcasts it just fine. But since you don't reference that same object in your map method, it doesn't get shipped to the executors. Instead, because you reference MyCache directly, the executors invoke MyCache.cache on their own copy of the MyCache object, which is of course null.
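Here is a minimal sketch of the same failure mode, with a made-up Holder object and assuming an active SparkContext sc. A Scala object is initialized from scratch in each executor JVM rather than serialized with the closure, so its var still holds the default null there:

import org.apache.spark.broadcast.Broadcast

object Holder {
    var bc: Broadcast[Int] = null        // assigned on the driver only
}

Holder.bc = sc.broadcast(42)             // works on the driver
sc.parallelize(1 to 2)
  .map(_ => Holder.bc.value)             // each executor initializes its own Holder, where bc is null
  .collect()                             // so the tasks fail with a NullPointerException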

You can make it work as expected by first getting a reference to the cache broadcast object in the driver and using that object in the map. The following code should work for you --

val cache = MyCache.getInstance(ssc.sparkContext)
rawStream.map(r => {
    println(cache.value)
    r
})
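The same local-reference trick applies to the processor line. Note that processor's signature takes the Broadcast itself, so pass cache rather than cache.value:

val cache = MyCache.getInstance(ssc.sparkContext)
rawStream
    .map(r => processor(r._2, cache))  // the local val is captured by the closure and shipped to the executors
    .filter(r => null != r)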