如何更新 pyarrow table 中的数据?

How to update data in pyarrow table?

我有一个 python 脚本,它使用 pyarrow 读取镶木地板文件。我正在尝试遍历 table 以更新其中的值。如果我试试这个:

for col_name in table2.column_names:
    if col_name in my_columns:
        print('updating values in column '  + col_name)
        
        col_data = pa.Table.column(table2, col_name)
        
        row_ct = 1
        for i in col_data:
            pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
            row_ct += 1

我收到这个错误:

 TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment

如何更新这些值?

我尝试使用 pandas,但它无法处理原始 table 中的空值,而且它还错误地转换了原始 table 中列的数据类型。 pyarrow 是否有本地方式来编辑数据?

在pyarrow中更新数组数据的原生方式是pyarrow compute functions。转换为您描述的 pandas 也是实现此目的的有效方法,因此您可能想要弄清楚这一点。但是,API 与您的方法不匹配。

您目前在 Python 函数 change_str 中决定每个项目的新值应该是什么。希望可以将您需要执行的操作表达为 pyarrow 计算函数的组合。这将避免将整个本机数组编组为 python 个对象的(昂贵的)成本。如果您可以在 change_str 中描述您想要实现的目标(可能是在一个新问题中),我可以帮助您解决问题。

如果出于某种原因,您必须将 change_str 保留在 Python 中,那么您将需要将整个列转换为 python 对象(这将导致相当大的性能损失) 使用 ChunkedArray.to_pylist()

箭头tables(和数组)是immutable。因此,您将无法就地更新 table。

实现这一点的方法是在修改数据时创建数据副本。 Arrow 支持对modify strings的一些基本操作,但是非常有限

另一种选择是使用 pandas,但正如您所注意到的,从箭头到 pandas 再返回并不是无缝的。

举个例子:

>>> table = pa.Table.from_arrays(
    [ 
        pa.array(['abc', 'def'], pa.string()),
        pa.array([1, None], pa.int32()),
    ],
    schema=pa.schema(
    [
        pa.field('str_col', pa.string()), 
        pa.field('int_col', pa.int32()), 
    ]
    )
)
>>> from_pandas = pa.Table.from_pandas(table.to_pandas())
>>> from_pandas.schema
str_col: string
int_col: double
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 487

您可以看到转换为 pandas 并返回已将 int 列的类型更改为 double。这是因为 pandas 不能很好地支持 null int 值,所以它将 int 列转换为 double。

为了避免这个问题,我建议逐列工作,只将字符串列转换为 pandas:

def my_func(value):
    return 'hello ' + value + '!'


columns = []
my_columns = ['str_col']
for column_name in table.column_names:
    column_data = table[column_name]
    if column_name in my_columns:
        column_data = pa.array(table['str_col'].to_pandas().apply(my_func))
    columns.append(column_data)

updated_table = pa.Table.from_arrays(
    columns, 
    schema=table.schema
)
>>> table['str_col']
<pyarrow.lib.ChunkedArray object at 0x7f05f42b3f40>
[
  [
    "hello abc!",
    "hello def!"
  ]
]

我能够使用这些参考资料让它工作:

http://arrow.apache.org/docs/python/generated/pyarrow.Table.html

http://arrow.apache.org/docs/python/generated/pyarrow.Field.html

https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_table.py

基本上它循环遍历原始 table 并创建新列 (pa.array),并将调整后的文本附加到新的 table。这可能不是最好的方法,但它奏效了。最重要的是,它让我保留空值并指定每列的数据类型。

import sys, getopt
import random
import re
import math

import pyarrow.parquet as pq
import pyarrow.csv as pcsv
import numpy as np
#import pandas as pd
import pyarrow as pa
import os.path

<a lot of other code here>

parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)

<a lot of other code here>

changed_ct = 0
all_cols_ct = 0
table3 = pa.Table.from_arrays([pa.array(range(0,table2.num_rows))], names=('0')) # CREATE TEMP COLUMN!!
#print(table3)
#exit()
changed_column_list = []
for col_name in table2.column_names:
    print('processing column: ' + col_name)
    new_list = []
    col_data = pa.Table.column(table2, col_name)
    col_data_type = table2.schema.field(col_name).type
    printed_changed_flag = False
    for i in col_data:
        # GET STRING REPRESENTATION OF THE COLUMN DATA
        if(col_data_type == 'string'):
            col_str = pa.StringScalar.as_py(i)
        elif(col_data_type == 'int32'):
            col_str = pa.Int32Scalar.as_py(i)
        elif(col_data_type == 'int64'):
            col_str = pa.Int64Scalar.as_py(i)
            
            
        if col_name in change_columns:
            if printed_changed_flag == False:
                print('changing values in column '  + col_name)
                changed_column_list.append(col_name)
                changed_ct += 1
                printed_changed_flag = True

            new_list.append(change_str(col_str))
        
        else:
            new_list.append(col_str)
        
    #set data type for the column
    if(col_data_type == 'string'):
        col_data_type = pa.string()
    elif(col_data_type == 'int32'):
        col_data_type = pa.int32()
    elif(col_data_type == 'int64'):
        col_data_type = pa.int64()
        
    arr = pa.array(new_list, type=col_data_type)
        
    new_field = pa.field(col_name, col_data_type)
    
    table3 = pa.Table.append_column(table3, new_field, arr)
        
    all_cols_ct += 1
    
#for i in table3:
#   print(i)

table3 = pa.Table.remove_column(table3, 0) # REMOVE TEMP COLUMN!!
#print(table2)
#print('-------------------')
#print(table3)
#exit()

print('changed ' + str(changed_ct) + ' columns:')
print(*changed_column_list, sep='\n')

# WRITE NEW PARQUET FILE
pa.parquet.write_table(table3, out_file)