Why does reading a small subset of rows with a Parquet Dataset take the same time as reading the whole file?

I am working on a program that analyzes historical prices for a set of assets. The data is structured and analyzed as a pandas DataFrame, with dates as columns and assets as rows. Previously I used the transpose of this layout, but this format gave me better read times. I save the data in a Parquet file, and now I want to read the dates from interval A to B together with a small subset of the assets, analyze them, and then repeat the same process with the same assets over the interval B+1 to C. The problem is that even if I select a single row, the Parquet read takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be nice if, once the rows are filtered, the reader remembered where the relevant blocks are to speed up the next reads. Do I have to write a new file containing only the filtered assets?

I tried writing the Parquet file with a small number of row groups and a smaller data page size to avoid reading the whole file, but that did not give me good results in terms of time.

I also have the following question: why does reading the complete Parquet file with a Parquet Dataset and use_legacy_dataset=False take more time than reading the same Parquet dataset with use_legacy_dataset=True?

Code example:

import pandas as pd 
import numpy as np
import time
import pyarrow.parquet as pq

# generating small data for the example; the file here weighs about 150 MB, the real data
# is about 2 GB
dates = pd.bdate_range('2019-01-01', '2020-03-01')
assets = list(range(1000, 50000))

historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)
historical_prices.columns = historical_prices.columns.strftime('%Y-%m-%d')

# name of the index
historical_prices.index.name = 'assets'

# writing the parquet file using the latest format version; the commented-out options are the things I tested
historical_prices.to_parquet(
    'historical_prices.parquet', 
    version='2.0', 
    data_page_version='2.0', 
    writer_engine_version='2.0',
    # row_group_size=100,
    # compression=None
    # use_dictionary=False,
    # data_page_size=1000,
    # use_byte_stream_split=True,
    # flavor='spark',
)


# reading the complete parquet dataset 
start_time = time.time()

historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)


# Reading only one asset of the parquet dataset
start_time = time.time()


filters = [('assets', '=', assets[0])]
historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    filters=filters, 
    use_legacy_dataset=False
)

historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)

# this is what I want to do, read by intervals.
num_intervals = 5

for i in range(num_intervals):
    start = int(i * len(dates) / num_intervals)
    end = int((i + 1) * len(dates) / num_intervals)
    interval = list(dates[start:end].strftime('%Y-%m-%d'))
    historical_prices_dataset.read_pandas(columns=interval).to_pandas()

    # Here goes some analysis that can't be done in parallel, because the results of every interval
    # are used in the next interval

print(time.time() - start_time)


I was using the transpose of this, but this format gave me better reading time.

Parquet supports individual column reads. So if you have 10 columns of 10k rows and you want 5 of the columns, you read 50k cells. If you have 10k columns of 10 rows and you want 5 of the columns, you read 50 cells. Presumably that is why the transpose gave you better reading times. I don't think I have enough detail here. Parquet also supports reading individual row groups, more on that later.
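
As a rough illustration of column projection (a minimal sketch; pq.read_table and its columns argument are standard pyarrow, and the date strings just follow the example schema above):

import pyarrow.parquet as pq

# only the listed date columns are decoded; the other ~300 columns are skipped
table = pq.read_table('historical_prices.parquet', columns=['2019-01-02', '2019-01-03'])
df = table.to_pandas()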

You have roughly 49,000 assets and about 300 dates. I would expect you to get better performance with assets as columns, but 49,000 is a lot of columns. It may be that you are having to read too much column metadata, or that you are paying the CPU overhead of keeping track of that many columns.

Having date values or asset IDs as columns is a bit odd. A much more typical layout would be three columns: 'date', 'asset_id' and 'price'.
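
A minimal sketch of reshaping the wide frame from the question into that layout (pandas reset_index/melt are standard; the names 'asset_id', 'date' and 'price' are just the ones suggested above):

# historical_prices is the wide frame from the question: assets as the index, dates as columns
long_prices = (
    historical_prices
    .reset_index()                     # turn the 'assets' index into a regular column
    .melt(id_vars='assets', var_name='date', value_name='price')
    .rename(columns={'assets': 'asset_id'})
)
long_prices.to_parquet('historical_prices_long.parquet', index=False)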

The problem is that even if I select a single row, the Parquet read takes the same time as reading the whole file

Yes, if you only have a single row group. Parquet does not support partial row-group reads; I believe this is because the columns are compressed. However, I don't get the same results as you. The middle time in your example (the single-asset read) is typically 60-70% of the time of the first read, so it is faster. Possibly that is just because there is less conversion to get to pandas, or maybe there is some optimization I'm not aware of.
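
For reference, a quick way to see how many row groups the file actually has (a minimal sketch; ParquetFile and its metadata accessors are standard pyarrow):

import pyarrow.parquet as pq

pf = pq.ParquetFile('historical_prices.parquet')
print(pf.num_row_groups)                    # with the default writer settings this is often just 1
print(pf.metadata.row_group(0).num_rows)    # number of rows in the first row group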

The problem is that even if I select a single row, the Parquet read takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be nice if, once the rows are filtered, the reader remembered where the relevant blocks are to speed up the next reads. Do I have to write a new file with the assets filtered?

Row groups may be your answer. See the next section.

I tried writing the parquet file with a small number of row groups and a smaller data page size to avoid the complete read, but this didn't give me good results in terms of time.

This is probably what you want (or you can use multiple files). Parquet supports reading just one row group out of a whole file. However, 100 is too small for row_group_size. Each row group creates a certain amount of metadata in the file and has some processing overhead. If I change it to 10,000, for example, the middle read gets twice as fast (and is now only 30-40% of a full table read).
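
A minimal sketch of the resized write plus a filtered read (to_parquet forwards row_group_size to pyarrow's writer; the filter syntax is the one already used in the question):

# larger row groups mean less per-group metadata and processing overhead
historical_prices.to_parquet('historical_prices.parquet', row_group_size=10_000)

# the row-group statistics on 'assets' let the reader skip groups that can't match
subset = pq.ParquetDataset(
    'historical_prices.parquet',
    filters=[('assets', '=', 1000)],
    use_legacy_dataset=False,
).read_pandas().to_pandas()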

The other question that I have is the following: why does reading the complete parquet file using a Parquet Dataset with use_legacy_dataset=False take more time than reading the same parquet dataset with use_legacy_dataset=True?

That new datasets API is quite new (1.0.0, released in July, is new). There is probably a bit more overhead. You aren't doing anything that takes advantage of the new datasets API (e.g. using scan, non-parquet datasets, or the new filesystems). So while use_legacy_dataset=False should not be faster, it should not be slower either; they should take roughly the same amount of time.


It sounds like you have many assets (tens of thousands) and you want to read a few of them. You also want to chunk the read into smaller reads (which you are doing with the dates).

First, instead of using the dates at all, I would recommend dataset.scan (https://arrow.apache.org/docs/python/dataset.html). That will allow you to process your data one row at a time.
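
A rough sketch of the dataset API under that suggestion (the exact scanner methods have shifted between pyarrow releases, so treat the method names as approximate; ds.dataset, ds.field and to_table with a filter are the parts relied on here):

import pyarrow.dataset as ds

dataset = ds.dataset('historical_prices.parquet', format='parquet')

wanted_assets = list(range(1000, 1100))      # hypothetical subset of asset ids to keep

# the filter is pushed down into the scan instead of materializing the whole file first
table = dataset.to_table(filter=ds.field('assets').isin(wanted_assets))
df = table.to_pandas()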

Second, is there any way you can group your asset IDs? If each asset ID has only a single row, you can ignore this. However, if you have (for example) 500 rows per asset ID (or 1 row per asset ID/date pair), you can write your file so that it looks something like this...

asset_id  date  price
A         1     ?
A         2     ?
A         3     ?
B         1     ?
B         2     ?
B         3     ?

If you do that, and you set the row group size to a reasonable value (try 10k or 100k and then optimize from there), then you should be able to get it so that you only read 1 or 2 row groups per asset ID. A sketch of that idea follows below.
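
A minimal sketch of that idea on toy data, assuming one row per asset_id/date pair (the 10k row-group size is just a starting point to tune from):

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

# toy long-format data: 100 assets x 300 dates, one row per asset_id/date pair
dates = pd.bdate_range('2019-01-01', periods=300).strftime('%Y-%m-%d')
long_prices = pd.DataFrame({
    'asset_id': np.repeat(np.arange(1000, 1100), len(dates)),
    'date': np.tile(dates, 100),
    'price': np.random.rand(100 * len(dates)),
})

# keeping each asset's rows contiguous lets row-group statistics exclude most groups
long_prices = long_prices.sort_values('asset_id')
long_prices.to_parquet('prices_long.parquet', index=False, row_group_size=10_000)

one_asset = pq.ParquetDataset(
    'prices_long.parquet',
    filters=[('asset_id', '=', 1000)],
    use_legacy_dataset=False,
).read_pandas().to_pandas()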

I found another way that gives better times for my particular case; of course, it is not a very general solution. It uses some steps that are not pyarrow functions, but it does what I think pyarrow's filters do when we read the same rows many times. When the number of row groups to read grows, the Parquet dataset gives better performance.

import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
from typing import Dict, Any, List


class PriceGroupReader:
    def __init__(self, filename: str, assets: List[int]):
        self.price_file = pq.ParquetFile(filename)
        self.assets = assets
        self.valid_groups = self._get_valid_row_groups()

    def _get_valid_row_groups(self):
        """
        I didn't find a parquet function to do this row-group search, so I did it manually.
        Note: the assets index is sorted, so this could probably be improved a lot.
        """
        start_time = time.time()
        assets = pd.Index(self.assets)
        valid_row_groups = []
        index_position = self.price_file.schema.names.index("assets")

        for i in range(self.price_file.num_row_groups):
            row_group = self.price_file.metadata.row_group(i)
            statistics = row_group.column(index_position).statistics
            if np.any((statistics.min <= assets) & (assets <= statistics.max)):
                valid_row_groups.append(i)

        print("getting the row groups: {}".format(time.time() - start_time))
        return valid_row_groups

    def read_valid_row_groups(self, dates: List[str]):
        
        row_groups = []
        for row_group_pos in self.valid_groups:
            df = self.price_file.read_row_group(row_group_pos, columns=dates, use_pandas_metadata=True).to_pandas()
            df = df.loc[df.index.isin(self.assets)]
            row_groups.append(df)

        df = pd.concat(row_groups)
    

        """
        # This is another way to read the groups, but I think it can consume more memory; it is probably faster.
        df = self.price_file.read_row_groups(self.valid_groups, columns=dates, use_pandas_metadata=True).to_pandas()
        df = df.loc[df.index.isin(self.assets)]
        """
        
        return df


def write_prices(assets: List[int], dates: List[str]):
    historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)

    # name of the index
    historical_prices.index.name = 'assets'

    # writing the parquet file using the latest format version; the commented-out options are the things I tested
    historical_prices.to_parquet(
        'historical_prices.parquet',
        version='2.0',
        data_page_version='2.0',
        writer_engine_version='2.0',
        row_group_size=4000,
        # compression=None
        # use_dictionary=False,
        # data_page_size=1000,
        # use_byte_stream_split=True,
        # flavor='spark',
    )


# generating small data for the example; the file weighs about 150 MB, the real data about 2 GB
total_dates = list(pd.bdate_range('2019-01-01', '2020-03-01').strftime('%Y-%m-%d'))
total_assets = list(range(1000, 50000))
write_prices(total_assets, total_dates)

# selecting a subset of the whole assets
valid_assets = total_assets[:3000]

# read the price file for the example
price_group_reader = PriceGroupReader('historical_prices.parquet', valid_assets)

# reading all the dates, only as an example
start_time = time.time()
price_group_reader.read_valid_row_groups(total_dates)
print("complete reading: {}".format(time.time() - start_time))

# this is what I want to do, read by intervals.
num_intervals = 5

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_group_reader.read_valid_row_groups(interval)
    # print(df)

print("interval reading: {}".format(time.time() - start_time))


filters = [('assets', 'in', valid_assets)]
price_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    filters=filters, 
    use_legacy_dataset=False
)

start_time = time.time()
price_dataset.read_pandas(columns=total_dates).to_pandas()
print("complete reading with parquet dataset: {}".format(time.time() - start_time))

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_dataset.read_pandas(columns=interval).to_pandas()

print("interval reading with parquet dataset: {}".format(time.time() - start_time))