Cannot set checkpoint dir when running Connected Component example

Here is the Connected Components example from graphframes:
from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

In the documentation they say:

NOTE: With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm using connectedComponents.setAlgorithm("graphx").
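The setAlgorithm("graphx") call in that note is the Scala builder API. In the Python wrapper the same switch is exposed as a keyword argument, so, as a sketch assuming a GraphFrames release that supports it, you can fall back to the old algorithm (and avoid the checkpoint requirement) like this:

# Fall back to the older GraphX implementation, which needs no checkpoint directory
result = g.connectedComponents(algorithm="graphx")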

So here is my full code, connected.py, with setCheckpointDir added:

import pyspark

sc = pyspark.SparkContext().getOrCreate()

sc.addPyFile("/home/username/.ivy2/jars/graphframes_graphframes-0.8.1-spark3.0-s_2.12.jar")

from graphframes.examples import Graphs

sc.setCheckpointDir("graphframes_cps")

g = Graphs(sqlContext).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

And I run it with this command:

spark-submit connected.py --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12
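Note that spark-submit treats everything after the application file as arguments to the script itself, so written this way --packages never reaches spark-submit. It should come before connected.py:

spark-submit --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 connected.py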

Then it returns this error:

Traceback (most recent call last):
  File "/home/username//test/spark/connected.py", line 11, in <module>
    sc.setCheckpointDir("graphframes_cps")
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 975, in setCheckpointDir
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.setCheckpointDir.

How can I fix this?

When running the Connected Components example from graphframes:

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

I get this error:

java.io.IOException: Checkpoint directory is not set. Please set it first using sc.setCheckpointDir().

which means I had not set the checkpointDir yet. So I added that line:

sc.setCheckpointDir(dirName="/home/username/graphframes_cps")

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

and the error I got was:

Traceback (most recent call last):
  File "/home/username//test/spark/connected.py", line 11, in <module>
    sc.setCheckpointDir("graphframes_cps")
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 975, in setCheckpointDir
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.setCheckpointDir.

Py4JJavaError: An error occurred while calling o176.setCheckpointDir.
: java.net.ConnectException: Call From huycomputer/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused

There are more error lines below this that I have not shown, but this is the root problem: I had not started HDFS, so pyspark could not connect to localhost:9000, which is the HDFS service port.

So after I ran start-dfs.sh, it worked as expected. But I still did not know how to use a local folder instead: in the HDFS web UI at localhost:9870 I can see the "/home/username/graphframes_cps" path after running the example a few times.
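If you would rather keep the checkpoints in a local folder instead of HDFS, one option (a sketch based on standard Hadoop path resolution, not something I tested here) is to pass an explicit file:// URI so the path is not resolved against fs.defaultFS, which points at hdfs://localhost:9000 in this setup:

# Force the local filesystem instead of the default HDFS filesystem
sc.setCheckpointDir("file:///home/username/graphframes_cps")

Keep in mind that on a multi-node cluster a local path would have to exist on every executor, which is why a shared filesystem like HDFS is normally used for checkpoint directories.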

So here is my full code. I am using a Jupyter Notebook, so a SparkContext has already been started; I only need the sc variable to call setCheckpointDir():

from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph
sc.setCheckpointDir(dirName="/home/username/graphframes_cps")

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()

Output:

+---+------------+
| id|   component|
+---+------------+
|  b|412316860416|
|  c|412316860416|
|  e|412316860416|
|  f|412316860416|
|  d|412316860416|
|  a|412316860416|
+---+------------+
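For completeness, outside a notebook you have to create the session yourself. Here is a minimal standalone sketch; it assumes the graphframes package is supplied via --packages, and it passes the SparkSession where the older examples pass sqlContext (both provide createDataFrame, so Graphs accepts either):

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

# Build (or reuse) a session; graphframes must be on the classpath, e.g. via --packages
spark = SparkSession.builder.appName("connected").getOrCreate()
sc = spark.sparkContext

# Relative or plain absolute paths are resolved against fs.defaultFS
sc.setCheckpointDir("/home/username/graphframes_cps")

g = Graphs(spark).friends()  # Get example graph
result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()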