pyarrow - 确定编写镶木地板数据集时编写的片段或使用的过滤器?
pyarrow - identify the fragments written or filters used when writing a parquet dataset?
我的用例是我想将文件路径或过滤器作为 xcom 传递给 Airflow 中的任务,以便我的下一个任务可以读取刚刚处理的数据。
任务 A 将 table 写入分区数据集,并生成许多 Parquet 文件片段 --> 任务 B 稍后将这些片段作为数据集读取。不过,我只需要读取相关数据,而不是可能有数百万行的整个数据集。
我测试了两种方法:
- 在我完成写入数据集后立即列出修改过的文件。这将为我提供一个路径列表,我可以在下一个任务中调用 ds.dataset(paths)。我可以在这些路径上使用
partitioning.parse()
或检查片段以获取使用的过滤器列表 (frag.partition_expression
)
这样做的一个缺陷是我可以将文件并行写入同一数据集。
- 我可以生成写入数据集时使用的过滤器,方法是将 table 转换为 pandas 数据框,进行分组,然后构建过滤器。我不确定是否有更简单的方法。然后我可以在结果上使用 pq._filters_to_expression() 来创建一个可用的过滤器。
这并不理想,因为我需要修复某些数据类型,这些数据类型无法正确保存为 Airflow xcom(没有酸洗,所以所有内容都必须采用 json 格式)。此外,如果我想在字典列上进行分区,我可能需要调整此功能。
def create_filter_list(df, partition_columns):
"""Creates a list of pyarrow filters to be sent through an xcom and evaluated as an expression. Xcom disables pickling, so we need to save timestamp and date values as strings and convert downstream"""
filter_list = []
value_list = []
partition_keys = [df[col] for col in partition_columns]
for keys, _ in df[partition_columns].groupby(partition_keys):
if len(partition_columns) == 1:
if is_jsonable(keys):
value_list.append(keys)
elif keys is not None:
value_list.append(str(keys))
else:
if not isinstance(keys, tuple):
keys = (keys,)
read_filter = []
for name, val in zip(partition_columns, keys):
if type(val) == np.int_:
read_filter.append((name, "==", int(val)))
elif val is not None:
read_filter.append((name, "==", str(val)))
filter_list.append(read_filter)
if len(partition_columns) == 1:
if len(value_list) > 0:
filter_list = [(name, "in", value_list) for name in partition_columns]
return filter_list
关于我应该采用哪种方法有什么建议,或者是否有更好的方法来实现我的目标?
一个建议(不确定这是否最适合您的用例):
关键问题是需要正确select数据的子集,这个可以'fixed'上游。更新大数据帧的 function/script 可以包含一个条件,用于在单独的(临时)路径中保存已修改并满足某些要求的数据的临时副本。然后这个文件将被传递给下游任务,一旦它被处理,它可以删除临时文件。
您可以观看本期 (https://issues.apache.org/jira/browse/ARROW-10440),我相信它可以满足您的需求。在此期间,您可以使用 basename_template
作为解决方法。
import glob
import os
import pyarrow as pa
import pyarrow.dataset as pads
class TrackingWriter:
def __init__(self):
self.counter = 0
part_schema = pa.schema({'part': pa.int64()})
self.partitioning = pads.HivePartitioning(part_schema)
def next_counter(self):
result = self.counter
self.counter += 1
return result
def write_dataset(self, table, base_dir):
counter = self.next_counter()
pads.write_dataset(table, base_dir, format='parquet', partitioning=self.partitioning, basename_template=f'batch-{counter}-part-{{i}}')
files_written = glob.glob(os.path.join(base_dir, '**', f'batch-{counter}-*'))
return files_written
table_one = pa.table({'part': [0, 0, 1, 1], 'val': [1, 2, 3, 4]})
table_two = pa.table({'part': [0, 0, 1, 1], 'val': [5, 6, 7, 8]})
writer = TrackingWriter()
print(writer.write_dataset(table_one, '/tmp/mydataset'))
print(writer.write_dataset(table_two, '/tmp/mydataset'))
这只是一个粗略的草图。您可能还希望在启动时将代码设为 运行,以查看 counter
的下一个自由值是什么。或者您可以使用 uuid
而不是计数器。
我的用例是我想将文件路径或过滤器作为 xcom 传递给 Airflow 中的任务,以便我的下一个任务可以读取刚刚处理的数据。
任务 A 将 table 写入分区数据集,并生成许多 Parquet 文件片段 --> 任务 B 稍后将这些片段作为数据集读取。不过,我只需要读取相关数据,而不是可能有数百万行的整个数据集。
我测试了两种方法:
- 在我完成写入数据集后立即列出修改过的文件。这将为我提供一个路径列表,我可以在下一个任务中调用 ds.dataset(paths)。我可以在这些路径上使用
partitioning.parse()
或检查片段以获取使用的过滤器列表 (frag.partition_expression
)
这样做的一个缺陷是我可以将文件并行写入同一数据集。
- 我可以生成写入数据集时使用的过滤器,方法是将 table 转换为 pandas 数据框,进行分组,然后构建过滤器。我不确定是否有更简单的方法。然后我可以在结果上使用 pq._filters_to_expression() 来创建一个可用的过滤器。
这并不理想,因为我需要修复某些数据类型,这些数据类型无法正确保存为 Airflow xcom(没有酸洗,所以所有内容都必须采用 json 格式)。此外,如果我想在字典列上进行分区,我可能需要调整此功能。
def create_filter_list(df, partition_columns):
"""Creates a list of pyarrow filters to be sent through an xcom and evaluated as an expression. Xcom disables pickling, so we need to save timestamp and date values as strings and convert downstream"""
filter_list = []
value_list = []
partition_keys = [df[col] for col in partition_columns]
for keys, _ in df[partition_columns].groupby(partition_keys):
if len(partition_columns) == 1:
if is_jsonable(keys):
value_list.append(keys)
elif keys is not None:
value_list.append(str(keys))
else:
if not isinstance(keys, tuple):
keys = (keys,)
read_filter = []
for name, val in zip(partition_columns, keys):
if type(val) == np.int_:
read_filter.append((name, "==", int(val)))
elif val is not None:
read_filter.append((name, "==", str(val)))
filter_list.append(read_filter)
if len(partition_columns) == 1:
if len(value_list) > 0:
filter_list = [(name, "in", value_list) for name in partition_columns]
return filter_list
关于我应该采用哪种方法有什么建议,或者是否有更好的方法来实现我的目标?
一个建议(不确定这是否最适合您的用例):
关键问题是需要正确select数据的子集,这个可以'fixed'上游。更新大数据帧的 function/script 可以包含一个条件,用于在单独的(临时)路径中保存已修改并满足某些要求的数据的临时副本。然后这个文件将被传递给下游任务,一旦它被处理,它可以删除临时文件。
您可以观看本期 (https://issues.apache.org/jira/browse/ARROW-10440),我相信它可以满足您的需求。在此期间,您可以使用 basename_template
作为解决方法。
import glob
import os
import pyarrow as pa
import pyarrow.dataset as pads
class TrackingWriter:
def __init__(self):
self.counter = 0
part_schema = pa.schema({'part': pa.int64()})
self.partitioning = pads.HivePartitioning(part_schema)
def next_counter(self):
result = self.counter
self.counter += 1
return result
def write_dataset(self, table, base_dir):
counter = self.next_counter()
pads.write_dataset(table, base_dir, format='parquet', partitioning=self.partitioning, basename_template=f'batch-{counter}-part-{{i}}')
files_written = glob.glob(os.path.join(base_dir, '**', f'batch-{counter}-*'))
return files_written
table_one = pa.table({'part': [0, 0, 1, 1], 'val': [1, 2, 3, 4]})
table_two = pa.table({'part': [0, 0, 1, 1], 'val': [5, 6, 7, 8]})
writer = TrackingWriter()
print(writer.write_dataset(table_one, '/tmp/mydataset'))
print(writer.write_dataset(table_two, '/tmp/mydataset'))
这只是一个粗略的草图。您可能还希望在启动时将代码设为 运行,以查看 counter
的下一个自由值是什么。或者您可以使用 uuid
而不是计数器。