无法使用 spark scala 从 json 嵌套属性中删除“\”
Unable to remove "\" from the json nested attributes using spark scala
我正在处理一个 FHIR
资源,我在其中获得如下 JSON 数据:
{
"appointmentRef": "Appointment/12213#4200",
"encounterLengh": "2",
"billingAccount": "savingsAccount",
"hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
"resourceType": "Encounter",
"priority": "abc",
"status": "triaged",
"eid": "200",
"subject": "Patient/435"
}
之前,根级别的属性(如 appointmentRef 等)右侧的值中也带有 "\",我可以用下面的代码将其删除。但是,从上面的数据可以看出,对于嵌套属性,我的代码不起作用。
// Groups the rows by key and, per group, merges the rows' JSON fragments into one
// String -> String map (attribute name or element tag -> value text).
rowList.groupBy(row => row.key).foreach(rowList => {
import com.google.gson.{Gson, JsonObject}
// NOTE(review): assigning mutable.Map() to Map[String, String] and calling put on it only
// type-checks if Map refers to scala.collection.mutable.Map here — confirm the file's imports.
val map: Map[String, String] = mutable.Map()
// rowList._2 holds the rows that share the current key.
rowList._2.foreach(row => {
LOGGER.debug(s"row == $row")
// Blank tag: row.value is parsed as a JSON object and every top-level entry is copied into the map.
if (Utility.isBlank(row.jsonElementTag)) {
val convertedObject = new Gson().fromJson(row.value, classOf[JsonObject])
val itr = convertedObject.entrySet().iterator()
while (itr.hasNext) {
val next = itr.next()
// getAsString yields the entry's text form.
// NOTE(review): presumably this fails for entries that are objects or arrays — confirm.
val value = next.getValue.getAsString
val key = next.getKey
LOGGER.debug(s"key-- $key value --$value")
map.put(key, value)
}
}
// Non-blank tag: row.value is parsed as a JSON object and stored under that tag as JSON text.
else {
val convertedObject = new Gson().fromJson(row.value, classOf[JsonObject])
LOGGER.debug(s"convertedObject == $convertedObject")
// A value is already stored for this tag: combine the stored object and the new one in a JSON array.
if (null != map.get(row.jsonElementTag).getOrElse(null)) {
LOGGER.debug("map.get(row.jsonElementTag).get === "+row.jsonElementTag +" "+map.get(row.jsonElementTag).get)
// NOTE(review): JsonArray is not part of the import at the top of this snippet — presumably imported elsewhere.
var array: JsonArray = new JsonArray
// NOTE(review): the stored text is parsed as a JsonObject, so a third row with the same tag
// would meet array text here — confirm whether that case can occur.
val mapElement = new Gson().fromJson(map.get(row.jsonElementTag).get, classOf[JsonObject])
array.add(mapElement)
array.add(convertedObject)
map.put(row.jsonElementTag, array.toString)
}
else {
// toString turns the object back into JSON text; presumably string values that themselves
// contain JSON keep their escaped quotes (\") in that text — confirm.
map.put(row.jsonElementTag, convertedObject.toString)
}
}
})
// NOTE(review): the outer foreach opened on the first line is not closed within this excerpt.
我只是从数据框中取出各行并逐行迭代,把每行的值当作字符串解析,再放入键值对中。if 分支处理父级属性,else 分支处理嵌套属性。
我什至尝试了更简单的replace("\","")
方法,但没有用。那么,如何从嵌套属性中删除 "\"
?
我的 expected
输出是我的嵌套 JSON 属性中应该没有 "\"
。
hospitalization
列是字符串类型,它包含 json object
。要将字符串提取或转换为 json,请根据该列中的数据准备 schema
。
检查下面的代码。
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
// Schema of the JSON object stored as text in the hospitalization column, built from Spark's JSON
// schema format. The literal is kept on one line: the earlier terminal wrap split a `true` token,
// which made the JSON invalid.
scala> val schema = DataType.fromJson("""{"type":"struct","fields":[{"name":"admitSourceCode","type":"string","nullable":true,"metadata":{}},{"name":"admitSourceReason","type":"string","nullable":true,"metadata":{}},{"name":"destination","type":"string","nullable":true,"metadata":{}},{"name":"eid","type":"long","nullable":true,"metadata":{}},{"name":"origin","type":"string","nullable":true,"metadata":{}},{"name":"preAdmissionIdentifierSystem","type":"string","nullable":true,"metadata":{}},{"name":"preAdmissionIdentifierValue","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
scala> df.withColumn("hospitalization",from_json($"hospitalization",schema)).printSchema
root
|-- appointmentRef: string (nullable = true)
|-- billingAccount: string (nullable = true)
|-- eid: string (nullable = true)
|-- encounterLengh: string (nullable = true)
|-- hospitalization: struct (nullable = true)
| |-- admitSourceCode: string (nullable = true)
| |-- admitSourceReason: string (nullable = true)
| |-- destination: string (nullable = true)
| |-- eid: long (nullable = true)
| |-- origin: string (nullable = true)
| |-- preAdmissionIdentifierSystem: string (nullable = true)
| |-- preAdmissionIdentifierValue: string (nullable = true)
|-- priority: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- status: string (nullable = true)
|-- subject: string (nullable = true)
scala> df.withColumn("hospitalization",from_json($"hospitalization",schema)).show(false)
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
|appointmentRef |billingAccount|eid|encounterLengh|hospitalization |priority|resourceType|status |subject |
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
|Appointment/12213#4200|savingsAccount|200|2 |[outp, some thing, hospital, 200, hospital, https://system123445.html, pqr]|abc |Encounter |triaged|Patient/435|
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
更新
创建了小帮手 class 以在没有架构的情况下提取或转换 json。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._
// UDF that wraps a row id and a JSON string into one compact JSON document of the form
// {"rowId": <id>, "data": <parsed json>}.
val append = udf { (rowId: Long, json: String) =>
  val idNode = parse(rowId.toString)
  val dataNode = parse(json)
  val wrapped = Map("rowId" -> idNode, "data" -> dataNode)
  compact(render(wrapped))
}
/**
  * Adds JSON-extraction helpers to a DataFrame so that a string column holding JSON text
  * can be parsed without declaring its schema up front.
  */
implicit class DFHelper(df: DataFrame) {
  import df.sparkSession.implicits._

  /** Reads the first column of `df` (taken as a string) as multi-line JSON and returns the resulting DataFrame. */
  def parseJson: DataFrame =
    df.sparkSession.read.option("multiLine", "true").json(df.map(_.getString(0)))

  /**
    * Convert string to json object or array of json object.
    *
    * The parsed value is joined back onto the original rows as an additional column named
    * after `column`; the original string column is kept as well.
    *
    * @param column string column containing JSON text
    * @return the original rows plus the parsed column
    */
  def extract(column: Column): DataFrame = {
    // Give every row a sequential id so the parsed JSON can be joined back to its source row.
    // NOTE(review): the window has no partitioning, so presumably all rows are numbered in a
    // single partition — confirm this is acceptable for large inputs.
    val updatedDF = df.withColumn("rowId", row_number().over(Window.orderBy(lit(1))))
    // Rows with a null JSON column are left out of parsing; the left join below keeps them in the result.
    val parsedDF = updatedDF.filter(column.isNotNull)
      .select(append($"rowId", column).as("row")) // same spelling as the column created above
      .parseJson
    updatedDF.join(
      parsedDF.select($"rowId", $"data".as(column.toString())),
      updatedDF("rowId") === parsedDF("rowId"),
      "left"
    )
      .drop("rowId") // Deleting added rowId column.
  }
}
scala> df.extract($"hospitalization").printSchema()
root
|-- appointmentRef: string (nullable = true)
|-- billingAccount: string (nullable = true)
|-- eid: string (nullable = true)
|-- encounterLengh: string (nullable = true)
|-- hospitalization: string (nullable = true)
|-- priority: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- status: string (nullable = true)
|-- subject: string (nullable = true)
|-- hospitalization: struct (nullable = true)
| |-- admitSourceCode: string (nullable = true)
| |-- admitSourceReason: string (nullable = true)
| |-- destination: string (nullable = true)
| |-- eid: long (nullable = true)
| |-- encounterLengh: string (nullable = true)
| |-- origin: string (nullable = true)
| |-- preAdmissionIdentifierSystem: string (nullable = true)
| |-- preAdmissionIdentifierValue: string (nullable = true)
scala> df.extract($"hospitalization").show(false)
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
|appointmentRef |billingAccount|eid|encounterLengh|hospitalization |priority|resourceType|status |subject |hospitalization |
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
|Appointment/12213#4200|savingsAccount|200|1 |{"encounterLengh": "1","preAdmissionIdentifierSystem":"https://system123445.html","preAdmissionIdentifierValue":"pqr","origin":"hospital","admitSourceCode":"outp","admitSourceReason":"some thing","eid":200,"destination":"hospital"}|abc |Encounter |triaged|Patient/435|[outp, some thing, hospital, 200, 1, hospital, https://system123445.html, pqr]|
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
也许这有帮助 -
加载提供的测试数据
// Sample record from the question as raw text. Inside a triple-quoted string the backslashes
// stay literal, so the text contains the \" sequences exactly as shown.
val data =
"""
|{
| "appointmentRef": "Appointment/12213#4200",
| "encounterLengh": "2",
| "billingAccount": "savingsAccount",
| "hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
| "resourceType": "Encounter",
| "priority": "abc",
| "status": "triaged",
| "eid": "200",
| "subject": "Patient/435"
|}
""".stripMargin
// One-row DataFrame holding the raw JSON text in a single string column
// (named "value" in the schema printed below).
val ds = Seq(data).toDF()
ds.show(false)
ds.printSchema()
/**
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |value |
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |
* {
* "appointmentRef": "Appointment/12213#4200",
* "encounterLengh": "2",
* "billingAccount": "savingsAccount",
* "hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
* "resourceType": "Encounter",
* "priority": "abc",
* "status": "triaged",
* "eid": "200",
* "subject": "Patient/435"
* }
* |
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* root
* |-- value: string (nullable = true)
*/
将\
替换为''
(空字符串)
// Remove every backslash from the raw text. The backslash must be written as "\\": a lone "\"
// would escape the closing quote and the literal would not compile.
// Note: as the output below shows, the nested object is still wrapped in quotes afterwards.
ds.withColumn("value", translate($"value", "\\", ""))
  .show(false)
/**
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |value |
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |
* {
* "appointmentRef": "Appointment/12213#4200",
* "encounterLengh": "2",
* "billingAccount": "savingsAccount",
* "hospitalization": "{"preAdmissionIdentifierSystem":"https://system123445.html","preAdmissionIdentifierValue":"pqr","origin":"hospital","admitSourceCode":"outp","admitSourceReason":"some thing","eid":200,"destination":"hospital"}",
* "resourceType": "Encounter",
* "priority": "abc",
* "status": "triaged",
* "eid": "200",
* "subject": "Patient/435"
* }
* |
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
我正在处理一个 FHIR
资源,我在其中获得如下 JSON 数据:
{
"appointmentRef": "Appointment/12213#4200",
"encounterLengh": "2",
"billingAccount": "savingsAccount",
"hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
"resourceType": "Encounter",
"priority": "abc",
"status": "triaged",
"eid": "200",
"subject": "Patient/435"
}
之前,根级别的属性(如 appointmentRef 等)右侧的值中也带有 "\",我可以用下面的代码将其删除。但是,从上面的数据可以看出,对于嵌套属性,我的代码不起作用。
// Groups the rows by key and, per group, merges the rows' JSON fragments into one
// String -> String map (attribute name or element tag -> value text).
rowList.groupBy(row => row.key).foreach(rowList => {
import com.google.gson.{Gson, JsonObject}
// NOTE(review): assigning mutable.Map() to Map[String, String] and calling put on it only
// type-checks if Map refers to scala.collection.mutable.Map here — confirm the file's imports.
val map: Map[String, String] = mutable.Map()
// rowList._2 holds the rows that share the current key.
rowList._2.foreach(row => {
LOGGER.debug(s"row == $row")
// Blank tag: row.value is parsed as a JSON object and every top-level entry is copied into the map.
if (Utility.isBlank(row.jsonElementTag)) {
val convertedObject = new Gson().fromJson(row.value, classOf[JsonObject])
val itr = convertedObject.entrySet().iterator()
while (itr.hasNext) {
val next = itr.next()
// getAsString yields the entry's text form.
// NOTE(review): presumably this fails for entries that are objects or arrays — confirm.
val value = next.getValue.getAsString
val key = next.getKey
LOGGER.debug(s"key-- $key value --$value")
map.put(key, value)
}
}
// Non-blank tag: row.value is parsed as a JSON object and stored under that tag as JSON text.
else {
val convertedObject = new Gson().fromJson(row.value, classOf[JsonObject])
LOGGER.debug(s"convertedObject == $convertedObject")
// A value is already stored for this tag: combine the stored object and the new one in a JSON array.
if (null != map.get(row.jsonElementTag).getOrElse(null)) {
LOGGER.debug("map.get(row.jsonElementTag).get === "+row.jsonElementTag +" "+map.get(row.jsonElementTag).get)
// NOTE(review): JsonArray is not part of the import at the top of this snippet — presumably imported elsewhere.
var array: JsonArray = new JsonArray
// NOTE(review): the stored text is parsed as a JsonObject, so a third row with the same tag
// would meet array text here — confirm whether that case can occur.
val mapElement = new Gson().fromJson(map.get(row.jsonElementTag).get, classOf[JsonObject])
array.add(mapElement)
array.add(convertedObject)
map.put(row.jsonElementTag, array.toString)
}
else {
// toString turns the object back into JSON text; presumably string values that themselves
// contain JSON keep their escaped quotes (\") in that text — confirm.
map.put(row.jsonElementTag, convertedObject.toString)
}
}
})
// NOTE(review): the outer foreach opened on the first line is not closed within this excerpt.
我只是从数据框中取出各行并逐行迭代,把每行的值当作字符串解析,再放入键值对中。if 分支处理父级属性,else 分支处理嵌套属性。
我什至尝试了更简单的replace("\","")
方法,但没有用。那么,如何从嵌套属性中删除 "\"
?
我的 expected
输出是我的嵌套 JSON 属性中应该没有 "\"
。
hospitalization
列是字符串类型,它包含 json object
。要将字符串提取或转换为 json,请根据该列中的数据准备 schema
。
检查下面的代码。
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
// Schema of the JSON object stored as text in the hospitalization column, built from Spark's JSON
// schema format. The literal is kept on one line: the earlier terminal wrap split a `true` token,
// which made the JSON invalid.
scala> val schema = DataType.fromJson("""{"type":"struct","fields":[{"name":"admitSourceCode","type":"string","nullable":true,"metadata":{}},{"name":"admitSourceReason","type":"string","nullable":true,"metadata":{}},{"name":"destination","type":"string","nullable":true,"metadata":{}},{"name":"eid","type":"long","nullable":true,"metadata":{}},{"name":"origin","type":"string","nullable":true,"metadata":{}},{"name":"preAdmissionIdentifierSystem","type":"string","nullable":true,"metadata":{}},{"name":"preAdmissionIdentifierValue","type":"string","nullable":true,"metadata":{}}]}""").asInstanceOf[StructType]
scala> df.withColumn("hospitalization",from_json($"hospitalization",schema)).printSchema
root
|-- appointmentRef: string (nullable = true)
|-- billingAccount: string (nullable = true)
|-- eid: string (nullable = true)
|-- encounterLengh: string (nullable = true)
|-- hospitalization: struct (nullable = true)
| |-- admitSourceCode: string (nullable = true)
| |-- admitSourceReason: string (nullable = true)
| |-- destination: string (nullable = true)
| |-- eid: long (nullable = true)
| |-- origin: string (nullable = true)
| |-- preAdmissionIdentifierSystem: string (nullable = true)
| |-- preAdmissionIdentifierValue: string (nullable = true)
|-- priority: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- status: string (nullable = true)
|-- subject: string (nullable = true)
scala> df.withColumn("hospitalization",from_json($"hospitalization",schema)).show(false)
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
|appointmentRef |billingAccount|eid|encounterLengh|hospitalization |priority|resourceType|status |subject |
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
|Appointment/12213#4200|savingsAccount|200|2 |[outp, some thing, hospital, 200, hospital, https://system123445.html, pqr]|abc |Encounter |triaged|Patient/435|
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------+--------+------------+-------+-----------+
更新
创建了小帮手 class 以在没有架构的情况下提取或转换 json。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._
// UDF that wraps a row id and a JSON string into one compact JSON document of the form
// {"rowId": <id>, "data": <parsed json>}.
val append = udf { (rowId: Long, json: String) =>
  val idNode = parse(rowId.toString)
  val dataNode = parse(json)
  val wrapped = Map("rowId" -> idNode, "data" -> dataNode)
  compact(render(wrapped))
}
/**
  * Adds JSON-extraction helpers to a DataFrame so that a string column holding JSON text
  * can be parsed without declaring its schema up front.
  */
implicit class DFHelper(df: DataFrame) {
  import df.sparkSession.implicits._

  /** Reads the first column of `df` (taken as a string) as multi-line JSON and returns the resulting DataFrame. */
  def parseJson: DataFrame =
    df.sparkSession.read.option("multiLine", "true").json(df.map(_.getString(0)))

  /**
    * Convert string to json object or array of json object.
    *
    * The parsed value is joined back onto the original rows as an additional column named
    * after `column`; the original string column is kept as well.
    *
    * @param column string column containing JSON text
    * @return the original rows plus the parsed column
    */
  def extract(column: Column): DataFrame = {
    // Give every row a sequential id so the parsed JSON can be joined back to its source row.
    // NOTE(review): the window has no partitioning, so presumably all rows are numbered in a
    // single partition — confirm this is acceptable for large inputs.
    val updatedDF = df.withColumn("rowId", row_number().over(Window.orderBy(lit(1))))
    // Rows with a null JSON column are left out of parsing; the left join below keeps them in the result.
    val parsedDF = updatedDF.filter(column.isNotNull)
      .select(append($"rowId", column).as("row")) // same spelling as the column created above
      .parseJson
    updatedDF.join(
      parsedDF.select($"rowId", $"data".as(column.toString())),
      updatedDF("rowId") === parsedDF("rowId"),
      "left"
    )
      .drop("rowId") // Deleting added rowId column.
  }
}
scala> df.extract($"hospitalization").printSchema()
root
|-- appointmentRef: string (nullable = true)
|-- billingAccount: string (nullable = true)
|-- eid: string (nullable = true)
|-- encounterLengh: string (nullable = true)
|-- hospitalization: string (nullable = true)
|-- priority: string (nullable = true)
|-- resourceType: string (nullable = true)
|-- status: string (nullable = true)
|-- subject: string (nullable = true)
|-- hospitalization: struct (nullable = true)
| |-- admitSourceCode: string (nullable = true)
| |-- admitSourceReason: string (nullable = true)
| |-- destination: string (nullable = true)
| |-- eid: long (nullable = true)
| |-- encounterLengh: string (nullable = true)
| |-- origin: string (nullable = true)
| |-- preAdmissionIdentifierSystem: string (nullable = true)
| |-- preAdmissionIdentifierValue: string (nullable = true)
scala> df.extract($"hospitalization").show(false)
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
|appointmentRef |billingAccount|eid|encounterLengh|hospitalization |priority|resourceType|status |subject |hospitalization |
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
|Appointment/12213#4200|savingsAccount|200|1 |{"encounterLengh": "1","preAdmissionIdentifierSystem":"https://system123445.html","preAdmissionIdentifierValue":"pqr","origin":"hospital","admitSourceCode":"outp","admitSourceReason":"some thing","eid":200,"destination":"hospital"}|abc |Encounter |triaged|Patient/435|[outp, some thing, hospital, 200, 1, hospital, https://system123445.html, pqr]|
+----------------------+--------------+---+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------------+-------+-----------+------------------------------------------------------------------------------+
也许这有帮助 -
加载提供的测试数据
// Sample record from the question as raw text. Inside a triple-quoted string the backslashes
// stay literal, so the text contains the \" sequences exactly as shown.
val data =
"""
|{
| "appointmentRef": "Appointment/12213#4200",
| "encounterLengh": "2",
| "billingAccount": "savingsAccount",
| "hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
| "resourceType": "Encounter",
| "priority": "abc",
| "status": "triaged",
| "eid": "200",
| "subject": "Patient/435"
|}
""".stripMargin
// One-row DataFrame holding the raw JSON text in a single string column
// (named "value" in the schema printed below).
val ds = Seq(data).toDF()
ds.show(false)
ds.printSchema()
/**
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |value |
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |
* {
* "appointmentRef": "Appointment/12213#4200",
* "encounterLengh": "2",
* "billingAccount": "savingsAccount",
* "hospitalization": "{\"preAdmissionIdentifierSystem\":\"https://system123445.html\",\"preAdmissionIdentifierValue\":\"pqr\",\"origin\":\"hospital\",\"admitSourceCode\":\"outp\",\"admitSourceReason\":\"some thing\",\"eid\":200,\"destination\":\"hospital\"}",
* "resourceType": "Encounter",
* "priority": "abc",
* "status": "triaged",
* "eid": "200",
* "subject": "Patient/435"
* }
* |
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* root
* |-- value: string (nullable = true)
*/
将\
替换为''
(空字符串)
// Remove every backslash from the raw text. The backslash must be written as "\\": a lone "\"
// would escape the closing quote and the literal would not compile.
// Note: as the output below shows, the nested object is still wrapped in quotes afterwards.
ds.withColumn("value", translate($"value", "\\", ""))
  .show(false)
/**
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |value |
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |
* {
* "appointmentRef": "Appointment/12213#4200",
* "encounterLengh": "2",
* "billingAccount": "savingsAccount",
* "hospitalization": "{"preAdmissionIdentifierSystem":"https://system123445.html","preAdmissionIdentifierValue":"pqr","origin":"hospital","admitSourceCode":"outp","admitSourceReason":"some thing","eid":200,"destination":"hospital"}",
* "resourceType": "Encounter",
* "priority": "abc",
* "status": "triaged",
* "eid": "200",
* "subject": "Patient/435"
* }
* |
* +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/