如何使用 PySpark 将这么多 csv 文件(大约 130,000 个)有效地合并到一个大型数据集中?

How can I merge these many csv files (around 130,000) using PySpark into one large dataset efficiently?

我之前发布了这个问题并得到了一些使用 PySpark 的建议。

How can I merge this large dataset into one large dataframe efficiently?

以下 zip 文件 (https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip) 包含一个名为 data 的文件夹,其中包含大约 130,000 个 csv 文件。我想将它们全部合并到一个数据框中。我有 16gb 的 RAM,当我访问前几百个文件时,我将 运行ning 保留在 RAM 之外。文件的总大小只有大约 300-400 MB 的数据。

如果你打开任何一个csv文件,你可以看到它们的格式都是一样的,第一列是日期,第二列是数据系列。

所以现在我正在使用 PySpark,但是我不知道什么是连接所有文件的最有效方法,使用 pandas 数据帧我会像这样连接单个帧的列表,因为我希望他们在日期合并:

bigframe = pd.concat(listofframes,join='outer', axis=0)

但是正如我提到的,这种方法不起作用,因为我 运行 RAM 耗尽的速度非常快。

使用 PySpark 执行类似操作的最佳方法是什么?

到目前为止我有这个,(顺便说一句,下面的文件列表只是我要提取的文件列表,你可以忽略它)


import os

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col

from functools import reduce
from pyspark.sql import DataFrame

listdf = []

for subdir, dirs, files in os.walk("/kaggle/input/filelist/"):
    for file in files:
        path = os.path.join(subdir,file)
        print(file)
        filelist = pd.read_excel("/kaggle/input/filelist/" + file)

        for row in filelist.File.items():
            df = spark.read.csv(f"/kaggle/input/master/{file[:-5]}/{file[:-5]}/data/" + row[1], inferSchema = True, header = True)
            df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))
            df.show(3)
            listdf.append(df)

我在添加了 10 帧后停止了代码, 但是当我尝试下面的代码时,它只有一列数据,无法正确合并。

bigframe = reduce(DataFrame.join(listdf, ['DATE'], how='full'))

但我只剩下 2 列数据,日期和火花帧列表中的第一项。

如何正确地将所有内容合并到一个框架中?我希望日期成为其他列合并的事物索引。意思是如果一帧有:

Date        TimeSeries1
1 Jan 2012  12345
2 Jan 2012  23456

另一个有

Date        TimeSeries2
1 Jan 2012  5678
3 Jan 2012  8910

我要的成品是

Date        TimeSeries1 TimeSeries2
1 Jan 2012  12345       5678
2 Jan 2012  23456
3 Jan 2012              8910

此外,为了识别列,必须将名称更改为文件名。

spark 默认可以从包含相同模式的多个文件中读取数据。

要分别处理每个时间序列,您可以按文件名对数据帧进行分组,然后使用 pandas udf 来处理每个组。

import glob as g
import pyspark.sql.functions as F

@F.pandas_udf("date Date, value DECIMAL(38,4)", F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
  # pdf will be a pandas datafrmme for each timeseries
  # apply timeseries computations here and return a new dataframe
  # with aggregated values
  return pdf

paths = g.glob("./INTRNTL_csv_2/data/**/*.csv", recursive=True)

df = spark.read.csv(paths, header=False, schema="date DATE, value DECIMAL(38,4)")


res = df.withColumn('name', F.input_file_name())
res = res.groupBy('name').apply(transform)
res.show()

这里发生了很多事情,但是如果我可以将其提炼为需要将 130k CSV 文件中的数据合并到一个 DF 中,并捕获每个文件的名称,那么您可以这样做。

from  pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

customSchema = StructType([ \
StructField("asset_id", StringType(), True), \
StructField("price_date", StringType(), True), \
etc., 
StructField("close_price", StringType(), True), \
StructField("filename", StringType(), True)])

fullpath = 'mnt/INTRNTL_csv_2/data/??/*.csv'

df = spark.read.format("csv") \
   .option("header", "false") \
   .option("sep","|") \
   .schema(customSchema) \
   .load(fullPath) \
   .withColumn("filename", input_file_name())

注意:第一行代码和最后一行代码用于获取文件名。另外,请注意通配符;这 '?'用于单个字符(字母或数字),“*”用于任意数量的字符(字母和数字的任意组合)。