pySpark - DataFrame groupBy troubleshooting traceback

I have Spark 1.3 set up in a VirtualBox Ubuntu 14 32-bit VM. I have loaded a CSV file into a Spark DataFrame and am trying some operations that produce error messages I cannot resolve.

The pySpark code is as follows:

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import *
from dateutil.parser import parse
sqlContext = SQLContext(sc)
# Read the CSV as plain text and build a schema from the header row
elevFile = sc.textFile('file:////sharefolder/Jones Lake.csv')
header = elevFile.first()
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields[0].dataType = IntegerType()    # Hour
fields[1].dataType = TimestampType()  # Date
fields[2].dataType = FloatType()      # Value
schema = StructType(fields)
# Remove the header line from the data rows
elevHeader = elevFile.filter(lambda l: "Hour" in l)
elevHeader.collect()
elevNoHeader = elevFile.subtract(elevHeader)
print elevNoHeader.take(5)
# Convert each row to typed values and build the DataFrame with the schema
elev_df = (elevNoHeader.map(lambda k: k.split(","))
            .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
            .toDF(schema))

Everything works fine up to this point. It prints the first 5 rows of the new DataFrame without a problem:

print elev_df.head(5)
[Row(Hour=6, Date=datetime.datetime(1989, 9, 19, 0, 0), Value=641.6890258789062), Row(Hour=20, Date=datetime.datetime(1992, 4, 30, 0, 0), Value=633.7100219726562), Row(Hour=10, Date=datetime.datetime(1987, 7, 26, 0, 0), Value=638.6920166015625), Row(Hour=1, Date=datetime.datetime(1991, 2, 26, 0, 0), Value=634.2100219726562), Row(Hour=2, Date=datetime.datetime(1984, 7, 28, 0, 0), Value=639.8779907226562)]

But when I try a simple group-by and count, I get an error I cannot resolve.

elev_df.groupBy("Hour").count().show()

This gives an error (only the first few lines of the traceback are shown below).

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-209-6533c596fac9> in <module>()
----> 1 elev_df.groupBy("Hour").count().show()

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n)
    271         5   Bob
    272         """
--> 273         print self._jdf.showString(n).encode('utf8', 'ignore')
    274 
    275     def __repr__(self):

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Any ideas on how to troubleshoot this further?

Your CSV seems to have some blank values. I can see that you are replacing blank values, but I believe groupBy does not accept them. A simple way to handle the blank values in your CSV with Spark DataFrames is fillna:

fillna(value, subset=None)
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.

Parameters: 
value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
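
As a minimal sketch of how that might look with the elev_df from the question (the fill values 0 and 0.0 and the choice of columns are my own assumptions, not something specified in the post):

# Hypothetical usage: fill nulls in the numeric columns before grouping.
# The placeholder values 0 and 0.0 are assumptions; pick ones that make sense for your data.
elev_df_filled = elev_df.fillna({'Hour': 0, 'Value': 0.0})
elev_df_filled.groupBy("Hour").count().show()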