PySpark IPython notebook error

This is a simple log analysis in PySpark, taken from the Databricks Spark tutorial:

import sys
import os
from test_helper import Test

baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab2', 'apache.access.log.PROJECT')
logFile = os.path.join(baseDir, inputPath)

def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc.textFile(logFile).map(parseApacheLogLine))

    access_logs = (parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0]))

    failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print 'Number of invalid logline: %d' % failed_logs.count()
        for line in failed_logs.take(20):
            print 'Invalid logline: %s' % line

    print 'Read %d lines, successfully parsed %d lines, failed to parse %d lines' % (parsed_logs.count(), access_logs.count(), failed_logs.count())
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()

I am getting the following error:


Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-d91e2c3d41f9> in <module>()
     24 
     25 
---> 26 parsed_logs, access_logs, failed_logs = parseLogs()

<ipython-input-4-d91e2c3d41f9> in parseLogs()
     14 
     15     failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0]))
---> 16     failed_logs_count = failed_logs.count()
     17     if failed_logs_count > 0:
     18         print 'Number of invalid logline: %d' % failed_logs.count()

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in count(self)
    930         3
    931         """
--> 932         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
    933 
    934     def stats(self):

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in sum(self)
    921         6.0
    922         """
--> 923         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
    924 
    925     def count(self):

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in reduce(self, f)
    737             yield reduce(f, iterator, initial)
    738 
--> 739         vals = self.mapPartitions(func).collect()
    740         if vals:
    741             return reduce(f, vals)

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self)
    711         """
    712         with SCCallSiteSync(self.context) as css:
--> 713             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    714         return list(_load_from_socket(port, self._jrdd_deserializer))
    715 

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:374)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Can you tell me what the problem is here? I am new to this area, and an explanation would be very helpful.

I think the problem is the path to your input file:

Input path does not exist: file:/home/vagrant/data/data/cs100/lab2/apache.access.log.PROJECT

Make sure this is the correct path, and if it is not, change it in your script accordingly. Note the doubled data segment in the path above: baseDir is the relative directory 'data', and when Spark resolves it against the driver's working directory (apparently /home/vagrant/data here), the result is /home/vagrant/data/data/.... You can also set the path like this:

# ...
inputPath = '/path/to/your/data/directory/'
logFile = inputPath + 'apache.access.log.PROJECT'
# ...
parsed_logs = (sc.textFile(logFile).map(parseApacheLogLine))
# ...
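
If you would rather keep the structure of the original script, you can apply the same fix there. Judging from the resolved path in the error, the extra data component most likely comes from baseDir. The sketch below assumes the file actually sits under /home/vagrant/data/cs100/lab2/, which is only a guess; point baseDir at whatever directory really contains the cs100 folder:

import os

# Assumption: the log file really lives under /home/vagrant/data/cs100/lab2/.
# Replace baseDir with the directory that actually contains the cs100 folder.
baseDir = '/home/vagrant/data'
inputPath = os.path.join('cs100', 'lab2', 'apache.access.log.PROJECT')
logFile = os.path.join(baseDir, inputPath)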

You can always verify your path by printing it and then checking the location manually from the terminal/cmd or a file explorer.
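
For example, a quick sanity check in the notebook before calling sc.textFile could look like this (a minimal sketch; os.path only checks the local filesystem of the driver, which is what matters here since the file is read via a file: URL):

import os

print 'logFile           :', logFile
print 'absolute path     :', os.path.abspath(logFile)
print 'exists on driver? :', os.path.exists(logFile)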