Spark s3 csv 文件读取顺序
Spark s3 csv files read order
假设s3文件夹中的三个文件,通过spark.read.csv(s3:bucketname/folder1/*.csv)读取是否按顺序读取文件?
如果没有,有没有办法在读取整个文件夹时对文件进行排序,其中包含在内部不同时间收到的多个文件。
File name
s3 file uploaded/Last modified time
s3:bucketname/folder1/file1.csv
01:00:00
s3:bucketname/folder1/file2.csv
01:10:00
s3:bucketname/folder1/file3.csv
01:20:00
您可以使用以下方法实现此目的
- 迭代存储桶中的所有文件并通过添加新列加载该 csv
last_modified
。保留将在 dfs_list
中加载的所有 dfs 的列表。由于 pyspark 进行惰性评估,因此不会立即加载数据。
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucketname')
dfs_list = []
for file_object in my_bucket.objects.filter(Prefix="folder1/"):
df = spark.read.parquet('s3a://' + file_object.name).withColumn("modified_date", file_object.last_modified)
dfs_list.append(df)
- 现在使用 pyspark
unionAll
函数对所有 dfs 进行并集,然后根据 modified_date
. 对数据进行排序
from functools import reduce
from pyspark.sql import DataFrame
df_combined = reduce(DataFrame.unionAll, dfs_list)
df_combined = df_combined.orderBy('modified_date')
假设s3文件夹中的三个文件,通过spark.read.csv(s3:bucketname/folder1/*.csv)读取是否按顺序读取文件? 如果没有,有没有办法在读取整个文件夹时对文件进行排序,其中包含在内部不同时间收到的多个文件。
File name | s3 file uploaded/Last modified time |
---|---|
s3:bucketname/folder1/file1.csv | 01:00:00 |
s3:bucketname/folder1/file2.csv | 01:10:00 |
s3:bucketname/folder1/file3.csv | 01:20:00 |
您可以使用以下方法实现此目的
- 迭代存储桶中的所有文件并通过添加新列加载该 csv
last_modified
。保留将在dfs_list
中加载的所有 dfs 的列表。由于 pyspark 进行惰性评估,因此不会立即加载数据。
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucketname')
dfs_list = []
for file_object in my_bucket.objects.filter(Prefix="folder1/"):
df = spark.read.parquet('s3a://' + file_object.name).withColumn("modified_date", file_object.last_modified)
dfs_list.append(df)
- 现在使用 pyspark
unionAll
函数对所有 dfs 进行并集,然后根据modified_date
. 对数据进行排序
from functools import reduce
from pyspark.sql import DataFrame
df_combined = reduce(DataFrame.unionAll, dfs_list)
df_combined = df_combined.orderBy('modified_date')