通过 pyarrow 和(或)pandas in Python 构造镶木地板的列

Columns to Struct parquet by pyarrow and (or) pandas in Python

我希望你们中的一些人能抽出一点时间来帮助像我这样的初学者。我整整一周都在处理这项任务,但找不到解决方案。我理解并且完全同意我必须学习我使用的每个包及其组合以找到正确的解决方案。

完整的任务是将 5 列(1000 行)组合成 1 个结构列,并在 parquet 中将其 stored/transformed 排成一行(1000 列)。但我坚持将 5 列合并为 1 结构列的问题。

最初,我收到以下列:列=['date'、'bidopen'、'bidclose'、'bidhigh'、'bidlow'、'tickqty'].我不需要 'date' 成为结构的一部分。

我尝试了什么:

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

选项 1 - 带 pandas

的字典
df = pd.read_csv('original.csv')
df2 = df.drop(columns=['date'])
df3 = df2.to_dict('records')

我无法通过 pandas 将 dict 保存到 csv 或 parquet - 以下 2 个命令导致向后转换为 pandas 数据框并单独保存列。

pd.DataFrame(df3).to_csv('test_parquet.csv', index=False)
pd.DataFrame(df3).to_parquet('test2.parquet')

如果我可以使用字典作为数据框,接下来我将使用 pandas.DataFrame.pivot 将行转换为列。接下来,我尝试将 dict 转换为 pyarrow table(似乎我也可以将条目保存在列(1 行)中)。

table = pa.Table.from_pydict({'data', pa.array(df3)})

在上面一行之后我有一个错误,我找不到解决方案(TypeError: unhashable type: 'pyarrow.lib.StructArray')。下一步是通过 pyarrow 将 table 保存到镶木地板中。

选项 2 - 通过 pyarrow

构造

在这里,我尝试在 parquet 内部工作以更改模式(或写入新模式)

df = pd.read_csv('original.csv')
df = df.drop(columns=['date'])
df.to_parquet('test.parquet')
table = pq.read_table('test.parquet', columns=['bidopen', 'bidclose', 'bidhigh', 'bidlow', 'tickqty'])

在这里,我阅读了 parquet 的模式以查看每一列的数据类型。下面我设置了新架构:

struct = pa.struct([
    pa.field('bidopen', pa.float64()),
    pa.field('bidclose', pa.float64()),
    pa.field('bidhigh', pa.float64()),
    pa.field('bidlow', pa.float64()),
    pa.field('tickqty', pa.int64())
])
fields = ([pa.field('data', pa.list_(struct))])
schema = pa.schema(fields)
writer = pq.ParquetWriter('test2.parquet', schema)
writer.write_table(table)
writer.close()

我遇到了一个错误,我也找不到解决方案(ValueError:Table 架构与用于创建文件的架构不匹配:...),因为我认为它会保存到新提供的架构。

选项 3 - pyarrow cast

#(the upper part is from the Option 2)
...
schema = pa.schema(fields)
table2 = table.cast(schema)
writer = pq.ParquetWriter('test2.parquet', schema)
writer.write_table(table2)
writer.close()

我收到另一个错误(ValueError:目标架构的字段名称与 table 的字段名称不匹配:)。在这里我说 - 来吧,我正在做演员正是因为模式不一样......那没有帮助。

选项 4 - 另一种尝试在从 pandas 加载到 pyarrow 时更改架构,以便稍后将其保存到 parquet

arrays = [['data','data','data','data','data'],['bidopen', 'bidclose','bidhigh','bidlow','tickqty']]
tuples = list(zip(*arrays))
index = pd.MultiIndex.from_tuples(tuples)
df2 = pd.DataFrame(df.values[:, 1:], columns=index)
pa.Schema.from_pandas(df2)

这里我遇到了一个错误(AttributeError: 'list' object has no attribute 'columns'),我也找不到解决方法。

选项 5 - pyspark

这对我来说是最糟糕的,因为我花了大约 3 天的时间来“学习”它,因为它应该能够转换为结构和旋转。但是,后来我发现如果没有额外的软件包:Hadoop 和 Java SDK(它不是免费使用的),我无法在我的 Win10 上通过 pyspark 将数据保存到镶木地板中。因此,我停止进一步开发它。

对于问题的第一部分,您可以这样做(注意,StructArray.from_arrays 需要数组,因此您需要展平分块数组):

fields, arrs = [], []
for column_index in range(table.num_columns):
    fields.append(table.field(column_index))
    arrs.append(table.column(column_index).flatten()[0].chunks[0])
struct_array = pa.StructArray.from_arrays(arrs, fields=fields)
print(struct_array)
print(struct_array.to_pylist())

示例输出:

-- is_valid: all not null
-- child 0 type: double
  [
    1.1,
    2.2
  ]
-- child 1 type: double
  [
    3.3,
    4.4
  ]
-- child 2 type: double
  [
    5.5,
    6.6
  ]
-- child 3 type: double
  [
    7.7,
    8.8
  ]
-- child 4 type: int64
  [
    9,
    10
  ]
[{'bidopen': 1.1, 'bidclose': 3.3, 'bidhigh': 5.5, 'bidlow': 7.7, 'tickqty': 9}, {'bidopen': 2.2, 'bidclose': 4.4, 'bidhigh': 6.6, 'bidlow': 8.8, 'tickqty': 10}]

我不认为 pyarrow 可以转置,如果那是你要问的问题的第二部分。您可以使用 pandas 进行转置,但它会是另一个副本。

df = pa.Table.from_arrays([struct_array], ['data']).to_pandas()
print(df.transpose())

示例输出:

                                                      0  \
data  {'bidopen': 1.1, 'bidclose': 3.3, 'bidhigh': 5...   

                                                      1  
data  {'bidopen': 2.2, 'bidclose': 4.4, 'bidhigh': 6...  

​

这种情况下的输出将始终是单行 table 和 N 列,每个单元格都是一个结构。