Read a JSON load into a PySpark dataframe
I want to get data from Azure Cosmos DB, and I'm connecting from Databricks using the Python SDK.
I want to save my json.load(data) into a PySpark dataframe, because I need to store the data in a Databricks Delta Lake. How can I read this data into a PySpark dataframe? Below is my code along with sample data.
{
"appUuid": "aaaa-bbbb-cccc",
"SystemId": null,
"city": "Lancaster",
"state": "NY",
"zipCode": "140",
"field1": "others",
"field2": "others"
}
{
"appUuid": "bbbb-dddd-eeee",
"SystemId": null,
"city": "Alden ",
"state": "NY",
"zipCode": "140",
"field1": "others",
"field2": "others"
}
from azure.cosmos import CosmosClient
client = CosmosClient('https://<cosmos_client>.documents.azure.com:443/', credential='AccountKey')
DATABASE_NAME = 'TestDB'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'Test'
container = database.get_container_client(CONTAINER_NAME)
import json
for item in container.query_items(
        query='SELECT Top 10 * FROM Test',
        enable_cross_partition_query=True):
    data = json.dumps(item, indent=True)
    print(data)
    print(type(data))
    # convert the JSON string back into a dict
    data1 = json.loads(data)
    print(data1)
    print(type(data1))
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(data1)  # I am getting an error on this line.
display(df)
I'm getting this error:
"IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: {"
You need to parse your JSON data correctly:
data = """{
"appUuid": "aaaa-bbbb-cccc",
"SystemId": null,
"city": "Lancaster",
"state": "NY",
"zipCode": "140",
"field1": "others",
"field2": "others"
}
{
"appUuid": "bbbb-dddd-eeee",
"SystemId": null,
"city": "Alden ",
"state": "NY",
"zipCode": "140",
"field1": "others",
"field2": "others"
}"""
import re
# quote null so each document can be parsed as a Python literal
data = re.findall('[^}]+}', data.replace("null", '"null"'))
# evaluate each JSON-like string into a dict
import ast
data = list(map(ast.literal_eval, data))
df = spark.read.json(sc.parallelize(data))
df.show()
Output:
+--------+--------------+---------+------+------+-----+-------+
|SystemId| appUuid| city|field1|field2|state|zipCode|
+--------+--------------+---------+------+------+-----+-------+
| null|aaaa-bbbb-cccc|Lancaster|others|others| NY| 140|
| null|bbbb-dddd-eeee| Alden |others|others| NY| 140|
+--------+--------------+---------+------+------+-----+-------+
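As a side note, query_items() already yields Python dicts, so the regex/ast round-trip can be skipped entirely by re-serializing each document with json.dumps and feeding the strings to spark.read.json. A minimal sketch, assuming the TOP 10 result set fits comfortably in driver memory:
import json
# Collect the queried documents on the driver.
items = list(container.query_items(
    query='SELECT TOP 10 * FROM Test',
    enable_cross_partition_query=True))
# Re-serialize each dict so spark.read.json() receives an RDD of JSON strings.
df = spark.read.json(sc.parallelize([json.dumps(i) for i in items]))
df.show()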
You are trying to pass a JSON string as the data path - it won't work that way. The .json()
function accepts either a file path or an RDD of strings. To create the RDD, use the following code:
rdd = sc.parallelize([data])  # data is the JSON string produced by json.dumps
df = spark.read.json(rdd)
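To close the loop on the original goal of landing the data in Delta Lake, here is a hedged end-to-end sketch building on the answer above; the output path /mnt/delta/test and the write mode are assumptions, not details from the question:
import json
# Serialize every queried document to a JSON string on the driver
# (fine for small result sets like the TOP 10 query above).
json_strings = [json.dumps(item) for item in container.query_items(
    query='SELECT TOP 10 * FROM Test',
    enable_cross_partition_query=True)]
# Build the dataframe from an RDD of JSON strings.
df = spark.read.json(sc.parallelize(json_strings))
# Write the dataframe out as a Delta table; path and mode are placeholders.
df.write.format("delta").mode("append").save("/mnt/delta/test")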