在 Scala 中使用 Spark 写入 JSON 格式之前,在每行前面添加一个新行
Add a new line in front of each line before writing to JSON format using Spark in Scala
我想在 Spark 将文档写入我的 s3 存储桶之前在每个 json 文档前面添加一个新行:
df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY, LAST_MODIFIED_DATE, NVL(CLASS_NAME, className) as CLASS_NAME, DECISION, TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()
parkSQL.write.json("s3://test-bucket/json-output-7/")
仅使用此命令,它将生成包含以下内容的文件:
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
但是,我想要实现的是如下所示:
{"index":{}}
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
任何有关如何实现此结果的见解都将不胜感激!
下面的代码会将 {"index":{}}
与 DataFrame
中的现有行数据连接起来 & 它会将数据转换为 json
然后使用 text
格式保存 json 数据.
df
.select(
lit("""{"index":{}}""").as("index"),
to_json(struct($"*")).as("json_data")
)
.select(
concat_ws(
"\n", // This will split index column & other column data into two lines.
$"index",
$"json_data"
).as("data")
)
.write
.format("text") // This is required.
.save("s3://test-bucket/json-output-7/")
最终输出
cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt
{"index":{}}
{"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
我想在 Spark 将文档写入我的 s3 存储桶之前在每个 json 文档前面添加一个新行:
df.createOrReplaceTempView("ParquetTable")
val parkSQL = spark.sql("select LAST_MODIFIED_BY, LAST_MODIFIED_DATE, NVL(CLASS_NAME, className) as CLASS_NAME, DECISION, TASK_TYPE_ID from ParquetTable")
parkSQL.show(false)
parkSQL.count()
parkSQL.write.json("s3://test-bucket/json-output-7/")
仅使用此命令,它将生成包含以下内容的文件:
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
但是,我想要实现的是如下所示:
{"index":{}}
{"LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
任何有关如何实现此结果的见解都将不胜感激!
下面的代码会将 {"index":{}}
与 DataFrame
中的现有行数据连接起来 & 它会将数据转换为 json
然后使用 text
格式保存 json 数据.
df
.select(
lit("""{"index":{}}""").as("index"),
to_json(struct($"*")).as("json_data")
)
.select(
concat_ws(
"\n", // This will split index column & other column data into two lines.
$"index",
$"json_data"
).as("data")
)
.write
.format("text") // This is required.
.save("s3://test-bucket/json-output-7/")
最终输出
cat part-00000-24619b28-6501-4763-b3de-1a2f72a5a4ec-c000.txt
{"index":{}}
{"CLASS_NAME":"/SC/Trade/HTS_CA/1234abcd","DECISION":"AGREE","LAST_MODIFIED_BY":"david","LAST_MODIFIED_DATE":"2018-06-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}
{"index":{}}
{"CLASS_NAME":"/SC/Import/HTS_US/9876abcd","DECISION":"DISAGREE","LAST_MODIFIED_BY":"sarah","LAST_MODIFIED_DATE":"2018-08-26 12:02:03.0","TASK_TYPE_ID":"abcd1234-832b-43b6-afa6-361253ffe1d5"}