如何使用 PySpark 将这么多 csv 文件(大约 130,000 个)有效地合并到一个大型数据集中?
How can I merge these many csv files (around 130,000) using PySpark into one large dataset efficiently?
我之前发布了这个问题并得到了一些使用 PySpark 的建议。
How can I merge this large dataset into one large dataframe efficiently?
以下 zip 文件 (https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip) 包含一个名为 data 的文件夹,其中包含大约 130,000 个 csv 文件。我想将它们全部合并到一个数据框中。我有 16gb 的 RAM,当我访问前几百个文件时,我将 运行ning 保留在 RAM 之外。文件的总大小只有大约 300-400 MB 的数据。
如果你打开任何一个csv文件,你可以看到它们的格式都是一样的,第一列是日期,第二列是数据系列。
所以现在我正在使用 PySpark,但是我不知道什么是连接所有文件的最有效方法,使用 pandas 数据帧我会像这样连接单个帧的列表,因为我希望他们在日期合并:
bigframe = pd.concat(listofframes,join='outer', axis=0)
但是正如我提到的,这种方法不起作用,因为我 运行 RAM 耗尽的速度非常快。
使用 PySpark 执行类似操作的最佳方法是什么?
到目前为止我有这个,(顺便说一句,下面的文件列表只是我要提取的文件列表,你可以忽略它)
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame
listdf = []
for subdir, dirs, files in os.walk("/kaggle/input/filelist/"):
for file in files:
path = os.path.join(subdir,file)
print(file)
filelist = pd.read_excel("/kaggle/input/filelist/" + file)
for row in filelist.File.items():
df = spark.read.csv(f"/kaggle/input/master/{file[:-5]}/{file[:-5]}/data/" + row[1], inferSchema = True, header = True)
df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))
df.show(3)
listdf.append(df)
我在添加了 10 帧后停止了代码,
但是当我尝试下面的代码时,它只有一列数据,无法正确合并。
bigframe = reduce(DataFrame.join(listdf, ['DATE'], how='full'))
但我只剩下 2 列数据,日期和火花帧列表中的第一项。
如何正确地将所有内容合并到一个框架中?我希望日期成为其他列合并的事物索引。意思是如果一帧有:
Date TimeSeries1
1 Jan 2012 12345
2 Jan 2012 23456
另一个有
Date TimeSeries2
1 Jan 2012 5678
3 Jan 2012 8910
我要的成品是
Date TimeSeries1 TimeSeries2
1 Jan 2012 12345 5678
2 Jan 2012 23456
3 Jan 2012 8910
此外,为了识别列,必须将名称更改为文件名。
spark 默认可以从包含相同模式的多个文件中读取数据。
要分别处理每个时间序列,您可以按文件名对数据帧进行分组,然后使用 pandas udf 来处理每个组。
import glob as g
import pyspark.sql.functions as F
@F.pandas_udf("date Date, value DECIMAL(38,4)", F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
# pdf will be a pandas datafrmme for each timeseries
# apply timeseries computations here and return a new dataframe
# with aggregated values
return pdf
paths = g.glob("./INTRNTL_csv_2/data/**/*.csv", recursive=True)
df = spark.read.csv(paths, header=False, schema="date DATE, value DECIMAL(38,4)")
res = df.withColumn('name', F.input_file_name())
res = res.groupBy('name').apply(transform)
res.show()
这里发生了很多事情,但是如果我可以将其提炼为需要将 130k CSV 文件中的数据合并到一个 DF 中,并捕获每个文件的名称,那么您可以这样做。
from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
customSchema = StructType([ \
StructField("asset_id", StringType(), True), \
StructField("price_date", StringType(), True), \
etc.,
StructField("close_price", StringType(), True), \
StructField("filename", StringType(), True)])
fullpath = 'mnt/INTRNTL_csv_2/data/??/*.csv'
df = spark.read.format("csv") \
.option("header", "false") \
.option("sep","|") \
.schema(customSchema) \
.load(fullPath) \
.withColumn("filename", input_file_name())
注意:第一行代码和最后一行代码用于获取文件名。另外,请注意通配符;这 '?'用于单个字符(字母或数字),“*”用于任意数量的字符(字母和数字的任意组合)。
我之前发布了这个问题并得到了一些使用 PySpark 的建议。
How can I merge this large dataset into one large dataframe efficiently?
以下 zip 文件 (https://fred.stlouisfed.org/categories/32263/downloaddata/INTRNTL_csv_2.zip) 包含一个名为 data 的文件夹,其中包含大约 130,000 个 csv 文件。我想将它们全部合并到一个数据框中。我有 16gb 的 RAM,当我访问前几百个文件时,我将 运行ning 保留在 RAM 之外。文件的总大小只有大约 300-400 MB 的数据。
如果你打开任何一个csv文件,你可以看到它们的格式都是一样的,第一列是日期,第二列是数据系列。
所以现在我正在使用 PySpark,但是我不知道什么是连接所有文件的最有效方法,使用 pandas 数据帧我会像这样连接单个帧的列表,因为我希望他们在日期合并:
bigframe = pd.concat(listofframes,join='outer', axis=0)
但是正如我提到的,这种方法不起作用,因为我 运行 RAM 耗尽的速度非常快。
使用 PySpark 执行类似操作的最佳方法是什么?
到目前为止我有这个,(顺便说一句,下面的文件列表只是我要提取的文件列表,你可以忽略它)
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark-dataframe-demo').getOrCreate()
from pyspark.sql import *
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame
listdf = []
for subdir, dirs, files in os.walk("/kaggle/input/filelist/"):
for file in files:
path = os.path.join(subdir,file)
print(file)
filelist = pd.read_excel("/kaggle/input/filelist/" + file)
for row in filelist.File.items():
df = spark.read.csv(f"/kaggle/input/master/{file[:-5]}/{file[:-5]}/data/" + row[1], inferSchema = True, header = True)
df = df.select(col("DATE").alias("DATE"),col("VALUE").alias(row[1][:-4]))
df.show(3)
listdf.append(df)
我在添加了 10 帧后停止了代码, 但是当我尝试下面的代码时,它只有一列数据,无法正确合并。
bigframe = reduce(DataFrame.join(listdf, ['DATE'], how='full'))
但我只剩下 2 列数据,日期和火花帧列表中的第一项。
如何正确地将所有内容合并到一个框架中?我希望日期成为其他列合并的事物索引。意思是如果一帧有:
Date TimeSeries1
1 Jan 2012 12345
2 Jan 2012 23456
另一个有
Date TimeSeries2
1 Jan 2012 5678
3 Jan 2012 8910
我要的成品是
Date TimeSeries1 TimeSeries2
1 Jan 2012 12345 5678
2 Jan 2012 23456
3 Jan 2012 8910
此外,为了识别列,必须将名称更改为文件名。
spark 默认可以从包含相同模式的多个文件中读取数据。
要分别处理每个时间序列,您可以按文件名对数据帧进行分组,然后使用 pandas udf 来处理每个组。
import glob as g
import pyspark.sql.functions as F
@F.pandas_udf("date Date, value DECIMAL(38,4)", F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
# pdf will be a pandas datafrmme for each timeseries
# apply timeseries computations here and return a new dataframe
# with aggregated values
return pdf
paths = g.glob("./INTRNTL_csv_2/data/**/*.csv", recursive=True)
df = spark.read.csv(paths, header=False, schema="date DATE, value DECIMAL(38,4)")
res = df.withColumn('name', F.input_file_name())
res = res.groupBy('name').apply(transform)
res.show()
这里发生了很多事情,但是如果我可以将其提炼为需要将 130k CSV 文件中的数据合并到一个 DF 中,并捕获每个文件的名称,那么您可以这样做。
from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
customSchema = StructType([ \
StructField("asset_id", StringType(), True), \
StructField("price_date", StringType(), True), \
etc.,
StructField("close_price", StringType(), True), \
StructField("filename", StringType(), True)])
fullpath = 'mnt/INTRNTL_csv_2/data/??/*.csv'
df = spark.read.format("csv") \
.option("header", "false") \
.option("sep","|") \
.schema(customSchema) \
.load(fullPath) \
.withColumn("filename", input_file_name())
注意:第一行代码和最后一行代码用于获取文件名。另外,请注意通配符;这 '?'用于单个字符(字母或数字),“*”用于任意数量的字符(字母和数字的任意组合)。