从 Python 写入嵌套镶木地板格式

Write nested parquet format from Python

免责声明:我对这两个主题(python 以及镶木地板)都很陌生,所以如果我的想法太复杂,请告诉我。

我正在寻找有关如何以最有效的方式最好地完成以下转换的指导:

我有一个平面镶木地板文件,其中一个 varchar 列将 JSON 数据存储为字符串,我想将此数据转换为嵌套结构,即 JSON 数据变为嵌套镶木地板。如果这有任何帮助,我提前知道 JSON 的架构。

这是我到目前为止“完成”的:


构建示例数据

# load packages

import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq

# Create dummy data

# dummy data with JSON as string
person_data = {'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }

# from dict to panda df
person_df = pd.DataFrame.from_dict(person_data)

# from panda df to pyarrow table
person_pat = pa.Table.from_pandas(person_df)

# save as parquet file
pq.write_table(person_pat, 'output/example.parquet')

脚本提案

# load dummy data
sample = pa.parquet.read_table('output/example.parquet')

# transform to dict
sample_dict = sample.to_pydict()
# print with indent for checking
print(json.dumps(sample_dict, sort_keys=True, indent=4))
# load json from string and replace string
sample_dict['languages'] = json.loads(str(sample_dict['languages']))
print(json.dumps(sample_dict, sort_keys=True, indent=4))
#type(sample_dict['languages'])

# how to keep the nested structure when going from dict —> panda df —> pyarrow table?
# save dict as nested parquet...

所以,这是我的具体问题:

  1. 这种方法是正确的方法还是可以以任何方式进行优化? dict、df、pa之间的所有转换table感觉效率不高,很高兴在这里受教
  2. dict —> df 转换时如何保留嵌套结构?或者根本不需要这个?
  3. 编写嵌套 parquet 文件的最佳方法是什么?我已经阅读 并且这里提到了 fast parquet 用于阅读但缺乏写入能力 - 同时有任何可行的解决方案吗?

非常感谢 斯蒂芬

PySpark 可以用一种简单的方式来完成,如下所示。使用 PySpark 的主要好处是随着数据的增长基础设施的可扩展性,但是使用普通的 Python 可能会有问题,因为如果你不使用像 Dask 这样的框架,你将需要更大的机器来 运行它。

from pyspark.sql import HiveContext
hc = HiveContext(sc)

# This is a way to create a PySpark dataframe from your sample, but there are others 
nested_df = hc.read.json(sc.parallelize(["""
{'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }
"""]))

# You have nested Spark dataframe here. This shows the content of the spark dataframe. 20 is the max number of rows to show on the console and False means don't cut the columns that don't fit on the screen (show all columns content)
nested_df.show(20,False)

# Writes to a location as parquet
nested_df.write.parquet('/path/parquet')

# Reads the file from the previous location
spark.read.parquet('/path/parquet').show(20, False)

这段代码的输出是

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

回答您的问题

  1. 我认为如果您可以在 Spark 中使用更多的执行程序,那么无论您拥有多少数据都没有关系,这会更有效率
  2. 您可以看到,当加载 parquet 文件时,所有字典和列表都被保留了
  3. 这取决于“最佳”的定义,但我认为这是一个不错的选择;)