Can't access broadcast variable in transformation
I'm having trouble accessing a variable from inside a transformation function. Could someone help me out?
Here are my relevant classes and functions.
@SerialVersionUID(889949215L)
object MyCache extends Serializable {
  @transient lazy val logger = Logger(getClass.getName)
  @volatile var cache: Broadcast[Map[UUID, Definition]] = null

  def getInstance(sparkContext: SparkContext): Broadcast[Map[UUID, Definition]] = {
    if (cache == null) {
      synchronized {
        val map = sparkContext.cassandraTable("keyspace", "table")
          .collect()
          .map(m => m.getUUID("id") ->
            Definition(m.getString("c1"), m.getString("c2"), m.getString("c3"),
              m.getString("c4"))).toMap
        cache = sparkContext.broadcast(map)
      }
    }
    cache
  }
}
And in a different file:
object Processor extends Serializable {
  @transient lazy val logger = Logger(getClass.getName)

  def processData[T: ClassTag](rawStream: DStream[(String, String)], ssc: StreamingContext,
      processor: (String, Map[UUID, Definition]) => T): DStream[T] = {
    MyCache.getInstance(ssc.sparkContext)
    var newCacheValues = Map[UUID, Definition]()
    rawStream.cache()
    rawStream
      .transform(rdd => {
        val array = rdd.collect()
        array.foreach(r => {
          val value = getNewCacheValue(r._2, rdd.context)
          if (value.isDefined) {
            newCacheValues = newCacheValues + value.get
          }
        })
        rdd
      })
    if (newCacheValues.nonEmpty) {
      logger.info(s"Rebroadcasting. There are ${newCacheValues.size} new values")
      logger.info("Destroying old cache")
      MyCache.cache.destroy()
      // this is probably wrong here, destroying object, but then referencing it. But I haven't gotten to this part yet.
      MyCache.cache = ssc.sparkContext.broadcast(MyCache.cache.value ++ newCacheValues)
    }
    rawStream
      .map(r => {
        println("######################")
        println(MyCache.cache.value)
        r
      })
      .map(r => processor(r._2, MyCache.cache.value))
      .filter(r => null != r)
  }
}
Every time I run this and try to access cache.value, I get SparkException: Failed to get broadcast_1_piece0 of broadcast_1.
When I add a println(MyCache.cache.value) right after .getInstance, I am able to access the broadcast variable, but when I deploy it to a Mesos cluster I am unable to access the broadcast values again and get a null pointer exception instead.
Update:
The error I'm seeing is on println(MyCache.cache.value). I shouldn't have added that if statement containing the destroy, because my tests never hit it.
The basics of my app are: I have a table in Cassandra that won't be updated very much, but I need to do some validation against some streaming data, so I want to pull all of the data from this infrequently updated table into memory. getInstance pulls the whole table in at startup, and then I check all of my streaming data to see whether I need to pull from Cassandra again (which I will rarely have to do). The transform and collect are where I check whether I need to pull new data in. But since there is a chance my table will be updated, I will need to refresh the broadcast occasionally, so my idea was to destroy it and then rebroadcast. I'll update that once I get everything else working.
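Roughly what I have in mind is the following (an untested sketch; it reads the old value before releasing the broadcast, and uses unpersist instead of destroy, to avoid the destroy-then-reference problem noted in the comment above):
// Untested sketch of the occasional refresh, run on the driver.
if (newCacheValues.nonEmpty) {
  val merged = MyCache.cache.value ++ newCacheValues // read the old value first
  MyCache.cache.unpersist()                          // drop the stale copies on the executors
  MyCache.cache = ssc.sparkContext.broadcast(merged) // rebroadcast the merged map
}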
If I comment out the destroy and rebroadcast, I get the same error.
Another update:
I need to access the broadcast variable in processor, on this line: .map(r => processor(r._2, MyCache.cache.value)).
I am able to broadcast the variable in the transform, and if I do a println(MyCache.cache.value) in the transform, then all of my tests pass and I am then able to access the broadcast in processor.
Update:
rawStream
  .map(r => {
    println("$$$$$$$$$$$$$$$$$$$")
    println(metrics.value)
    r
  })
Here is the stack trace I get when it hits this line:
ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 135.0 (TID 114)
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData.apply(StreamProcessors.scala:160)
at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData.apply(StreamProcessors.scala:158)
at scala.collection.Iterator$$anon.next(Iterator.scala:370)
at scala.collection.Iterator$$anon.next(Iterator.scala:370)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:414)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$$anonfun.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
... 24 more
[Updated answer]
You're getting the error because the code inside rawStream.map, i.e. MyCache.cache.value, is executed on one of the executors, and there MyCache.cache is still null!

When you did MyCache.getInstance, it created the MyCache.cache value on the driver and broadcast it all right. But you are not referring to the same object in your map method, so it doesn't get sent over to the executors. Instead, since you are referring to MyCache directly, the executors invoke MyCache.cache on their own copy of the MyCache object, and that is obviously null.
You can make this work as expected by first getting an instance of the cache broadcast object in the driver, and using that object in the map. The following code should work for you --
val cache = MyCache.getInstance(ssc.sparkContext)
rawStream.map(r => {
  println(cache.value)
  r
})
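The same pattern extends to the rest of processData: capture the broadcast handle once in a driver-side val and close over that val everywhere, including the processor call. A sketch of the idea, using the types from your question:
// Sketch: the local val is what gets serialized into the task closures,
// so executors fetch the broadcast blocks instead of seeing a null field.
val cache = MyCache.getInstance(ssc.sparkContext)
rawStream
  .map(r => {
    println(cache.value) // executors fetch the broadcast value lazily here
    r
  })
  .map(r => processor(r._2, cache.value))
  .filter(r => null != r)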