pySpark - 在插入数据库之前将整个数据框列转换为 JSON 对象

pySpark - convert an entire dataframe column into JSON object before inserting into DB

目前我对 pyspark 的了解非常有限,所以我正在寻找一个快速解决方案来解决我当前实施中遇到的这个问题。我正在尝试通过 pyspark 将 JSON 文件读取到数据帧中,将其转换为一个对象,我可以将该对象插入数据库 table (DynamoDB)。 table 中的列应该代表 JSON 文件中指定的键。例如,如果我的 JSON 文件包含以下元素:

{
   "Records":[
      {
         "column1":"Value1",
         "column2":"Value2",
         "column3":"Value3",
         "column4":{
            "sub1":"Value4",
            "sub2":"Value5",
            "sub3":{
               "sub4":"Value6",
               "sub5":"Value7"
            }
         }
      },
      {
         "column1":"Value8",
         "column2":"Value9",
         "column3":"Value10",
         "column4":{
            "sub1":"Value11",
            "sub2":"Value12",
            "sub3":{
               "sub4":"Value13",
               "sub5":"Value14"
            }
         }
      }
   ]
}

数据库table中的列分别是column1、column2、column3和column4。对于 column4,它是 Map 类型,我需要在将其插入数据库之前将整个对象转换为字符串。因此,在第一行的情况下,我可以期望看到该列:

{
   "sub1":"Value4",
   "sub2":"Value5",
   "sub3":{
      "sub4":"Value6",
      "sub5":"Value7"
   }
}

但是,这是我在 运行 我的脚本 table 之后在数据库中看到的:

{ Value4, Value5, { Value6, Value7 }}

我知道发生这种情况是因为在执行数据库插入操作之前,需要先将所有列值转换为 String 类型:

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

我正在寻找一种方法来纠正 Column4 的内容以表示原始 JSON 对象,然后再将它们转换为 String 类型。这是我到目前为止写的(不包括数据库插入操作)

import pyspark.sql.types as T
from pyspark.sql import functions as SF

df = spark.read.option("multiline", "true").json('/home/abhishek.tirkey/Documents/test')

Records = df.withColumn("Records", SF.explode(SF.col("Records")))

Rows = Records.select(
    "Records.column1",
    "Records.column2",
    "Records.column3",
    "Records.column4",
)

for col in Rows.columns:
    Rows = Rows.withColumn(col, Rows[col].cast(StringType()))

RowsJSON = Rows.toJSON()

有一个 to_json 函数可以做到这一点 :

from pyspark.sql import functions as F

df = df.withColumn("record", F.explode("records")).select(
    "record.column1",
    "record.column2",
    "record.column3",
    F.to_json("record.column4").alias("column4"),
)

df.show()
+-------+-------+-------+--------------------+                                  
|column1|column2|column3|             column4|
+-------+-------+-------+--------------------+
| Value1| Value2| Value3|{"sub1":"Value4",...|
| Value8| Value9|Value10|{"sub1":"Value11"...|
+-------+-------+-------+--------------------+

df.printSchema()
root
 |-- column1: string (nullable = true)
 |-- column2: string (nullable = true)
 |-- column3: string (nullable = true)
 |-- column4: string (nullable = true)