Error while running PageRank and BFS functions on Graphframes in PySpark

I am new to Spark and have been learning it on the Cloudera Distribution for Hadoop (CDH). I am trying to run the PageRank and BFS functions from a Jupyter Notebook, which was launched with the following command:

pyspark --packages graphframes:graphframes:0.1.0-spark1.6,com.databricks:spark-csv_2.11:1.2.0

Below is the PageRank call I am trying to run, followed by the error message:

ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)

Output:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-20-34d549cc033e> in <module>()
----> 1 ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)
      2 ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(20).show()

/tmp/spark-3bdc323d-a439-4f0a-ac1d-4e64ef4d1396/userFiles-0c248c5c-29fc-44c7-bfd9-3543500350dc/graphframes_graphframes-0.1.0-spark1.6.jar/graphframes/graphframe.pyc in pageRank(self, resetProbability, sourceId, maxIter, tol)

/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o106.run.
: java.lang.AbstractMethodError
    at org.apache.spark.Logging$class.log(Logging.scala:50)
    at org.apache.spark.graphx.lib.backport.PageRank$.log(PageRank.scala:65)
    at org.apache.spark.Logging$class.logInfo(Logging.scala:58)
    at org.apache.spark.graphx.lib.backport.PageRank$.logInfo(PageRank.scala:65)
    at org.apache.spark.graphx.lib.backport.PageRank$.runWithOptions(PageRank.scala:148)
    at org.graphframes.lib.PageRank$.run(PageRank.scala:130)
    at org.graphframes.lib.PageRank.run(PageRank.scala:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

The BFS call I am trying fails with the same error message:

filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SEA'",
  toExpr = "id = 'SFO'",
  maxPathLength = 1)

Output:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-22-74394b11f50d> in <module>()
      4   fromExpr = "id = 'SEA'",
      5   toExpr = "id = 'SFO'",
----> 6   maxPathLength = 1)
      7 
      8 filteredPaths.show()

/tmp/spark-3bdc323d-a439-4f0a-ac1d-4e64ef4d1396/userFiles-0c248c5c-29fc-44c7-bfd9-3543500350dc/graphframes_graphframes-0.1.0-spark1.6.jar/graphframes/graphframe.pyc in bfs(self, fromExpr, toExpr, edgeFilter, maxPathLength)

/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.run.
: java.lang.AbstractMethodError
    at org.apache.spark.Logging$class.log(Logging.scala:50)
    at org.graphframes.lib.BFS$.log(BFS.scala:131)
    at org.apache.spark.Logging$class.logInfo(Logging.scala:58)
    at org.graphframes.lib.BFS$.logInfo(BFS.scala:131)
    at org.graphframes.lib.BFS$.org$graphframes$lib$BFS$$run(BFS.scala:212)
    at org.graphframes.lib.BFS.run(BFS.scala:126)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Could you please tell me what is causing this issue?

Thanks, Sasi.

You are using incompatible Scala versions:

  • graphframes:graphframes:0.1.0-spark1.6 - Scala 2.10
  • com.databricks:spark-csv_2.11:1.2.0 - Scala 2.11
  • your Spark installation itself - most likely Scala 2.10 (one way to verify this is shown below).
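
A quick way to check which Scala version your Spark installation was built with (assuming a standard CDH setup where spark-submit is on the PATH) is the version banner, which includes a "Using Scala version ..." line:

spark-submit --version

spark-shell prints the same information in its welcome banner at startup.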

You have to use the same Scala version for all components (com.databricks:spark-csv_2.10:1.2.0 if Spark has been compiled with Scala 2.10). Check the documentation of your distribution for the details.
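
As an illustration, and assuming your CDH build of Spark 1.6 was compiled against Scala 2.10 (the usual case for CDH 5), a launch command with consistent Scala 2.10 artifacts would look like this; only the spark-csv coordinate changes, since graphframes:graphframes:0.1.0-spark1.6 already targets Scala 2.10:

pyspark --packages graphframes:graphframes:0.1.0-spark1.6,com.databricks:spark-csv_2.10:1.2.0

After relaunching the notebook this way, the tripGraph.pageRank and tripGraph.bfs calls above should no longer hit the java.lang.AbstractMethodError caused by the mixed Scala binaries.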