Using pyarrow, how do you append to a Parquet file?

How do you append to or update a Parquet file with pyarrow?

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
table3 = pd.DataFrame({'six': [-1, np.nan, 2.5], 'nine': ['foo', 'bar', 'baz'], 'ten': [True, False, True]})


# write_table expects a pyarrow Table, so convert the DataFrame first
pq.write_table(pa.Table.from_pandas(table2), './dataNew/pqTest2.parquet')
# append to pqTest2.parquet here?

I could not find anything in the documentation about appending to Parquet files. Also, can you use pyarrow with multiprocessing to insert/update data?

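In general, Parquet datasets consist of multiple files, so you append by writing an additional file into the same directory the data belongs to. It would be useful to be able to concatenate multiple files easily. I opened https://issues.apache.org/jira/browse/PARQUET-1154 to make this easy to do in C++ (and therefore Python).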

I ran into the same problem, and I think I was able to solve it with the following:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


chunksize=10000 # this is the number of lines

pqwriter = None
for i, df in enumerate(pd.read_csv('sample.csv', chunksize=chunksize)):
    table = pa.Table.from_pandas(df)
    # for the first chunk of records
    if i == 0:
        # create a parquet write object giving it an output file
        pqwriter = pq.ParquetWriter('sample.parquet', table.schema)            
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()

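In your case the column names are inconsistent. I made the column names of the three sample DataFrames consistent, and the following code works for me.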

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Write or append a DataFrame to a Parquet file.

    Converts a pandas DataFrame to a pyarrow Table and writes it in Parquet format. If the function is
    called with an existing writer, the table is appended to the file that writer already has open.

    :param dataframe: pd.DataFrame to be written in Parquet format.
    :param filepath: target file location for the Parquet file (only used when no writer is passed).
    :param writer: ParquetWriter object used to write pyarrow Tables in Parquet format.
    :return: ParquetWriter object. Pass it to subsequent calls to append further DataFrames
        to the same file.
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer


if __name__ == '__main__':

    table1 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table3 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    writer = None
    filepath = '/tmp/verify_pyarrow_append.parquet'
    table_list = [table1, table2, table3]

    for table in table_list:
        writer = append_to_parquet_table(table, filepath, writer)

    if writer:
        writer.close()

    df = pd.read_parquet(filepath)
    print(df)

Output:

   one  three  two
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz

Demo of appending a pandas DataFrame to an existing .parquet file.

Note: Other answers cannot append to existing .parquet files. This can; see discussion at end.

Tested on Python v3.9 on Windows and Linux.

Install PyArrow using pip:

pip install pyarrow==6.0.1

Anaconda / Miniconda:

conda install -c conda-forge pyarrow=6.0.1 -y

Demo code:

# Q. Demo?
# A. Demo of appending to an existing .parquet file by memory mapping the original file, appending the new dataframe, then writing the new file out.

import os
import numpy as np
import pandas as pd
import pyarrow as pa  
import pyarrow.parquet as pq  

filepath = "parquet_append.parquet"

Method 1 of 2

Simple approach: using pandas, read the original .parquet file, append, and write the whole file back.

# Create parquet file.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
df.to_parquet(filepath)  # ... write to file.

# Append to original parquet file.
df = pd.read_parquet(filepath)  # Read original ...
df2 = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
df3 = pd.concat([df, df2])  # ... concatenate together ...
df3.to_parquet(filepath)  # ... overwrite original file.

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

Method 2 of 2

More complex but faster: using native PyArrow calls, memory-map the original file, append the new DataFrame, and write the new file out.

from pathlib import Path
from typing import Union

# Write initial file using PyArrow.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
table = pa.Table.from_pandas(df)
pq.write_table(table, where=filepath)

def parquet_append(filepath: Union[Path, str], df: pd.DataFrame) -> None:
    """
    Append a dataframe to an existing .parquet file. Reads the original .parquet file in, appends the new dataframe, writes a new .parquet file out.
    :param filepath: Filepath for parquet file.
    :param df: Pandas dataframe to append. Must be same schema as original.
    """
    table_original_file = pq.read_table(source=filepath, pre_buffer=False, use_threads=True, memory_map=True)  # Use memory map for speed.
    table_to_append = pa.Table.from_pandas(df)
    table_to_append = table_to_append.cast(table_original_file.schema)  # Attempt to cast new schema to existing, e.g. datetime64[ns] to datetime64[us] (may throw otherwise).
    # Overwrite old file with empty. WARNING: PRODUCTION LEVEL CODE SHOULD BE MORE ATOMIC:
    # WRITE TO A TEMPORARY FILE, DELETE THE OLD, RENAME. THEN FAILURES WILL NOT LOSE DATA.
    handle = pq.ParquetWriter(filepath, table_original_file.schema)
    handle.write_table(table_original_file)
    handle.write_table(table_to_append)
    handle.close()  # Writes binary footer. Until this occurs, .parquet file is not usable.

# Append to original parquet file.
df = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
parquet_append(filepath, df)

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

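Discussion

The answers from @Ibraheem Ibraheem and @yardstick17 cannot be used to append to an existing .parquet file:

  • Limitation 1: after .close() is called, the file cannot be appended to. Once the footer is written, everything is set in stone;
  • Limitation 2: the .parquet file cannot be read by any other program until .close() is called (it will throw an exception because the binary footer is missing).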

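Combined, these limitations mean that those answers cannot be used to append to an existing .parquet file; they can only be used to write a .parquet file in chunks. The technique above removes these limitations, at the cost of being less efficient, because the entire file has to be rewritten to append to the end. After extensive research, I believe it is not possible to append in place to an existing .parquet file with the existing PyArrow library (as of v6.0.1).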

This can be adapted to merge multiple .parquet files in a folder into a single .parquet file.

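Efficient upserts are possible: pq.read_table() can filter on columns and rows, so if the rows to be replaced are filtered out of the original table as it is loaded, the rows in the new table effectively take their place. This is most useful for time series data.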