Add a new line in front of each line before writing to JSON format using Spark in Scala

I want to add a new line in front of each JSON document before Spark writes the documents to my S3 bucket:

df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY, LAST_MODIFIED_DATE, NVL(CLASS_NAME, className) as CLASS_NAME, DECISION, TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()

parkSQL.write.json("s3://test-bucket/json-output-7/")

With only this command, Spark produces files containing the following:

{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}

However, what I want to achieve is the following:

{"index":{}}
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}

Any insight into how to achieve this result would be greatly appreciated!

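The code below prepends {"index":{}} to the existing row data in the DataFrame (df here; parkSQL from the question works the same way). It converts each row to JSON, joins it to the index line with a newline, and then saves the result using the text format.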

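import org.apache.spark.sql.functions.{concat_ws, lit, struct, to_json}
import spark.implicits._ // Enables the $"..." column syntax.

df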
.select(
    lit("""{"index":{}}""").as("index"),
    to_json(struct($"*")).as("json_data")
)
.select(
    concat_ws(
        "\n", // Joins the index column and the JSON column with a newline, so each record is written as two lines.
        $"index",
        $"json_data"
    ).as("data")
)
.write
.format("text") // Required: write.json would wrap and escape the string; the text format writes it as-is.
.save("s3://test-bucket/json-output-7/")
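A possible alternative, sketched under assumptions (Spark 2.x or 3.x on the classpath and a local session for illustration; the object name IndexedJsonExample and the helper withIndexLines are hypothetical). Dataset.toJSON already serializes each row to a JSON string, so flatMap can emit the {"index":{}} line and the document as two separate records, and the text writer then puts one record per line. Because flatMap works within each partition, every index line stays directly in front of its document.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

object IndexedJsonExample {
  // The line the question wants in front of every document.
  val IndexLine: String = """{"index":{}}"""

  // Hypothetical helper: serialize each row with toJSON, then emit the
  // index line followed by the document as two separate string records.
  def withIndexLines(df: DataFrame): Dataset[String] =
    df.toJSON.flatMap((json: String) => Seq(IndexLine, json))(Encoders.STRING)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("indexed-json-sketch")
      .master("local[*]") // Assumption: local run for illustration only.
      .getOrCreate()
    import spark.implicits._

    // Small sample with two of the question's columns.
    val df = Seq(
      ("david", "AGREE"),
      ("sarah", "DISAGREE")
    ).toDF("LAST_MODIFIED_BY", "DECISION")

    val indexed = withIndexLines(df)
    indexed.collect().foreach(println)

    // A Dataset[String] has a single string column ("value"), so the text
    // writer accepts it. Only writes when an output path is passed in.
    args.headOption.foreach(path => indexed.write.text(path))

    spark.stop()
  }
}
```

Compared with concat_ws, this keeps the index line and the document as separate records instead of one record containing an embedded newline; the file content ends up the same.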

Final output:

cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt

{"index":{}}
{"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
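To confirm that a written part file has this alternating layout, a small plain-Scala check may help. This is only a sketch: the object CheckIndexedLayout and the helper isIndexedLayout are hypothetical, and it assumes the part file has been copied locally and that every document line starts with {.

```scala
import scala.io.Source

object CheckIndexedLayout {
  val IndexLine: String = """{"index":{}}"""

  // True when lines alternate: index line, document, index line, document...
  def isIndexedLayout(lines: Seq[String]): Boolean =
    lines.nonEmpty &&
      lines.length % 2 == 0 &&
      lines.grouped(2).forall {
        case Seq(index, doc) => index == IndexLine && doc != IndexLine && doc.startsWith("{")
        case _               => false
      }

  def main(args: Array[String]): Unit =
    args.headOption match {
      case Some(path) =>
        // Path to a locally copied part file (hypothetical argument).
        val source = Source.fromFile(path, "UTF-8")
        try println(isIndexedLayout(source.getLines().toList))
        finally source.close()
      case None =>
        println("usage: CheckIndexedLayout <part-file>")
    }
}
```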