如何更新 pyarrow table 中的数据?
How to update data in pyarrow table?
我有一个 python 脚本,它使用 pyarrow 读取镶木地板文件。我正在尝试遍历 table 以更新其中的值。如果我试试这个:
for col_name in table2.column_names:
if col_name in my_columns:
print('updating values in column ' + col_name)
col_data = pa.Table.column(table2, col_name)
row_ct = 1
for i in col_data:
pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
row_ct += 1
我收到这个错误:
TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment
如何更新这些值?
我尝试使用 pandas,但它无法处理原始 table 中的空值,而且它还错误地转换了原始 table 中列的数据类型。 pyarrow 是否有本地方式来编辑数据?
在pyarrow中更新数组数据的原生方式是pyarrow compute functions。转换为您描述的 pandas 也是实现此目的的有效方法,因此您可能想要弄清楚这一点。但是,API 与您的方法不匹配。
您目前在 Python 函数 change_str
中决定每个项目的新值应该是什么。希望可以将您需要执行的操作表达为 pyarrow 计算函数的组合。这将避免将整个本机数组编组为 python 个对象的(昂贵的)成本。如果您可以在 change_str
中描述您想要实现的目标(可能是在一个新问题中),我可以帮助您解决问题。
如果出于某种原因,您必须将 change_str
保留在 Python 中,那么您将需要将整个列转换为 python 对象(这将导致相当大的性能损失) 使用 ChunkedArray.to_pylist()
箭头tables(和数组)是immutable。因此,您将无法就地更新 table。
实现这一点的方法是在修改数据时创建数据副本。 Arrow 支持对modify strings的一些基本操作,但是非常有限
另一种选择是使用 pandas,但正如您所注意到的,从箭头到 pandas 再返回并不是无缝的。
举个例子:
>>> table = pa.Table.from_arrays(
[
pa.array(['abc', 'def'], pa.string()),
pa.array([1, None], pa.int32()),
],
schema=pa.schema(
[
pa.field('str_col', pa.string()),
pa.field('int_col', pa.int32()),
]
)
)
>>> from_pandas = pa.Table.from_pandas(table.to_pandas())
>>> from_pandas.schema
str_col: string
int_col: double
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 487
您可以看到转换为 pandas 并返回已将 int 列的类型更改为 double。这是因为 pandas 不能很好地支持 null int 值,所以它将 int 列转换为 double。
为了避免这个问题,我建议逐列工作,只将字符串列转换为 pandas:
def my_func(value):
return 'hello ' + value + '!'
columns = []
my_columns = ['str_col']
for column_name in table.column_names:
column_data = table[column_name]
if column_name in my_columns:
column_data = pa.array(table['str_col'].to_pandas().apply(my_func))
columns.append(column_data)
updated_table = pa.Table.from_arrays(
columns,
schema=table.schema
)
>>> table['str_col']
<pyarrow.lib.ChunkedArray object at 0x7f05f42b3f40>
[
[
"hello abc!",
"hello def!"
]
]
我能够使用这些参考资料让它工作:
http://arrow.apache.org/docs/python/generated/pyarrow.Table.html
http://arrow.apache.org/docs/python/generated/pyarrow.Field.html
https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_table.py
基本上它循环遍历原始 table 并创建新列 (pa.array),并将调整后的文本附加到新的 table。这可能不是最好的方法,但它奏效了。最重要的是,它让我保留空值并指定每列的数据类型。
import sys, getopt
import random
import re
import math
import pyarrow.parquet as pq
import pyarrow.csv as pcsv
import numpy as np
#import pandas as pd
import pyarrow as pa
import os.path
<a lot of other code here>
parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)
<a lot of other code here>
changed_ct = 0
all_cols_ct = 0
table3 = pa.Table.from_arrays([pa.array(range(0,table2.num_rows))], names=('0')) # CREATE TEMP COLUMN!!
#print(table3)
#exit()
changed_column_list = []
for col_name in table2.column_names:
print('processing column: ' + col_name)
new_list = []
col_data = pa.Table.column(table2, col_name)
col_data_type = table2.schema.field(col_name).type
printed_changed_flag = False
for i in col_data:
# GET STRING REPRESENTATION OF THE COLUMN DATA
if(col_data_type == 'string'):
col_str = pa.StringScalar.as_py(i)
elif(col_data_type == 'int32'):
col_str = pa.Int32Scalar.as_py(i)
elif(col_data_type == 'int64'):
col_str = pa.Int64Scalar.as_py(i)
if col_name in change_columns:
if printed_changed_flag == False:
print('changing values in column ' + col_name)
changed_column_list.append(col_name)
changed_ct += 1
printed_changed_flag = True
new_list.append(change_str(col_str))
else:
new_list.append(col_str)
#set data type for the column
if(col_data_type == 'string'):
col_data_type = pa.string()
elif(col_data_type == 'int32'):
col_data_type = pa.int32()
elif(col_data_type == 'int64'):
col_data_type = pa.int64()
arr = pa.array(new_list, type=col_data_type)
new_field = pa.field(col_name, col_data_type)
table3 = pa.Table.append_column(table3, new_field, arr)
all_cols_ct += 1
#for i in table3:
# print(i)
table3 = pa.Table.remove_column(table3, 0) # REMOVE TEMP COLUMN!!
#print(table2)
#print('-------------------')
#print(table3)
#exit()
print('changed ' + str(changed_ct) + ' columns:')
print(*changed_column_list, sep='\n')
# WRITE NEW PARQUET FILE
pa.parquet.write_table(table3, out_file)
我有一个 python 脚本,它使用 pyarrow 读取镶木地板文件。我正在尝试遍历 table 以更新其中的值。如果我试试这个:
for col_name in table2.column_names:
if col_name in my_columns:
print('updating values in column ' + col_name)
col_data = pa.Table.column(table2, col_name)
row_ct = 1
for i in col_data:
pa.Table.column(table2, col_name)[row_ct] = change_str(pa.StringScalar.as_py(i))
row_ct += 1
我收到这个错误:
TypeError: 'pyarrow.lib.ChunkedArray' object does not support item assignment
如何更新这些值?
我尝试使用 pandas,但它无法处理原始 table 中的空值,而且它还错误地转换了原始 table 中列的数据类型。 pyarrow 是否有本地方式来编辑数据?
在pyarrow中更新数组数据的原生方式是pyarrow compute functions。转换为您描述的 pandas 也是实现此目的的有效方法,因此您可能想要弄清楚这一点。但是,API 与您的方法不匹配。
您目前在 Python 函数 change_str
中决定每个项目的新值应该是什么。希望可以将您需要执行的操作表达为 pyarrow 计算函数的组合。这将避免将整个本机数组编组为 python 个对象的(昂贵的)成本。如果您可以在 change_str
中描述您想要实现的目标(可能是在一个新问题中),我可以帮助您解决问题。
如果出于某种原因,您必须将 change_str
保留在 Python 中,那么您将需要将整个列转换为 python 对象(这将导致相当大的性能损失) 使用 ChunkedArray.to_pylist()
箭头tables(和数组)是immutable。因此,您将无法就地更新 table。
实现这一点的方法是在修改数据时创建数据副本。 Arrow 支持对modify strings的一些基本操作,但是非常有限
另一种选择是使用 pandas,但正如您所注意到的,从箭头到 pandas 再返回并不是无缝的。
举个例子:
>>> table = pa.Table.from_arrays(
[
pa.array(['abc', 'def'], pa.string()),
pa.array([1, None], pa.int32()),
],
schema=pa.schema(
[
pa.field('str_col', pa.string()),
pa.field('int_col', pa.int32()),
]
)
)
>>> from_pandas = pa.Table.from_pandas(table.to_pandas())
>>> from_pandas.schema
str_col: string
int_col: double
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 487
您可以看到转换为 pandas 并返回已将 int 列的类型更改为 double。这是因为 pandas 不能很好地支持 null int 值,所以它将 int 列转换为 double。
为了避免这个问题,我建议逐列工作,只将字符串列转换为 pandas:
def my_func(value):
return 'hello ' + value + '!'
columns = []
my_columns = ['str_col']
for column_name in table.column_names:
column_data = table[column_name]
if column_name in my_columns:
column_data = pa.array(table['str_col'].to_pandas().apply(my_func))
columns.append(column_data)
updated_table = pa.Table.from_arrays(
columns,
schema=table.schema
)
>>> table['str_col']
<pyarrow.lib.ChunkedArray object at 0x7f05f42b3f40>
[
[
"hello abc!",
"hello def!"
]
]
我能够使用这些参考资料让它工作:
http://arrow.apache.org/docs/python/generated/pyarrow.Table.html
http://arrow.apache.org/docs/python/generated/pyarrow.Field.html
https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_table.py
基本上它循环遍历原始 table 并创建新列 (pa.array),并将调整后的文本附加到新的 table。这可能不是最好的方法,但它奏效了。最重要的是,它让我保留空值并指定每列的数据类型。
import sys, getopt
import random
import re
import math
import pyarrow.parquet as pq
import pyarrow.csv as pcsv
import numpy as np
#import pandas as pd
import pyarrow as pa
import os.path
<a lot of other code here>
parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)
<a lot of other code here>
changed_ct = 0
all_cols_ct = 0
table3 = pa.Table.from_arrays([pa.array(range(0,table2.num_rows))], names=('0')) # CREATE TEMP COLUMN!!
#print(table3)
#exit()
changed_column_list = []
for col_name in table2.column_names:
print('processing column: ' + col_name)
new_list = []
col_data = pa.Table.column(table2, col_name)
col_data_type = table2.schema.field(col_name).type
printed_changed_flag = False
for i in col_data:
# GET STRING REPRESENTATION OF THE COLUMN DATA
if(col_data_type == 'string'):
col_str = pa.StringScalar.as_py(i)
elif(col_data_type == 'int32'):
col_str = pa.Int32Scalar.as_py(i)
elif(col_data_type == 'int64'):
col_str = pa.Int64Scalar.as_py(i)
if col_name in change_columns:
if printed_changed_flag == False:
print('changing values in column ' + col_name)
changed_column_list.append(col_name)
changed_ct += 1
printed_changed_flag = True
new_list.append(change_str(col_str))
else:
new_list.append(col_str)
#set data type for the column
if(col_data_type == 'string'):
col_data_type = pa.string()
elif(col_data_type == 'int32'):
col_data_type = pa.int32()
elif(col_data_type == 'int64'):
col_data_type = pa.int64()
arr = pa.array(new_list, type=col_data_type)
new_field = pa.field(col_name, col_data_type)
table3 = pa.Table.append_column(table3, new_field, arr)
all_cols_ct += 1
#for i in table3:
# print(i)
table3 = pa.Table.remove_column(table3, 0) # REMOVE TEMP COLUMN!!
#print(table2)
#print('-------------------')
#print(table3)
#exit()
print('changed ' + str(changed_ct) + ' columns:')
print(*changed_column_list, sep='\n')
# WRITE NEW PARQUET FILE
pa.parquet.write_table(table3, out_file)