PySpark read DynamoDB formatted JSON
I'm not a Spark expert, so I'm asking for help.
I used the built-in export service to migrate a DynamoDB table to S3. It saves the files in *.json format. Below is an example of one row (each row of data is a dictionary nested under the key "Item"):
{
  "Item": {
    "accept_languages": {
      "M": {
        "en": {"N": "0.9"},
        "en-US": {"N": "1"}
      }
    },
    "accept_mimetypes": {
      "M": {
        "*/*": {"N": "0.8"},
        "image/*": {"N": "1"},
        "image/apng": {"N": "1"},
        "image/webp": {"N": "1"}
      }
    },
    "id": {"S": "5cddbd53b870c2619f1083ed"},
    "ip": {"S": "11.11.111.11"},
    "landing_page__type": {"S": "PageMain"},
    "location__city": {"S": "Scituate"},
    "location__country": {"S": "United States"},
    "location__country_code": {"S": "US"},
    "location__region": {"S": "MA"},
    "location__zip": {"S": "02066"},
    "origin_url": {"S": "https://www.bing.com/"},
    "session": {"S": "b4d58fd18"},
    "source": {"S": "bing"},
    "user_agent__browser": {"S": "Chrome"},
    "user_device": {"S": "t"}
  }
}
As you can see, every row of data is nested.
I want to produce a *.csv file as the result.
Any suggestions on how to parse this?
At the moment I have a UDF (user-defined function) that converts such a dictionary from the DynamoDB representation into a regular view.
For example, how can I extract the data from each row and apply that function to it?
Thanks.
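For context, a conversion function like the UDF mentioned above might look roughly like this. This is a plain-Python sketch, not the asker's actual code; `unwrap` and `unwrap_item` are hypothetical names, and it only handles the `S`, `N`, and `M` type descriptors that appear in the sample row:

```python
def unwrap(value):
    # value is a DynamoDB-typed value such as {"S": "..."},
    # {"N": "..."} or {"M": {...}}: a dict with exactly one key.
    (tag, inner), = value.items()
    if tag == "M":
        # Map: unwrap every nested attribute recursively.
        return {k: unwrap(v) for k, v in inner.items()}
    # "S" / "N": DynamoDB exports store the value as a raw string.
    return inner

def unwrap_item(item):
    # item maps attribute names to typed values.
    return {k: unwrap(v) for k, v in item.items()}

row = {"Item": {"id": {"S": "5cddbd53b870c2619f1083ed"},
                "accept_languages": {"M": {"en": {"N": "0.9"},
                                           "en-US": {"N": "1"}}}}}
print(unwrap_item(row["Item"]))
# {'id': '5cddbd53b870c2619f1083ed', 'accept_languages': {'en': '0.9', 'en-US': '1'}}
```

A full converter would also need to cover the remaining DynamoDB descriptors (`L`, `BOOL`, `NULL`, `SS`, `NS`, ...) the same way.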
The idea is to recursively collect all column names into a list and then use this list in a select statement:
from pyspark.sql import functions as F
from pyspark.sql import types as T

df = spark.read.option("multiLine", "true").json(<filename>)

def flatten(schema, prefix=None):
    for field in schema.fields:
        if prefix is None:
            colName = field.name
        else:
            colName = prefix + "." + field.name
        if isinstance(field.dataType, T.StructType):
            yield from flatten(field.dataType, colName)
        else:
            yield F.col(colName).alias(colName.replace(".", "_"))

df.select(list(flatten(df.schema))).show()
Output:
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
|Item_accept_languages_M_en_N|Item_accept_languages_M_en-US_N|Item_accept_mimetypes_M_*/*_N|Item_accept_mimetypes_M_image/*_N|Item_accept_mimetypes_M_image/apng_N|Item_accept_mimetypes_M_image/webp_N| Item_id_S| Item_ip_S|Item_landing_page__type_S|Item_location__city_S|Item_location__country_S|Item_location__country_code_S|Item_location__region_S|Item_location__zip_S| Item_origin_url_S|Item_session_S|Item_source_S|Item_user_agent__browser_S|Item_user_device_S|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
| 0.9| 1| 0.8| 1| 1| 1|5cddbd53b870c2619...|11.11.111.11| PageMain| Scituate| United States| US| MA| 02066|https://www.bing....| b4d58fd18| bing| Chrome| t|
+----------------------------+-------------------------------+-----------------------------+---------------------------------+------------------------------------+------------------------------------+--------------------+------------+-------------------------+---------------------+------------------------+-----------------------------+-----------------------+--------------------+--------------------+--------------+-------------+--------------------------+------------------+
This DataFrame can then be saved as a flat CSV.
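If the export is small enough to process without Spark, the same flattening idea can be sketched with the standard library. This is an illustrative alternative, not the approach above; `flatten_dict` is a hypothetical helper, and at scale the Spark route is preferable:

```python
import csv
import io

def flatten_dict(d, prefix=""):
    # Collapse nested dicts into single-level keys joined with "_",
    # mirroring the aliasing done by the Spark flatten() above.
    out = {}
    for k, v in d.items():
        key = f"{prefix}_{k}" if prefix else k
        if isinstance(v, dict):
            out.update(flatten_dict(v, key))
        else:
            out[key] = v
    return out

# One already-unwrapped row, as the asker's UDF might produce it.
rows = [{"id": "abc", "accept_languages": {"en": "0.9", "en-US": "1"}}]
flat = [flatten_dict(r) for r in rows]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=sorted(flat[0].keys()))
writer.writeheader()
writer.writerows(flat)
print(buf.getvalue())
```

Note that `DictWriter` needs a fixed field list, so with heterogeneous rows you would first union the keys across all rows.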