使用 Spark 加载 CSV 文件

Load CSV file with Spark

我是 Spark 的新手,我正在尝试使用 Spark 从文件中读取 CSV 数据。 这就是我正在做的事情:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

我希望这个调用能给我一个文件前两列的列表,但我收到了这个错误:

File "", line 1, in IndexError: list index out of range

尽管我的 CSV 文件不止一列。

您确定所有行至少有 2 列吗?你能试试类似的东西,只是为了检查一下吗?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

或者,您可以打印罪魁祸首(如果有的话):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

现在,对于任何通用的 csv 文件,还有另一个选项:https://github.com/seahboonsiew/pyspark-csv,如下所示:

假设我们有以下上下文

sc = SparkContext
sqlCtx = SQLContext or HiveContext

首先,使用 SparkContext

将 pyspark-csv.py 分发给执行者
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

通过SparkContext读取csv数据并转换为DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

还有另一种选择,即使用 Pandas 读取 CSV 文件,然后将 Pandas DataFrame 导入 Spark。

例如:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

如果您的 csv 数据碰巧在任何字段中不包含换行符,您可以使用 textFile() 加载数据并解析它

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

Spark 2.0.0+

可以直接使用built-incsv数据源:

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

不包括任何外部依赖项。

Spark < 2.0.0:

我建议 spark-csv:

而不是手动解析,这在一般情况下远非微不足道

确保路径中包含 Spark CSV (--packages--jars--driver-class-path)

并加载您的数据如下:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

它可以处理加载、模式推断、删除格式错误的行,并且不需要将数据从 Python 传递到 JVM。

:

如果您知道架构,最好避免架构推断并将其传递给 DataFrameReader。假设您有三列 - 整数、双精度和字符串:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

简单地用逗号分隔也会分隔字段内的逗号(例如 a,b,"1,2,3",c),因此不推荐这样做。 zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csv 模块:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

编辑:正如@muon 在评论中提到的那样,这会将 header 视为任何其他行,因此您需要手动提取它。例如,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(确保在过滤器计算之前不要修改 header)。但此时,您最好使用 built-in csv 解析器。

这与 关于使用 Pandas 的内容一致,但有一个重大修改:如果您以块的形式将数据读入 Pandas,它应该更具延展性。这意味着,您可以解析比 Pandas 实际可以作为单个文件处理的文件大得多的文件,并将其以更小的尺寸传递给 Spark。 (这也回答了关于如果他们无论如何都可以将所有内容加载到 Pandas 为什么要使用 Spark 的评论。)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())

如果您想将 csv 作为数据框加载,则可以执行以下操作:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

对我来说效果很好。

如果数据集中有任何一行或多行的列数少于或多于 2,则可能会出现此错误。

我也是 Pyspark 的新手,正在尝试读取 CSV 文件。以下代码对我有用:

在这段代码中,我使用来自 kaggle 的数据集 link 是:https://www.kaggle.com/carrie1/ecommerce-data

1.不提架构:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

现在检查列: sdfData.columns

输出将是:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

检查每列的数据类型:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

这将给出数据框,其中所有列的数据类型都是 StringType

2。使用架构: 如果您知道模式或想要更改上面任何列的数据类型 table 然后使用它(假设我有以下列并希望它们在每个列中具有特定的数据类型)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

现在检查每列数据类型的模式:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

已编辑:我们也可以使用以下代码行而无需明确提及模式:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

输出为:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

输出将如下所示:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

当使用 spark.read.csv 时,我发现使用选项 escape='"'multiLine=True 提供了与 CSV standard 最一致的解决方案,根据我的经验,效果最好使用从 Google 表格中导出的 CSV 文件。

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

这是在 PYSPARK 中

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

那你可以查看

df.show(5)
df.count()

以这种方式读取您的 csv 文件:

df= spark.read.format("csv").option("multiline", True).option("quote", "\"").option("escape", "\"").option("header",True).load(df_path)

spark 版本为 3.0.1