Non-consistent schema in apache arrow

Main question: how do you deal with a schema that changes from batch to batch in pyarrow when processing data batch by batch?

Long story short:

As an example, I have the following data:

| col_a | col_b |
-----------------
|  10   |  42   |
|  41   |  21   |
| 'foo' |  11   |
| 'bar' |  99   |

I am using Python 3.7 with pandas 1.1.0.

>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
  col_a  col_b
0    10     42
1    41     21
2   foo     11
3   bar     99
>>> df.dtypes
col_a    object
col_b     int64
dtype: object  
>>>

I need to start using Apache Arrow through the PyArrow 1.0.1 implementation. In my application we work one batch at a time, which means we only ever see a subset of the data, and therefore only partial data types.

>>> dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> sub_1 = next(dfi)
>>> sub_1
   col_a  col_b
0     10     42
1     41     21
>>> sub_2 = next(dfi)
>>> sub_2
  col_a  col_b
2   foo     11
3   bar     99
>>> sub_1.dtypes
col_a    int64
col_b    int64
dtype: object
>>> sub_2.dtypes
col_a    object
col_b     int64
dtype: object
>>>

My goal is to persist the whole dataframe in Apache Arrow's Parquet format under the constraint of working one batch at a time. That requires the schema to be filled in correctly. How can I handle dtypes that change from batch to batch? Here is the complete code to reproduce the problem with the data above.

from pyarrow import RecordBatch, RecordBatchFileWriter
import pandas as pd

pd.DataFrame([['10', 42], ['41', 21], ['foo', 11], ['bar', 99]], columns=['col_a', 'col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)

# No schema is provided here, so PyArrow infers it from the data: col_a is identified as an int64 column.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet', schema)
writer.write(batch_to_write_1)

# We expect to keep the same schema, but it does not match the sub_2 data,
# so the following line raises an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2, schema)
# writer.write(batch_to_write_2)  # Never reached: the line above raises,
#                                 # so batch_to_write_2 is never defined.

We get the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 858, in pyarrow.lib.RecordBatch.from_pandas
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in dataframe_to_arrays
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in <listcomp>
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 559, in convert_column     
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required (got type str)

This behavior is intentional. Some alternatives to try (I believe they should work, but I haven't tested all of them):

  1. If you know the final schema up front, build it manually in pyarrow instead of relying on the schema inferred from the first record batch.
  2. Go through all the data and compute the final schema, then reprocess the data with that schema.
  3. Detect a schema change and recast the previous record batches.
  4. Detect a schema change and start a new table (you will then end up with one parquet file per schema, and you will need another process to unify the schemas).

Finally, if it applies to your case and you are converting CSV data, you might consider using the built-in Arrow CSV parser.