Pyarrow 上带有空列的镶木地板
Parquet with null columns on Pyarrow
我正在使用 pandas.read_sql 在 PostgreSQL 上读取 table,然后将其转换为 Pyarrow table 并将其分区保存在本地文件系统中。
# Retrieve schema.table data from database
def basename_file(date_partition):
basename_file = f"{table_schema}.{table_name}-{date}.parquet"
return basename_file
def get_table_data(table_schema, table_name, date):
s = ""
s += "SELECT"
s += " *"
s += " , date(created_on) as date_partition"
s += " FROM {table_schema}.{table_name}"
s += " WHERE created_on = '{date}';"
sql = s.format(table_schema = table_schema, table_name = table_name, date = date)
# print(sql)
df = pd.read_sql(sql, db_conn)
result = pa.Table.from_pandas(df)
pq.write_to_dataset(result,
root_path = f"{dir_name}",
partition_cols = ['date_partition'],
partition_filename_cb = basename_file,
use_legacy_dataset = True
)
# print(result)
return df
问题是我的 SELECT 有一列的某些行为空。
当我将其分区以在本地文件系统中写入 (write_to_dataset) 时,一些文件只有该列为空的行,因此分区的 Parquet 文件没有此列。
当我尝试通过多个分区读取它时,出现架构错误,因为其中一列无法正确转换。
这是为什么?有什么设置可以应用到 write_to_dataset 来管理这个吗?
我一直在寻找解决方法,但没有成功......
我这里的主要目标是每天导出数据,按交易日期分区并从任何需要的时间段读取数据,而不关心模式演变:这样,空列的行值将显示为空,简单地说。
如果可以 post 确切的错误消息可能会更有帮助。我用 pyarrow 6.0.1 做了一些实验,我发现只要第一个文件包含所有列的一些有效值就可以正常工作(pyarrow 将使用第一个文件来推断整个数据集的模式)。
在进行数据集发现时,“第一个”文件在技术上没有明确定义,但目前,对于本地数据集,它应该是按字母顺序排列的第一个文件。
如果第一个文件没有所有列的值,则会出现以下错误:
Error: Unsupported cast from string to null using function cast_null
我有点惊讶,因为这种转换应该很容易(转换为 null 只需丢弃所有数据)。话虽如此,您可能不希望所有数据都被丢弃。
最简单的解决方案是在创建数据集时提供完整的预期架构。如果您提前不知道这一点,您可以通过检查数据集中的所有文件并使用 pyarrow 的 unify_schemas 自行解决。我在 this answer.
中有一个这样做的例子
这里有一些代码展示了我的发现:
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
tab = pa.Table.from_pydict({'x': [1, 2, 3], 'y': [None, None, None]})
tab2 = pa.Table.from_pydict({'x': [4, 5, 6], 'y': ['x', 'y', 'z']})
os.makedirs('/tmp/null_first_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_first_dataset/0.parquet')
pq.write_table(tab2, '/tmp/null_first_dataset/1.parquet')
os.makedirs('/tmp/null_second_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_second_dataset/1.parquet')
pq.write_table(tab2, '/tmp/null_second_dataset/0.parquet')
try:
dataset = ds.dataset('/tmp/null_first_dataset')
tab = dataset.to_table()
print(f'Was able to read in null_first_dataset without schema.')
print(tab)
except Exception as ex:
print('Was not able to read in null_first_dataset without schema')
print(f' Error: {ex}')
print()
try:
dataset = ds.dataset('/tmp/null_second_dataset')
tab = dataset.to_table()
print(f'Was able to read in null_second_dataset without schema.')
print(tab)
except:
print('Was not able to read in null_second_dataset without schema')
print(f' Error: {ex}')
print()
dataset = ds.dataset('/tmp/null_first_dataset', schema=tab2.schema)
tab = dataset.to_table()
print(f'Was able to read in null_first_dataset by specifying schema.')
print(tab)
我正在使用 pandas.read_sql 在 PostgreSQL 上读取 table,然后将其转换为 Pyarrow table 并将其分区保存在本地文件系统中。
# Retrieve schema.table data from database
def basename_file(date_partition):
basename_file = f"{table_schema}.{table_name}-{date}.parquet"
return basename_file
def get_table_data(table_schema, table_name, date):
s = ""
s += "SELECT"
s += " *"
s += " , date(created_on) as date_partition"
s += " FROM {table_schema}.{table_name}"
s += " WHERE created_on = '{date}';"
sql = s.format(table_schema = table_schema, table_name = table_name, date = date)
# print(sql)
df = pd.read_sql(sql, db_conn)
result = pa.Table.from_pandas(df)
pq.write_to_dataset(result,
root_path = f"{dir_name}",
partition_cols = ['date_partition'],
partition_filename_cb = basename_file,
use_legacy_dataset = True
)
# print(result)
return df
问题是我的 SELECT 有一列的某些行为空。 当我将其分区以在本地文件系统中写入 (write_to_dataset) 时,一些文件只有该列为空的行,因此分区的 Parquet 文件没有此列。
当我尝试通过多个分区读取它时,出现架构错误,因为其中一列无法正确转换。
这是为什么?有什么设置可以应用到 write_to_dataset 来管理这个吗? 我一直在寻找解决方法,但没有成功...... 我这里的主要目标是每天导出数据,按交易日期分区并从任何需要的时间段读取数据,而不关心模式演变:这样,空列的行值将显示为空,简单地说。
如果可以 post 确切的错误消息可能会更有帮助。我用 pyarrow 6.0.1 做了一些实验,我发现只要第一个文件包含所有列的一些有效值就可以正常工作(pyarrow 将使用第一个文件来推断整个数据集的模式)。
在进行数据集发现时,“第一个”文件在技术上没有明确定义,但目前,对于本地数据集,它应该是按字母顺序排列的第一个文件。
如果第一个文件没有所有列的值,则会出现以下错误:
Error: Unsupported cast from string to null using function cast_null
我有点惊讶,因为这种转换应该很容易(转换为 null 只需丢弃所有数据)。话虽如此,您可能不希望所有数据都被丢弃。
最简单的解决方案是在创建数据集时提供完整的预期架构。如果您提前不知道这一点,您可以通过检查数据集中的所有文件并使用 pyarrow 的 unify_schemas 自行解决。我在 this answer.
中有一个这样做的例子这里有一些代码展示了我的发现:
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
tab = pa.Table.from_pydict({'x': [1, 2, 3], 'y': [None, None, None]})
tab2 = pa.Table.from_pydict({'x': [4, 5, 6], 'y': ['x', 'y', 'z']})
os.makedirs('/tmp/null_first_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_first_dataset/0.parquet')
pq.write_table(tab2, '/tmp/null_first_dataset/1.parquet')
os.makedirs('/tmp/null_second_dataset', exist_ok=True)
pq.write_table(tab, '/tmp/null_second_dataset/1.parquet')
pq.write_table(tab2, '/tmp/null_second_dataset/0.parquet')
try:
dataset = ds.dataset('/tmp/null_first_dataset')
tab = dataset.to_table()
print(f'Was able to read in null_first_dataset without schema.')
print(tab)
except Exception as ex:
print('Was not able to read in null_first_dataset without schema')
print(f' Error: {ex}')
print()
try:
dataset = ds.dataset('/tmp/null_second_dataset')
tab = dataset.to_table()
print(f'Was able to read in null_second_dataset without schema.')
print(tab)
except:
print('Was not able to read in null_second_dataset without schema')
print(f' Error: {ex}')
print()
dataset = ds.dataset('/tmp/null_first_dataset', schema=tab2.schema)
tab = dataset.to_table()
print(f'Was able to read in null_first_dataset by specifying schema.')
print(tab)