Pyarrow 上带有空列的镶木地板

Parquet with null columns on Pyarrow

我正在使用 pandas.read_sql 在 PostgreSQL 上读取 table,然后将其转换为 Pyarrow table 并将其分区保存在本地文件系统中。

# Retrieve schema.table data from database
def basename_file(date_partition):
    basename_file = f"{table_schema}.{table_name}-{date}.parquet"
    return basename_file

def get_table_data(table_schema, table_name, date):
    s  = ""
    s += "SELECT"
    s += " *"
    s += " , date(created_on) as date_partition"
    s += " FROM {table_schema}.{table_name}"
    s += " WHERE created_on = '{date}';" 
    sql = s.format(table_schema = table_schema, table_name = table_name, date = date)
#     print(sql)

    df = pd.read_sql(sql, db_conn)
    result = pa.Table.from_pandas(df)
    pq.write_to_dataset(result,
                        root_path = f"{dir_name}",
                        partition_cols = ['date_partition'],
                        partition_filename_cb = basename_file,
                        use_legacy_dataset = True
                       )
#     print(result)
    return df

问题是我的 SELECT 有一列的某些行为空。 当我将其分区以在本地文件系统中写入 (write_to_dataset) 时,一些文件只有该列为空的行,因此分区的 Parquet 文件没有此列。

当我尝试通过多个分区读取它时,出现架构错误,因为其中一列无法正确转换。

这是为什么?有什么设置可以应用到 write_to_dataset 来管理这个吗? 我一直在寻找解决方法,但没有成功...... 我这里的主要目标是每天导出数据,按交易日期分区并从任何需要的时间段读取数据,而不关心模式演变:这样,空列的行值将显示为空,简单地说。

如果可以 post 确切的错误消息可能会更有帮助。我用 pyarrow 6.0.1 做了一些实验,我发现只要第一个文件包含所有列的一些有效值就可以正常工作(pyarrow 将使用第一个文件来推断整个数据集的模式)。

在进行数据集发现时,“第一个”文件在技术上没有明确定义,但目前,对于本地数据集,它应该是按字母顺序排列的第一个文件。

如果第一个文件没有所有列的值,则会出现以下错误:

Error: Unsupported cast from string to null using function cast_null

我有点惊讶,因为这种转换应该很容易(转换为 null 只需丢弃所有数据)。话虽如此,您可能不希望所有数据都被丢弃。

最简单的解决方案是在创建数据集时提供完整的预期架构。如果您提前不知道这一点,您可以通过检查数据集中的所有文件并使用 pyarrow 的 unify_schemas 自行解决。我在 this answer.

中有一个这样做的例子

这里有一些代码展示了我的发现:

import os

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

tab = pa.Table.from_pydict({'x': [1, 2, 3], 'y': [None, None, None]})
tab2 = pa.Table.from_pydict({'x': [4, 5, 6], 'y': ['x', 'y', 'z']})

os.makedirs('/tmp/null_first_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_first_dataset/0.parquet')
pq.write_table(tab2, '/tmp/null_first_dataset/1.parquet')

os.makedirs('/tmp/null_second_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_second_dataset/1.parquet')
pq.write_table(tab2, '/tmp/null_second_dataset/0.parquet')

try:
    dataset = ds.dataset('/tmp/null_first_dataset')
    tab = dataset.to_table()
    print(f'Was able to read in null_first_dataset without schema.')
    print(tab)
except Exception as ex:
    print('Was not able to read in null_first_dataset without schema')
    print(f'  Error: {ex}')
print()

try:
    dataset = ds.dataset('/tmp/null_second_dataset')
    tab = dataset.to_table()
    print(f'Was able to read in null_second_dataset without schema.')
    print(tab)
except:
    print('Was not able to read in null_second_dataset without schema')
    print(f'  Error: {ex}')
print()

dataset = ds.dataset('/tmp/null_first_dataset', schema=tab2.schema)
tab = dataset.to_table()
print(f'Was able to read in null_first_dataset by specifying schema.')
print(tab)