Apache Spark TypeError: Object of type DataFrame is not JSON serializable
I am sending JSON data from Apache Spark / Databricks to an API. The API expects the data in the following JSON format:
Sample:
{
  "CtcID": 1,
  "LastName": "sample string 2",
  "CpyID": 3,
  "HonTitle": "sample string 4",
  "PositionCode": 1,
  "PositionFreeText": "sample string 6",
  "CreateDate": "2021-04-21T08:50:56.8602571+01:00",
  "ModifDate": "2021-04-21T08:50:56.8602571+01:00",
  "ModifiedBy": 1,
  "SourceID": "sample string 9",
  "OriginID": "sample string 10",
  "DoNotExport": true,
  "ParentEmailAddress": "sample string 13",
  "SupInfo": [
    {
      "FieldName": "sample string 1",
      "DATA_TYPE": "sample string 2",
      "IS_NULLABLE": "sample string 3",
      "FieldContent": "sample string 4"
    },
    {
      "FieldName": "sample string 1",
      "DATA_TYPE": "sample string 2",
      "IS_NULLABLE": "sample string 3",
      "FieldContent": "sample string 4"
    }
  ],
I am sending the data in the following JSON format:
{"Last_name":"Finnigan","First_name":"Michael","Email":"MichaelF@email.com"}
{"Last_name":"Phillips","First_name":"Austin","Email":"PhillipsA@email.com"}
{"Last_name":"Collins","First_name":"Colin","Email":"ColinCollins@email.com"}
{"Last_name":"Finnigan","First_name":"Judy","Email":"Judy@email.com"}
{"Last_name":"Jones","First_name":"Julie","Email":"Julie@email.com"}
{"Last_name":"Smith","First_name":"Barry","Email":"Barry@email.com"}
{"Last_name":"Kane","First_name":"Harry","Email":"Harry@email.com"}
{"Last_name":"Smith","First_name":"John","Email":"John@email.com"}
{"Last_name":"Colins","First_name":"Ruby","Email":"RubySmith@email.com"}
{"Last_name":"Tests","First_name":"Smoke","Email":"a.n.other@pret.com"}
The code in Apache Spark is as follows:
import json
import requests

url = 'https://enimuozygj4jqx.m.pipedream.net'
# spark.read.json returns a DataFrame, not a JSON-serializable object
files = spark.read.json("abfss://azurestorageaccount.dfs.core.windows.net/PostContact.json")
r = requests.post(url, data=json.dumps(files))
print(r.status_code)
When I execute the code, I get the following error:
TypeError: Object of type DataFrame is not JSON serializable
A DataFrame is a collection of Row objects, so you can't call json.dumps on it directly. You can do this instead:
import json
import requests
from pyspark.sql.functions import struct, to_json

files_df = spark.read.json("...")
# Pack each row into a struct, serialize it to JSON, and collect the strings
rows = files_df.select(to_json(struct('*')).alias("json")).collect()
# Parse each string back into a Python dict
files = [json.loads(row[0]) for row in rows]
r = requests.post(url, data=json.dumps(files))
This code takes the DataFrame and packs each row into a struct (via the struct function), which behaves like a Python dictionary, then converts that object into a JSON string (via to_json). We then parse the collected strings into a list of Python objects, which you can pass to json.dumps.
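If you prefer to skip the struct/to_json step, a minimal alternative sketch uses DataFrame.toJSON(), which returns an RDD of JSON strings (one per row). The URL and storage path below are the ones from the question; whether the API accepts a JSON list as the request body is an assumption:
import json
import requests

url = 'https://enimuozygj4jqx.m.pipedream.net'
files_df = spark.read.json("abfss://azurestorageaccount.dfs.core.windows.net/PostContact.json")

# toJSON() yields one JSON string per row; parse each into a Python dict
payload = [json.loads(s) for s in files_df.toJSON().collect()]

# requests serializes the list and sets the Content-Type header itself
r = requests.post(url, json=payload)
print(r.status_code)
If the API instead expects one contact object per request (as the single-object sample above suggests), loop over payload and post each dict individually.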