从文件中读取图形
Reading graph from file
在我的 Windows 机器上使用 Hadoop/Spark 的 SparklyR 安装中的 Spark-Shell 查看 运行 GraphX 示例。我可以先从这里的安装目录启动 shell:
start C:\Users\eyeOfTheStorm\AppData\Local\rstudio\spark\Cache\spark-2.0.0-bin-hadoop2.7\bin\spark-shell
输出:
17/01/02 12:21:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/02 12:21:07 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.99.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1483388466798).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) Client VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
然后使用 SPARK IN ACTION 中的此文本示例作为 Cit-Hepth.txt
保存在 C:\Users\eyeOfTheStorm
中,例如使用此数据:
"V1" "V2"
1001 9304045
1001 9308122
1001 9309097
1001 9311042
1001 9401139
1001 9404151
1001 9407087
1001 9408099
1001 9501030
1001 9503124
1001 9504090
然后我简单地从 Scala shell 运行 val graph = GraphLoader.edgeListFile(sc, "Cit-HepTh.txt")
,得到以下错误。请注意,HADOOP_HOME
由 SparklyR 自动设置,并在 C:\Users\eyeOfTheStorm\AppData\Local\rstudio\spark\Cache\spark-2.0.0-bin-hadoop2.7\tmp\hadoop
中安装了正确的 winutils。是否缺少一段代码或一条路径可以消除下面的错误和 运行 代码?
scala> val graph = GraphLoader.edgeListFile(sc, "Cit-HepTh.txt")
17/01/02 12:41:48 WARN BlockManager: Putting block rdd_5_0 failed
17/01/02 12:41:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
17/01/02 12:41:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
17/01/02 12:41:48 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/01/02 12:41:48 WARN BlockManager: Putting block rdd_5_1 failed
17/01/02 12:41:48 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost): TaskKilled (killed intentionally)
[Stage 0:> (0 + 1) / 2]org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
at org.apache.spark.graphx.GraphLoader$.edgeListFile(GraphLoader.scala:94)
... 50 elided
Caused by: java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
GraphLoader.edgeListFile
should be的输入:
edge list formatted file where each line contains two integers: a source id and a target id.
不允许其他值,例如 headers 或属性。您可以手动删除 header 行或使用其他加载方法,例如 csv
reader:
val spark: SparkSession = ???
import spark.implicits._
val path: String = ???
Graph.fromEdgeTuples(
spark.read
// Adjust separator if needed
.options(Map("header" -> "true", "delimiter" -> "\t"))
.csv(path)
.select($"V1".cast("long"), $"V2".cast("long"))
.as[(Long, Long)]
.rdd,
defaultValue = 0
)
您也可以使用 GraphFrames
:
import org.graphframes.GraphFrame
GraphFrame.fromEdges(spark.read
.options(Map("header" -> "true", "delimiter" -> "\t"))
.csv(path)
.toDF("src", "dst")
).toGraphX
在我的 Windows 机器上使用 Hadoop/Spark 的 SparklyR 安装中的 Spark-Shell 查看 运行 GraphX 示例。我可以先从这里的安装目录启动 shell:
start C:\Users\eyeOfTheStorm\AppData\Local\rstudio\spark\Cache\spark-2.0.0-bin-hadoop2.7\bin\spark-shell
输出:
17/01/02 12:21:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/02 12:21:07 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.99.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1483388466798).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) Client VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
然后使用 SPARK IN ACTION 中的此文本示例作为 Cit-Hepth.txt
保存在 C:\Users\eyeOfTheStorm
中,例如使用此数据:
"V1" "V2"
1001 9304045
1001 9308122
1001 9309097
1001 9311042
1001 9401139
1001 9404151
1001 9407087
1001 9408099
1001 9501030
1001 9503124
1001 9504090
然后我简单地从 Scala shell 运行 val graph = GraphLoader.edgeListFile(sc, "Cit-HepTh.txt")
,得到以下错误。请注意,HADOOP_HOME
由 SparklyR 自动设置,并在 C:\Users\eyeOfTheStorm\AppData\Local\rstudio\spark\Cache\spark-2.0.0-bin-hadoop2.7\tmp\hadoop
中安装了正确的 winutils。是否缺少一段代码或一条路径可以消除下面的错误和 运行 代码?
scala> val graph = GraphLoader.edgeListFile(sc, "Cit-HepTh.txt")
17/01/02 12:41:48 WARN BlockManager: Putting block rdd_5_0 failed
17/01/02 12:41:48 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
17/01/02 12:41:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
17/01/02 12:41:48 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/01/02 12:41:48 WARN BlockManager: Putting block rdd_5_1 failed
17/01/02 12:41:48 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost): TaskKilled (killed intentionally)
[Stage 0:> (0 + 1) / 2]org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
at org.apache.spark.graphx.GraphLoader$.edgeListFile(GraphLoader.scala:94)
... 50 elided
Caused by: java.lang.NumberFormatException: For input string: ""V1""
at java.lang.NumberFormatException.forInputString(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at java.lang.Long.parseLong(Unknown Source)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:83)
at org.apache.spark.graphx.GraphLoader$$anonfun$$anonfun$apply.apply(GraphLoader.scala:77)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:77)
at org.apache.spark.graphx.GraphLoader$$anonfun.apply(GraphLoader.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:332)
at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:330)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:919)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
GraphLoader.edgeListFile
should be的输入:
edge list formatted file where each line contains two integers: a source id and a target id.
不允许其他值,例如 headers 或属性。您可以手动删除 header 行或使用其他加载方法,例如 csv
reader:
val spark: SparkSession = ???
import spark.implicits._
val path: String = ???
Graph.fromEdgeTuples(
spark.read
// Adjust separator if needed
.options(Map("header" -> "true", "delimiter" -> "\t"))
.csv(path)
.select($"V1".cast("long"), $"V2".cast("long"))
.as[(Long, Long)]
.rdd,
defaultValue = 0
)
您也可以使用 GraphFrames
:
import org.graphframes.GraphFrame
GraphFrame.fromEdges(spark.read
.options(Map("header" -> "true", "delimiter" -> "\t"))
.csv(path)
.toDF("src", "dst")
).toGraphX