Why does reading a small subset of rows with a Parquet Dataset take the same time as reading the whole file?
I'm working on a program that analyzes historical prices of some assets. The data is structured and analyzed as a pandas DataFrame, with dates as columns and assets as rows. I was previously using the transpose of this layout, but this format gave me better read times. I save this data in a Parquet file, and now I want to read a date interval from A to B together with a small subset of the assets, analyze it, and then repeat the same process with the same assets for the interval from B+1 to C.
The problem is that even if I request a single row, the Parquet read takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be good if, once the rows are filtered, the reader saved where the matching blocks are so it could speed up the next reads. Do I have to write a new file with only the filtered assets?
I tried writing the Parquet file with a small number of row groups and a smaller data page size to avoid the complete read, but that didn't give me good results in terms of time.
Another question I have is the following: why does reading the complete Parquet file with a Parquet Dataset and use_legacy_dataset=False take more time than reading the same Parquet dataset with use_legacy_dataset=True?
Code example:
import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq

# generating small data for the example; the file weighs about 150 MB here,
# the real data is about 2 GB
dates = pd.bdate_range('2019-01-01', '2020-03-01')
assets = list(range(1000, 50000))
historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)
historical_prices.columns = historical_prices.columns.strftime('%Y-%m-%d')

# name of the index
historical_prices.index.name = 'assets'

# writing the parquet file using the latest format version; the commented-out
# arguments are the things that I tested
historical_prices.to_parquet(
    'historical_prices.parquet',
    version='2.0',
    data_page_version='2.0',
    writer_engine_version='2.0',
    # row_group_size=100,
    # compression=None,
    # use_dictionary=False,
    # data_page_size=1000,
    # use_byte_stream_split=True,
    # flavor='spark',
)

# reading the complete parquet dataset
start_time = time.time()
historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet',
    use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()
print(time.time() - start_time)

# reading only one asset of the parquet dataset
start_time = time.time()
filters = [('assets', '=', assets[0])]
historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet',
    filters=filters,
    use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()
print(time.time() - start_time)

# this is what I want to do: read by intervals
num_intervals = 5
for i in range(num_intervals):
    start = int(i * len(dates) / num_intervals)
    end = int((i + 1) * len(dates) / num_intervals)
    interval = list(dates[start:end].strftime('%Y-%m-%d'))
    historical_prices_dataset.read_pandas(columns=interval).to_pandas()
    # here goes some analysis that can't be done in parallel because the results
    # of every interval are used in the next one
print(time.time() - start_time)
I was using the transpose of this, but this format gave me better reading time.
Parquet supports reading individual columns. So if you have 10 columns of 10k rows and you want 5 columns, then you read 50k cells. If you have 10k columns of 10 rows and you want 5 columns, then you read 50 cells. Presumably this is why the transpose gave you better reading time. I don't think I have enough details here. Parquet also supports reading individual row groups, more on that later.
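As a minimal sketch of that column pruning, assuming the wide layout from your example where the date strings are the column names:

import pyarrow.parquet as pq

# Only the listed date columns are decoded; the other column chunks in the
# file are skipped. use_pandas_metadata keeps the 'assets' index column.
subset = pq.read_table(
    'historical_prices.parquet',
    columns=['2019-01-02', '2019-01-03'],
    use_pandas_metadata=True,
).to_pandas()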
You have roughly 49,000 assets and 300 dates. I'd expect you to get better performance with assets as columns, but 49,000 is a lot of columns to have. It's possible that you are having to read too much column metadata, or that you are paying the CPU overhead of keeping track of that many columns.
It's a bit odd to have date values or asset IDs as columns at all. A more typical layout would be three columns: 'date', 'asset_id', and 'price'.
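For example, a rough sketch of reshaping the wide frame from your example into that long layout with pandas (the column names here are only illustrative):

# One row per (asset, date) pair instead of one column per date.
long_prices = (
    historical_prices
    .reset_index()  # bring the 'assets' index back as a regular column
    .melt(id_vars='assets', var_name='date', value_name='price')
    .sort_values(['assets', 'date'])  # keep each asset's rows contiguous
)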
The problem is that even if I request a single row, the Parquet read takes the same time as reading the whole file
Yes, it will, if you only have a single row group. Parquet does not support partial row group reads; I believe that is because the columns are compressed. However, I do not get the same results as you. The middle time in your example (the single-asset read) is typically 60-70% of the time of the first read, so it is faster. That may simply be because there is less conversion work to get to pandas, or there may be some optimization I'm not aware of.
The problem is that even if I request a single row, the Parquet read takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be good if, once the rows are filtered, the reader saved where the matching blocks are so it could speed up the next reads. Do I have to write a new file with only the filtered assets?
Row groups may be your answer. See the next section.
I tried writing the Parquet file with a small number of row groups and a smaller data page size to avoid the complete read, but that didn't give me good results in terms of time.
This is probably what you want (or you can use multiple files). Parquet supports reading just one row group out of a whole file. However, 100 is far too small for row_group_size. Each row group adds a certain amount of metadata to the file and has some processing overhead. If I change it to 10,000, for example, the middle read becomes twice as fast (and is now only 30-40% of the full table read).
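As a rough sketch of that experiment (the 10,000 value is only a starting point to tune from, not a recommendation):

# Larger row groups amortize the per-group metadata and processing overhead.
historical_prices.to_parquet('historical_prices.parquet', row_group_size=10_000)

# A ParquetFile exposes the row groups individually, so a read can be limited
# to the groups that actually contain the rows you need.
pf = pq.ParquetFile('historical_prices.parquet')
print(pf.num_row_groups)
first_group = pf.read_row_group(0, use_pandas_metadata=True).to_pandas()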
Another question that I have is the following: why, if we read the complete Parquet file using a Parquet Dataset and use_legacy_dataset=False, does it take more time than reading the same Parquet dataset with use_legacy_dataset=True?
The new datasets API is quite new (new as of 1.0.0, which was released in July). It's possible there is a bit more overhead. You aren't doing anything that would take advantage of the new datasets API (e.g. using scan, non-parquet datasets, or the new filesystems). So while use_legacy_dataset=False shouldn't be faster, it shouldn't be slower either; they should take roughly the same amount of time.
It sounds like you have a lot of assets (tens of thousands) and you want to read a few of them. You also want to chunk the reads into smaller reads (which you are doing with the dates).
First, instead of using the dates at all, I would recommend using dataset.scan (https://arrow.apache.org/docs/python/dataset.html). This will allow you to process your data one row at a time.
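A hedged sketch of what that could look like with the pyarrow.dataset API (exact method names vary between pyarrow versions; to_table with a filter expression is the simplest entry point):

import pyarrow.dataset as ds

dataset = ds.dataset('historical_prices.parquet', format='parquet')

# The filter is pushed down into the read, so only matching row groups /
# record batches need to be materialized rather than the whole file.
table = dataset.to_table(filter=ds.field('assets') == 1000)
df = table.to_pandas()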
Second, is there any way you can group your asset IDs? If you only have one row per asset ID you can ignore this. However, if you have (for example) 500 rows per asset ID (or 1 row per asset ID/date pair) you could write your file so that it looks something like this...
asset_id date price
A 1 ?
A 2 ?
A 3 ?
B 1 ?
B 2 ?
B 3 ?
If you do this, and set the row group size to something reasonable (try 10k or 100k rows and then fine-tune from there), then you should be able to get it so that you only read 1 or 2 row groups per asset ID.
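A hedged sketch of that combination, assuming a hypothetical long-format frame long_df with the columns shown above (asset_id, date, price), already sorted by asset_id; the 100,000 row group size would still need tuning:

import pyarrow as pa
import pyarrow.parquet as pq

# long_df is assumed to be sorted by asset_id so that each asset's rows sit
# in a small number of consecutive row groups.
table = pa.Table.from_pandas(long_df, preserve_index=False)
pq.write_table(table, 'prices_long.parquet', row_group_size=100_000)

# The per-row-group asset_id statistics then let a filtered read skip every
# group that cannot contain the requested ids.
subset = pq.read_table(
    'prices_long.parquet',
    filters=[('asset_id', 'in', [1000, 1001, 1002])],
)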
I found another way that gives me better times for my particular case; of course, it is not a very general solution. It uses some functionality that is not pure pyarrow, but it does what I think pyarrow's filters do when we read the same rows repeatedly. When the number of row groups to read grows, the ParquetDataset gives better performance.
import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
from typing import Dict, Any, List


class PriceGroupReader:
    def __init__(self, filename: str, assets: List[int]):
        self.price_file = pq.ParquetFile(filename)
        self.assets = assets
        self.valid_groups = self._get_valid_row_groups()

    def _get_valid_row_groups(self):
        """
        I didn't find a parquet function for this row group search, so I did it manually.
        Note: the assets index is sorted, so this can probably be improved a lot.
        """
        start_time = time.time()
        assets = pd.Index(self.assets)
        valid_row_groups = []
        index_position = self.price_file.schema.names.index("assets")
        for i in range(self.price_file.num_row_groups):
            row_group = self.price_file.metadata.row_group(i)
            statistics = row_group.column(index_position).statistics
            # keep the row group if any requested asset falls inside its [min, max] range
            if np.any((statistics.min <= assets) & (assets <= statistics.max)):
                valid_row_groups.append(i)
        print("getting the row groups: {}".format(time.time() - start_time))
        return valid_row_groups

    def read_valid_row_groups(self, dates: List[str]):
        row_groups = []
        for row_group_pos in self.valid_groups:
            df = self.price_file.read_row_group(row_group_pos, columns=dates, use_pandas_metadata=True).to_pandas()
            df = df.loc[df.index.isin(self.assets)]
            row_groups.append(df)
        df = pd.concat(row_groups)
        """
        # This is another way to read the groups; I think it can consume more memory, but it's probably faster.
        df = self.price_file.read_row_groups(self.valid_groups, columns=dates, use_pandas_metadata=True).to_pandas()
        df = df.loc[df.index.isin(self.assets)]
        """
        return df


def write_prices(assets: List[int], dates: List[str]):
    historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)
    # name of the index
    historical_prices.index.name = 'assets'
    # writing the parquet file using the latest format version; the commented-out
    # arguments are the things that I tested
    historical_prices.to_parquet(
        'historical_prices.parquet',
        version='2.0',
        data_page_version='2.0',
        writer_engine_version='2.0',
        row_group_size=4000,
        # compression=None,
        # use_dictionary=False,
        # data_page_size=1000,
        # use_byte_stream_split=True,
        # flavor='spark',
    )


# generating small data for the example; the file weighs about 150 MB, the real data about 2 GB
total_dates = list(pd.bdate_range('2019-01-01', '2020-03-01').strftime('%Y-%m-%d'))
total_assets = list(range(1000, 50000))
write_prices(total_assets, total_dates)

# selecting a subset of all the assets
valid_assets = total_assets[:3000]

# read the price file for the example
price_group_reader = PriceGroupReader('historical_prices.parquet', valid_assets)

# reading all the dates, only as an example
start_time = time.time()
price_group_reader.read_valid_row_groups(total_dates)
print("complete reading: {}".format(time.time() - start_time))

# this is what I want to do: read by intervals
num_intervals = 5
start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_group_reader.read_valid_row_groups(interval)
    # print(df)
print("interval reading: {}".format(time.time() - start_time))

filters = [('assets', 'in', valid_assets)]
price_dataset = pq.ParquetDataset(
    'historical_prices.parquet',
    filters=filters,
    use_legacy_dataset=False
)

start_time = time.time()
price_dataset.read_pandas(columns=total_dates).to_pandas()
print("complete reading with parquet dataset: {}".format(time.time() - start_time))

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_dataset.read_pandas(columns=interval).to_pandas()
print("interval reading with parquet dataset: {}".format(time.time() - start_time))