在 S3 中读取 100k+ 小 (40kb) 镶木地板以在 Python 中进行数据操作的最快方法。到目前为止使用的所有方法都非常慢(2 小时以上)
Fastest way to read 100k+ small (40kb) parquets in S3 for data manipulation in Python. All methods used so far have been extremely slow (2+ hours)
我正在尝试从我的 S3 存储桶中读取大量镶木地板文件。目标是编写一些代码来读取这些数据,使用 pandas/dask 对其应用一些逻辑,然后将它们上传回 S3。
每个 parquet 包含约 130 列和 1 行,一些文件的架构可能略有不同。这些文件的路径根据它们所属的分组类别而有所不同,比如 group1、group2 和 group3,如下所示:s3://path/to/my/files/{group_N}/filename1.parquet.gz
.
在我的代码的早期阶段,我生成了(list_objects_V2
和 NextPaginationToken
)一个字典,它以 group_name 为键,以该组下所有镶木地板路径的列表为值(例如:{str : List[str]}
)。尽管如此,根据我用来读取镶木地板文件的方法,这个组合也只是group_name到dir_path(例如:{str:str}
)。
我已经尝试了几种解决方案(Dask、PyArrow、Boto3 的 get_object 和 BytesIO 缓冲区、PySpark、AWS Wrangler)并且,那些工作需要 2 小时到(估计)1 天才能 运行.
最好的 运行ning 解决方案是 Boto3 的 get_object 和 BytesIO 缓冲区:
def read_parquet_objects(self, objects_dict: dict) -> dict:
df_holder = {}
for key in objects_dict.keys():
logging.info("Start reading parquets for: " + key)
df_list = list()
for i in objects_dict[key]:
object = self.s3_client.client.get_object(Bucket=my_bucket, Key=i)
bytes_ = BytesIO(object["Body"].read())
df_list.append(pd.read_parquet(bytes_))
pandas_df = pd.concat(df_list, axis=0, ignore_index=True)
df_holder[key] = pandas_df
return df_holder
运行 大约花了 2 个小时。
然后我尝试了 Dask、PyArrow 和 AWS Wrangler,它们花费的时间太长 运行。根据他们在我停止该过程之前完成的内容,他们将花费 1 天以上的时间才能 运行。这是代码:
#AWS WRANGLER
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
print("Start reading parquet for: " + key)
df = wr.s3.read_parquet(path=objects_dict[key], use_threads=True)
return df
# DASK
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
df_list = list()
for i in objects_dict[key]:
df_list.append(dd.read_parquet(path=i, gather_statistics=False))
df = dd.concat(df_list, axis=0)
pandas_df = df.compute()
objects_dict[key] = pandas_df
return objects_dict
# PYARROW
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
df_list = list()
for i in objects_dict[key]:
dataset = pq.ParquetDataset(i, filesystem=FILESYSTEM)
table = dataset.read()
single_df = table.to_pandas()
df_list.append(single_df)
pandas_df = pd.concat(dfs, axis=0)
objects_dict[key] = pandas_df
return objects_dict
这三种方法是迄今为止最慢的,每秒读取约 1 个镶木地板(其中 AWS Wrangler 最快,每秒读取约 2 个)。
然后我尝试使用 PySpark 进行 Spark。
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.271,org.apache.hadoop:hadoop-aws:3.1.2 pyspark-shell"
sparkConf = SparkConf()
sparkConf.setAppName("spark")
sparkConf.set("spark.executor.instances", "4")
sparkConf.set("spark.executor.cores", "4")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
hadoop_conf.set("fs.s3a.session.token", os.environ['AWS_SESSION_TOKEN'])
hadoop_conf.set("fs.s3a.experimental.input.fadvise", "sequential")
PROTOCOL = 's3a://'
def read_parquet_objects(self, objects_dict: dict):
for key in objects_dict.keys():
df = spark.read.option("mergeSchema", "true").parquet(objects_dict[key])
df.show()
return df
objects_dict[key]
在这种情况下包含我的镶木地板文件的密钥(就像没有文件名的基本路径一样)。当我 运行 代码时,它曾经卡在 Stage 0 ([Stage 0:> (0 + 8) / 8]
) 上,然后在我醒来时看到此错误消息 HTTP ERROR 500 java.lang.OutOfMemoryError: GC overhead limit exceeded
.现在我得到了一个不同的错误,com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
我真的不知道如何解释。
现在的问题是:有没有更好、更快(< 30 秒)的方法来读取这些镶木地板文件,以便在 pandas/dask 中转换它们?我使用的是完全错误的方法吗?我是否应该使用一些内部 AWS 服务(Lambda、Step Functions、Athena)来获得更好的结果?在这种情况下,read_parquet 函数的运行速度让我感到非常惊讶。
TL:DR;看来您的所有代码都是单线程的,这意味着您在 IO 上花费了大量时间。
部分选项:
- 由于您已经有了文件列表,请尝试使用 manual pyarrow dataset creation on the entire list 而不是一次传递一个文件。默认情况下,数据集应使用多线程。
- 自己穿线。拥有您提交路径的线程池,并让每个任务读取数据并将其附加到列表中。
最后,如果您可以控制数据,我建议您不要使用 parquet 作为格式,因为每个文件的行数如此之少,所以每列的开销很大。 Avro 可能是一个不错的选择,或者如果可能的话,尝试合并每个文件的多行。
我正在尝试从我的 S3 存储桶中读取大量镶木地板文件。目标是编写一些代码来读取这些数据,使用 pandas/dask 对其应用一些逻辑,然后将它们上传回 S3。
每个 parquet 包含约 130 列和 1 行,一些文件的架构可能略有不同。这些文件的路径根据它们所属的分组类别而有所不同,比如 group1、group2 和 group3,如下所示:s3://path/to/my/files/{group_N}/filename1.parquet.gz
.
在我的代码的早期阶段,我生成了(list_objects_V2
和 NextPaginationToken
)一个字典,它以 group_name 为键,以该组下所有镶木地板路径的列表为值(例如:{str : List[str]}
)。尽管如此,根据我用来读取镶木地板文件的方法,这个组合也只是group_name到dir_path(例如:{str:str}
)。
我已经尝试了几种解决方案(Dask、PyArrow、Boto3 的 get_object 和 BytesIO 缓冲区、PySpark、AWS Wrangler)并且,那些工作需要 2 小时到(估计)1 天才能 运行.
最好的 运行ning 解决方案是 Boto3 的 get_object 和 BytesIO 缓冲区:
def read_parquet_objects(self, objects_dict: dict) -> dict:
df_holder = {}
for key in objects_dict.keys():
logging.info("Start reading parquets for: " + key)
df_list = list()
for i in objects_dict[key]:
object = self.s3_client.client.get_object(Bucket=my_bucket, Key=i)
bytes_ = BytesIO(object["Body"].read())
df_list.append(pd.read_parquet(bytes_))
pandas_df = pd.concat(df_list, axis=0, ignore_index=True)
df_holder[key] = pandas_df
return df_holder
运行 大约花了 2 个小时。
然后我尝试了 Dask、PyArrow 和 AWS Wrangler,它们花费的时间太长 运行。根据他们在我停止该过程之前完成的内容,他们将花费 1 天以上的时间才能 运行。这是代码:
#AWS WRANGLER
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
print("Start reading parquet for: " + key)
df = wr.s3.read_parquet(path=objects_dict[key], use_threads=True)
return df
# DASK
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
df_list = list()
for i in objects_dict[key]:
df_list.append(dd.read_parquet(path=i, gather_statistics=False))
df = dd.concat(df_list, axis=0)
pandas_df = df.compute()
objects_dict[key] = pandas_df
return objects_dict
# PYARROW
def read_parquet_objects(self, objects_dict: dict) -> dict:
for key in objects_dict.keys():
df_list = list()
for i in objects_dict[key]:
dataset = pq.ParquetDataset(i, filesystem=FILESYSTEM)
table = dataset.read()
single_df = table.to_pandas()
df_list.append(single_df)
pandas_df = pd.concat(dfs, axis=0)
objects_dict[key] = pandas_df
return objects_dict
这三种方法是迄今为止最慢的,每秒读取约 1 个镶木地板(其中 AWS Wrangler 最快,每秒读取约 2 个)。
然后我尝试使用 PySpark 进行 Spark。
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.271,org.apache.hadoop:hadoop-aws:3.1.2 pyspark-shell"
sparkConf = SparkConf()
sparkConf.setAppName("spark")
sparkConf.set("spark.executor.instances", "4")
sparkConf.set("spark.executor.cores", "4")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
hadoop_conf.set("fs.s3a.session.token", os.environ['AWS_SESSION_TOKEN'])
hadoop_conf.set("fs.s3a.experimental.input.fadvise", "sequential")
PROTOCOL = 's3a://'
def read_parquet_objects(self, objects_dict: dict):
for key in objects_dict.keys():
df = spark.read.option("mergeSchema", "true").parquet(objects_dict[key])
df.show()
return df
objects_dict[key]
在这种情况下包含我的镶木地板文件的密钥(就像没有文件名的基本路径一样)。当我 运行 代码时,它曾经卡在 Stage 0 ([Stage 0:> (0 + 8) / 8]
) 上,然后在我醒来时看到此错误消息 HTTP ERROR 500 java.lang.OutOfMemoryError: GC overhead limit exceeded
.现在我得到了一个不同的错误,com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
我真的不知道如何解释。
现在的问题是:有没有更好、更快(< 30 秒)的方法来读取这些镶木地板文件,以便在 pandas/dask 中转换它们?我使用的是完全错误的方法吗?我是否应该使用一些内部 AWS 服务(Lambda、Step Functions、Athena)来获得更好的结果?在这种情况下,read_parquet 函数的运行速度让我感到非常惊讶。
TL:DR;看来您的所有代码都是单线程的,这意味着您在 IO 上花费了大量时间。
部分选项:
- 由于您已经有了文件列表,请尝试使用 manual pyarrow dataset creation on the entire list 而不是一次传递一个文件。默认情况下,数据集应使用多线程。
- 自己穿线。拥有您提交路径的线程池,并让每个任务读取数据并将其附加到列表中。
最后,如果您可以控制数据,我建议您不要使用 parquet 作为格式,因为每个文件的行数如此之少,所以每列的开销很大。 Avro 可能是一个不错的选择,或者如果可能的话,尝试合并每个文件的多行。