Inconsistent schema in Apache Arrow
Main question:
How do I handle a schema that changes in PyArrow when processing data batch by batch?
Long story short:
As an example, I have the following data:
| col_a | col_b |
-----------------
| 10 | 42 |
| 41 | 21 |
| 'foo' | 11 |
| 'bar' | 99 |
I am using Python 3.7 and pandas 1.1.0.
>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
col_a col_b
0 10 42
1 41 21
2 foo 11
3 bar 99
>>> df.dtypes
col_a object
col_b int64
dtype: object
>>>
I need to start using Apache Arrow through its PyArrow 1.0.1 implementation. In my application we work batch by batch, which means we only ever see part of the data at a time, and therefore only partial data types.
>>> dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
col_a col_b
0 10 42
1 41 21
>>> sub_2
col_a col_b
2 foo 11
3 bar 99
>>> sub_1.dtypes
col_a int64
col_b int64
dtype: object
>>> sub_2.dtypes
col_a object
col_b int64
dtype: object
>>>
My goal is to persist the whole dataframe in Apache Arrow's Parquet format, under the constraint of working batch by batch. That requires the schema to be filled in correctly. How do I deal with data types that change from one batch to the next?
Here is the full code to reproduce the problem with the data above.
from pyarrow import RecordBatch, RecordBatchFileWriter, RecordBatchFileReader
import pandas as pd
pd.DataFrame([['10', 42], ['41', 21], ['foo', 11], ['bar', 99]], columns=['col_a', 'col_b']).to_csv('test.csv', index=False)  # index=False so the CSV contains only col_a and col_b
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)
# No schema is provided here, so PyArrow infers it from the data. In this chunk,
# col_a is identified as a column of integers.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet', schema)
writer.write(batch_to_write_1)
# We expect the schema to carry over, but it does not: it no longer matches the
# sub_2 data, so the following line raises an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2, schema)
# writer.write(batch_to_write_2)  # Never reached: the line above raises, so batch_to_write_2 is never defined
We get the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/table.pxi", line 858, in pyarrow.lib.RecordBatch.from_pandas
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in dataframe_to_arrays
for c, f in zip(columns_to_convert, convert_fields)]
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in <listcomp>
for c, f in zip(columns_to_convert, convert_fields)]
File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 559, in convert_column
result = pa.array(col, type=type_, from_pandas=True, safe=safe)
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required (got type str)
This behavior is intentional. Here are some alternatives to try (I believe they should work, but I haven't tested all of them):
- If you know the final schema up front, build it manually in pyarrow instead of relying on the schema inferred from the first record batch (see the sketch after this list).
- Iterate over all the data and compute the final schema, then reprocess the data with that schema.
- Detect a schema change and recast the previous record batches.
- Detect a schema change and start a new table (you then end up with one parquet file per schema, and you need another process to unify the schemas).
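Here is a minimal sketch of the first alternative, assuming the final schema is known to be string/int64. Telling pandas the dtype as well keeps every chunk consistent with the declared Arrow schema. Note that RecordBatchFileWriter writes the Arrow IPC file format, not Parquet (the .parquet name in the reproduction above is misleading); for Parquet output you would use pyarrow.parquet.ParquetWriter with the same schema. The 'test.arrow' file name is just illustrative.

import pyarrow as pa
from pyarrow import RecordBatch, RecordBatchFileWriter
import pandas as pd

# The schema is declared up front: col_a is a string column no matter
# what the first chunk happens to contain.
schema = pa.schema([('col_a', pa.string()), ('col_b', pa.int64())])

# dtype={'col_a': str} keeps every pandas chunk consistent with the Arrow
# schema, so from_pandas never has to re-infer col_a from partial data.
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2, dtype={'col_a': str})

writer = RecordBatchFileWriter('test.arrow', schema)
for chunk in dfi:
    writer.write(RecordBatch.from_pandas(chunk, schema=schema, preserve_index=False))
writer.close()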
Finally, if it works for your use case, since you are converting CSV data you might consider using the built-in Arrow CSV parser.
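For instance, a sketch using pyarrow.csv, where column_types pins col_a to string so type inference cannot flip between int and string partway through the file (file and column names are taken from the question):

import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq

# Arrow parses the CSV itself; column_types forces col_a to string so the
# resulting schema is the same no matter which rows are seen first.
convert_options = pa_csv.ConvertOptions(column_types={'col_a': pa.string()})
table = pa_csv.read_csv('test.csv', convert_options=convert_options)
pq.write_table(table, 'test.parquet')

This reads the whole file in one call; newer PyArrow versions also expose pyarrow.csv.open_csv for incremental, batch-by-batch reading.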