Fastparquet 似乎没有下压过滤器
Fastparquet doesn't seem to be pushing down filters
我使用 dask 的数据帧 to_parquet
方法创建了一个镶木地板文件,使用 fastparquet
作为引擎。
使用 fastparquet.ParquetFile
读取文件我得到以下信息。
from fastparquet import ParquetFile
file = ParquetFile('data/raw_data_fastpar.par/')
file.dtypes
OrderedDict([(u'@timestamp', dtype('<M8[ns]')),
(u'@version', dtype('O')),
(u'_id', dtype('O')),
(u'browser_build', dtype('O')),
(u'browser_device', dtype('O')),
(u'browser_major', dtype('float64')),
(u'browser_minor', dtype('float64')),
(u'browser_name', dtype('O')),
(u'browser_os', dtype('O')),
(u'browser_os_name', dtype('O')),
(u'dst', dtype('O')),
(u'dst_port', dtype('float64')),
(u'http_req_header_contentlength', dtype('O')),
(u'http_req_header_host', dtype('O')),
(u'http_req_header_referer', dtype('O')),
(u'http_req_header_useragent', dtype('O')),
(u'http_req_headers', dtype('O')),
(u'http_req_method', dtype('O')),
(u'http_req_secondleveldomain', dtype('O')),
(u'http_req_url', dtype('O')),
(u'http_req_version', dtype('O')),
(u'http_resp_code', dtype('O')),
(u'http_resp_header_contentlength', dtype('O')),
(u'http_resp_header_contenttype', dtype('O')),
(u'http_resp_headers', dtype('O')),
(u'http_user', dtype('O')),
(u'received_from', dtype('O')),
(u'redis_db', dtype('O')),
(u'src', dtype('O')),
(u'src_port', dtype('float64')),
(u'type', dtype('O')),
(u'month', u'category'),
(u'day', u'category')])
file.schema.text
u'- schema: \n
| - @timestamp: INT64, TIMESTAMP_MICROS, OPTIONAL\n
| - @version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - _id: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_build: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_device: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_major: DOUBLE, OPTIONAL\n
| - browser_minor: DOUBLE, OPTIONAL\n
| - browser_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst_port: DOUBLE, OPTIONAL\n
| - http_req_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_host: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_referer: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_useragent: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_method: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_secondleveldomain: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_url: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_code: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contenttype: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_user: BYTE_ARRAY, UTF8, OPTIONAL\n
| - received_from: BYTE_ARRAY, UTF8, OPTIONAL\n
| - redis_db: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src_port: DOUBLE, OPTIONAL\n
| - type: BYTE_ARRAY, UTF8, OPTIONAL'
所以字段是正确的。由于它们是时间序列数据,因此使用月份和日期对数据进行分区。数据总数为 22815984
。现在,我尝试使用 filters 关键字读取镶木地板,但出现了奇怪的行为。
# this works
import datetime
since = datetime.datetime(year=2018, month=10, day=1)
filters = [('@timestamp', '>', np.datetime64(since)),]
raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'], filters=filters)
raw_data.count().compute()
http_user 3835971
dst 3835971
dst_port 3835971
http_req_method 3835971
dtype: int64
正确,过滤被下推。当我将过滤器更改为另一个字段时,
filters = [('http_req_method', '=', 'GET'),]
它取回所有数据
http_user 22815984
dst 22815984
dst_port 22815984
http_req_method 22815984
dtype: int64
手动执行,有效:
raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'])
raw_data.loc[raw_data.http_req_method == 'GET'].count().compute()
http_user 14407709
dst 14407709
dst_port 14407709
http_req_method 14407709
dtype: int64
同时将筛选器更改为不存在的字段,不会引发任何异常,所以这也很奇怪。关于镶木地板和过滤,我是否遗漏了什么?
Dask DataFrame Structure:
http_user dst dst_port http_req_method
npartitions=612
object object float64 object
... ... ... ...
... ... ... ...
... ... ... ... ...
... ... ... ...
Dask Name: read-parquet, 612 tasks
包含 filters=
选项作为对有意义的情况的优化,以避免考虑肯定不包含任何有效数据的数据部分。
在docs:
This implements row-group (partition) -level filtering only, i.e., to prevent the loading of some chunks of the data, and only if relevant statistics have been included in the metadata.
例如,如果您有一组行组,其中感兴趣的列单调递增,则该列上的过滤器可能能够排除许多行组(也称为分区)。另一方面,如果每个行组都包含该列整个范围内的值,那么这种过滤器将有任何效果。
data[raw_data.http_req_method == 'GET']
这做了一些不同的事情:现在每个行组都作为一个分区加载,然后在工作人员的内存中过滤。 Dask 可能仅在您过滤索引的特殊情况下才能加载某些分区。
如果您想要优化,但您的数据结构不合理,分区边界与您的过滤条件完全一致,则您需要同时使用这两种方法。
如果您认为文档字符串可以更清晰,请提出问题。
我使用 dask 的数据帧 to_parquet
方法创建了一个镶木地板文件,使用 fastparquet
作为引擎。
使用 fastparquet.ParquetFile
读取文件我得到以下信息。
from fastparquet import ParquetFile
file = ParquetFile('data/raw_data_fastpar.par/')
file.dtypes
OrderedDict([(u'@timestamp', dtype('<M8[ns]')),
(u'@version', dtype('O')),
(u'_id', dtype('O')),
(u'browser_build', dtype('O')),
(u'browser_device', dtype('O')),
(u'browser_major', dtype('float64')),
(u'browser_minor', dtype('float64')),
(u'browser_name', dtype('O')),
(u'browser_os', dtype('O')),
(u'browser_os_name', dtype('O')),
(u'dst', dtype('O')),
(u'dst_port', dtype('float64')),
(u'http_req_header_contentlength', dtype('O')),
(u'http_req_header_host', dtype('O')),
(u'http_req_header_referer', dtype('O')),
(u'http_req_header_useragent', dtype('O')),
(u'http_req_headers', dtype('O')),
(u'http_req_method', dtype('O')),
(u'http_req_secondleveldomain', dtype('O')),
(u'http_req_url', dtype('O')),
(u'http_req_version', dtype('O')),
(u'http_resp_code', dtype('O')),
(u'http_resp_header_contentlength', dtype('O')),
(u'http_resp_header_contenttype', dtype('O')),
(u'http_resp_headers', dtype('O')),
(u'http_user', dtype('O')),
(u'received_from', dtype('O')),
(u'redis_db', dtype('O')),
(u'src', dtype('O')),
(u'src_port', dtype('float64')),
(u'type', dtype('O')),
(u'month', u'category'),
(u'day', u'category')])
file.schema.text
u'- schema: \n
| - @timestamp: INT64, TIMESTAMP_MICROS, OPTIONAL\n
| - @version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - _id: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_build: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_device: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_major: DOUBLE, OPTIONAL\n
| - browser_minor: DOUBLE, OPTIONAL\n
| - browser_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst_port: DOUBLE, OPTIONAL\n
| - http_req_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_host: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_referer: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_useragent: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_method: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_secondleveldomain: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_url: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_code: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contenttype: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_user: BYTE_ARRAY, UTF8, OPTIONAL\n
| - received_from: BYTE_ARRAY, UTF8, OPTIONAL\n
| - redis_db: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src_port: DOUBLE, OPTIONAL\n
| - type: BYTE_ARRAY, UTF8, OPTIONAL'
所以字段是正确的。由于它们是时间序列数据,因此使用月份和日期对数据进行分区。数据总数为 22815984
。现在,我尝试使用 filters 关键字读取镶木地板,但出现了奇怪的行为。
# this works
import datetime
since = datetime.datetime(year=2018, month=10, day=1)
filters = [('@timestamp', '>', np.datetime64(since)),]
raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'], filters=filters)
raw_data.count().compute()
http_user 3835971
dst 3835971
dst_port 3835971
http_req_method 3835971
dtype: int64
正确,过滤被下推。当我将过滤器更改为另一个字段时,
filters = [('http_req_method', '=', 'GET'),]
它取回所有数据
http_user 22815984
dst 22815984
dst_port 22815984
http_req_method 22815984
dtype: int64
手动执行,有效:
raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'])
raw_data.loc[raw_data.http_req_method == 'GET'].count().compute()
http_user 14407709
dst 14407709
dst_port 14407709
http_req_method 14407709
dtype: int64
同时将筛选器更改为不存在的字段,不会引发任何异常,所以这也很奇怪。关于镶木地板和过滤,我是否遗漏了什么?
Dask DataFrame Structure:
http_user dst dst_port http_req_method
npartitions=612
object object float64 object
... ... ... ...
... ... ... ...
... ... ... ... ...
... ... ... ...
Dask Name: read-parquet, 612 tasks
包含 filters=
选项作为对有意义的情况的优化,以避免考虑肯定不包含任何有效数据的数据部分。
在docs:
This implements row-group (partition) -level filtering only, i.e., to prevent the loading of some chunks of the data, and only if relevant statistics have been included in the metadata.
例如,如果您有一组行组,其中感兴趣的列单调递增,则该列上的过滤器可能能够排除许多行组(也称为分区)。另一方面,如果每个行组都包含该列整个范围内的值,那么这种过滤器将有任何效果。
data[raw_data.http_req_method == 'GET']
这做了一些不同的事情:现在每个行组都作为一个分区加载,然后在工作人员的内存中过滤。 Dask 可能仅在您过滤索引的特殊情况下才能加载某些分区。
如果您想要优化,但您的数据结构不合理,分区边界与您的过滤条件完全一致,则您需要同时使用这两种方法。
如果您认为文档字符串可以更清晰,请提出问题。