Pass-through heterogeneous (non-uniform) JSON column in Spark
We are processing data with Apache Spark 3.1.x. One of the fields contains completely free-form JSON, so individual records can contain the same keys but with different data types (payload.field1 in this example can be a string, a boolean, or a number):
{"timestamp": "2021-07-30T09:41:51Z", "payload": {"field1": "some text"}}
{"timestamp": "2021-07-30T09:41:52Z", "payload": {"field1": true}}
{"timestamp": "2021-07-30T09:41:53Z", "payload": {"field1": 123}}
Our goal is to keep the payload field intact. When we let Spark auto-detect the schema:
Dataset<Row> events = spark.read().json("file:///users/user/input.json");
// some processing is going on here
events.write().json("file:///users/user/output.json");
the output looks like this (note that payload.field1 is now a string):
{"payload":{"field1":"some text"},"timestamp":"2021-07-30T09:41:51Z"}
{"payload":{"field1":"true"},"timestamp":"2021-07-30T09:41:51Z"}
{"payload":{"field1":"123"},"timestamp":"2021-07-30T09:41:52Z"}
Spark's printSchema() output:
root
|-- payload: struct (nullable = true)
| |-- field1: string (nullable = true)
|-- timestamp: string (nullable = true)
The best workaround we have come up with so far:
Dataset<String> eventsAsString = spark.read().text("file:///users/user/input.json").as(Encoders.STRING());
Dataset<Row> events2 = eventsAsString.select( //
get_json_object(col("value"), "$.timestamp").alias("timestamp"), //
get_json_object(col("value"), "$.payload").alias("payload") // This will keep payload as string for Spark
);
// Do some processing of events here
// We have to write JSON as string to prevent Spark from encoding payload's field JSON:
events2.withColumn("joined", concat( //
format_string("{\"timestamp\":\"%s\", ", col("timestamp")), //
format_string("\"payload\":%s}", col("payload")) //
)).select(col("joined")).write().text("file:///users/user/output.txt");
We get exactly the output we want, with the data types unchanged:
{"timestamp":"2021-07-30T09:41:51Z", "payload":{"field1":"some text"}}
{"timestamp":"2021-07-30T09:41:52Z", "payload":{"field1":true}}
{"timestamp":"2021-07-30T09:41:53Z", "payload":{"field1":123}}
The solution above works, but it feels super hacky. Are we missing something obvious here?
Thanks in advance!
For reading, we can specify the schema before reading.
For writing, I can't think of a better idea.
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import spark.implicits._ // provides the String encoder needed by map below

val schema = StructType(Array[StructField](
  StructField("timestamp", DataTypes.StringType, false, Metadata.empty),
  StructField("payload", DataTypes.StringType, false, Metadata.empty) // force string type
))

val df: Dataset[Row] = spark.read.schema(schema).json(ds) // ds: input path or Dataset[String]
df.map(r => "{\"timestamp\": \"%s\", \"payload\": %s}".format(r.getString(0), r.getString(1)))
  .write.text("xxx")
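Since the question uses the Java API, here is the same idea as a minimal Java sketch. It assumes the question's file paths and an existing spark session; in Java the map call needs an explicit MapFunction cast and encoder:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType(new StructField[]{
    new StructField("timestamp", DataTypes.StringType, false, Metadata.empty()),
    new StructField("payload", DataTypes.StringType, false, Metadata.empty()) // force string type
});

// Declaring payload as StringType makes Spark hand back the raw JSON text of
// that subtree instead of parsing it into a struct.
Dataset<Row> df = spark.read().schema(schema).json("file:///users/user/input.json");
df.map((MapFunction<Row, String>) r ->
            String.format("{\"timestamp\": \"%s\", \"payload\": %s}", r.getString(0), r.getString(1)),
        Encoders.STRING())
    .write().text("file:///users/user/output.txt");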