Error fetching the blocks from another spark node while caching the Rdds

I am using the Spark Cassandra Connector. I have multiple Spark (1.6) jobs that use the same data, so I cached it using the following code.

The data is fetched from Cassandra 3.9.

Spark code:

    import com.datastax.spark.connector._  // provides joinWithCassandraTable and SomeColumns

    sc.parallelize(partitions, 2 * sc.defaultParallelism)
      .map(x => new Partition(x))
      .joinWithCassandraTable("KEYSPACE", "COLUMNFAMILY")
      .on(SomeColumns("partitionkey"))
      .select("partitionkey", "cookie", "query")
      .cache()
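
As an aside, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY). A replicated storage level keeps each cached block on two executors, so a single unreachable node does not leave a block unfetchable. A minimal sketch, where `joined` is a hypothetical name for the RDD built above:

    import org.apache.spark.storage.StorageLevel

    // `joined` stands for the RDD produced by the chain above (hypothetical name).
    // MEMORY_ONLY_2 keeps a replica of each cached block on a second executor.
    joined.persist(StorageLevel.MEMORY_ONLY_2)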

However, a few of the tasks fail with the following exception:

 org.apache.spark.storage.BlockFetchException: Failed to fetch block from 1 locations. Most recent failure cause:
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote.apply(BlockManager.scala:595)
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote.apply(BlockManager.scala:585)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
        at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: Connection from ubuntu/172.16.0.27:56727 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
        at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
        at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:659)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
        ... 1 more

I do not get any exception when the data is not cached. Also, every node has entries in its hosts file mapping ubuntu and ubuntu1 to their respective IPs.
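
For reference, a hosts-file layout consistent with that description might look like the snippet below; the first address is taken from the stack trace, the second is purely illustrative:

    # /etc/hosts on every node (172.16.0.27 appears in the stack trace;
    # the second address is illustrative, not from the original setup)
    172.16.0.27   ubuntu
    172.16.0.28   ubuntu1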

Also, as shown in the screenshot, the whole data set is split into 8 partitions. The Spark Cassandra Connector should have placed the tasks intelligently, so why does the Locality Level show ANY, meaning the data could not be found on the local node? A locality-oriented variant is sketched below.
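
On the locality question: the connector documents a pattern where the key RDD is first repartitioned onto Spark nodes that are Cassandra replicas for those keys, so the subsequent join can run NODE_LOCAL rather than ANY. A sketch, assuming Spark executors are colocated with the Cassandra nodes and that `Partition` maps to the "partitionkey" column:

    import com.datastax.spark.connector._

    // repartitionByCassandraReplica moves each key to a Spark node that is also
    // a Cassandra replica for that key (partitionsPerHost = 10 is the default).
    val keys = sc.parallelize(partitions, 2 * sc.defaultParallelism)
      .map(x => new Partition(x))
      .repartitionByCassandraReplica("KEYSPACE", "COLUMNFAMILY", partitionsPerHost = 10)

    // The join over the repartitioned keys can now be served node-locally.
    val rows = keys
      .joinWithCassandraTable("KEYSPACE", "COLUMNFAMILY")
      .on(SomeColumns("partitionkey"))
      .select("partitionkey", "cookie", "query")
      .cache()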

This has been fixed in newer Spark versions. The JIRAs below describe the bug as:

"an error which occurs due to failed fetches of NON-shuffle blocks (such as broadcasts or cached RDD blocks)."

https://issues.apache.org/jira/browse/SPARK-14209
https://issues.apache.org/jira/browse/SPARK-17484
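
Until an upgrade is possible, one commonly suggested mitigation for "connection closed" fetch failures on Spark 1.6 is to give remote block fetches more time before the connection is declared dead. A sketch with an illustrative, untuned value:

    import org.apache.spark.SparkConf

    // spark.network.timeout defaults to 120s; 300s here is illustrative only.
    val conf = new SparkConf()
      .set("spark.network.timeout", "300s")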