Read JSON and load into a PySpark DataFrame

I want to fetch data from Azure Cosmos DB, and I am using the Python SDK to connect from Databricks.

I want to be able to save my json.loads(data) result into a PySpark DataFrame, because I need to store the data in Databricks Delta Lake. How can I read this data into a PySpark DataFrame? Below are my code and sample data.

{
 "appUuid": "aaaa-bbbb-cccc",
 "SystemId": null,
 "city": "Lancaster",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}
{
 "appUuid": "bbbb-dddd-eeee",
 "SystemId": null,
 "city": "Alden ",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}
from azure.cosmos import CosmosClient

client = CosmosClient('https://<cosmos_client>.documents.azure.com:443/', credential='AccountKey')
DATABASE_NAME = 'TestDB'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'Test'
container = database.get_container_client(CONTAINER_NAME)

import json
for item in container.query_items(
        query='SELECT TOP 10 * FROM Test',
        enable_cross_partition_query=True):
    data = json.dumps(item, indent=True)
    print(data)
    print(type(data))

# convert the JSON string back into a dict
data1 = json.loads(data)
print(data1)
print(type(data1))
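
Note that, as written, the loop overwrites `data` on every iteration, so `data1` ends up holding only the last document. A minimal sketch of collecting every document instead (the `items` list below is a stand-in for the Cosmos DB query results, since I cannot query your container):

```python
import json

# stand-in for the results of container.query_items(...) (assumed sample documents)
items = [
    {"appUuid": "aaaa-bbbb-cccc", "SystemId": None, "city": "Lancaster"},
    {"appUuid": "bbbb-dddd-eeee", "SystemId": None, "city": "Alden "},
]

# accumulate one JSON string per document instead of overwriting a single variable
docs = [json.dumps(item) for item in items]
print(len(docs))
```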


from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.json(data1)  # <-- I am getting an error on this line
display(df)

I get this error:

"IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: {"

You need to parse your JSON data properly:

data = """{
 "appUuid": "aaaa-bbbb-cccc",
 "SystemId": null,
 "city": "Lancaster",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}
{
 "appUuid": "bbbb-dddd-eeee",
 "SystemId": null,
 "city": "Alden ",
 "state": "NY",
 "zipCode": "140",
 "field1": "others",
 "field2": "others"
}"""

import re
# quote the bare nulls (ast.literal_eval does not understand JSON's null,
# so SystemId becomes the string "null"), then split the text into
# individual {...} objects
data = re.findall('[^}]+}', data.replace("null", '"null"'))

# parse each object string into a dict
import ast
data = list(map(ast.literal_eval, data))

df = spark.read.json(sc.parallelize(data))
df.show()

Output:

+--------+--------------+---------+------+------+-----+-------+
|SystemId|       appUuid|     city|field1|field2|state|zipCode|
+--------+--------------+---------+------+------+-----+-------+
|    null|aaaa-bbbb-cccc|Lancaster|others|others|   NY|    140|
|    null|bbbb-dddd-eeee|   Alden |others|others|   NY|    140|
+--------+--------------+---------+------+------+-----+-------+
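
An alternative that avoids the null replacement and the regex entirely is `json.JSONDecoder.raw_decode`, which parses one object at a time from concatenated JSON text and keeps real `None` values. A sketch using the sample data from the question:

```python
import json

# two concatenated JSON documents with no separator, as in the question
text = ('{"appUuid": "aaaa-bbbb-cccc", "SystemId": null, "city": "Lancaster"}\n'
        '{"appUuid": "bbbb-dddd-eeee", "SystemId": null, "city": "Alden "}')

decoder = json.JSONDecoder()
objects = []
idx = 0
while idx < len(text):
    # skip whitespace between objects, then decode the next one
    idx = len(text) - len(text[idx:].lstrip())
    if idx >= len(text):
        break
    obj, idx = decoder.raw_decode(text, idx)
    objects.append(obj)

print(objects)
```

The resulting dicts can then be serialized back with `json.dumps` and fed to `spark.read.json`, or passed directly to `spark.createDataFrame`.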

You are trying to pass a JSON string as the data path - that won't work. The .json() function accepts either a file path or an RDD of strings. To create an RDD, use the following code:

rdd = sc.parallelize([data1])
df = spark.read.json(rdd)