如何使用 Spark DataFrames 查询 JSON 数据列?
How to query JSON data column using Spark DataFrames?
我有一个 Cassandra table 为简单起见,它看起来像这样:
key: text
jsonData: text
blobData: blob
我可以使用 spark 和 spark-cassandra-connector 为此创建一个基本数据框:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
我正在努力将 JSON 数据扩展到其底层结构中。我最终希望能够根据 json 字符串和 return blob 数据中的属性进行过滤。类似于 jsonData.foo = "bar" 和 return blobData。目前可以吗?
Spark >= 2.4
如果需要,可以使用 schema_of_json
函数确定架构(请注意,这假定任意行是架构的有效代表)。
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
Spark >= 2.1
您可以使用from_json
函数:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark >= 1.6
您可以使用 get_json_object
,它带有一个列和一个路径:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
并将字段提取到可以进一步转换为预期类型的单个字符串。
path
参数使用点语法表示,前导 $.
表示文档根(因为上面的代码使用字符串插值 $
必须转义,因此 $$.
).
Spark <= 1.5:
Is this currently possible?
据我所知这是不可能的。您可以尝试类似的操作:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
我假设 blob
字段不能在 JSON 中表示。否则你可以忽略拆分和连接:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
另一种(更便宜,但更复杂)的方法是使用 UDF 来解析 JSON 并输出 struct
或 map
列。例如这样的事情:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)
基础 JSON 字符串是
"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
下面是过滤 JSON 并将所需数据加载到 Cassandra 的脚本。
sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
.mode(SaveMode.Append)
.save()
from_json
函数正是您要找的。您的代码将类似于:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
//You can define whatever struct type that your json states
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, true)
))
df.withColumn("jsonData", from_json(col("jsonData"), schema))
我使用以下
(自 2.2.0 起可用,我假设您的 json 字符串列位于列索引 0)
def parse(df: DataFrame, spark: SparkSession): DataFrame = {
val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
spark.read.json(stringDf)
}
它将自动推断您 JSON 中的模式。记录在这里:
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
是彻底的,但遗漏了 Spark 2.1+ 中可用的一种方法,并且比使用 schema_of_json()
:
更简单、更可靠
import org.apache.spark.sql.functions.from_json
val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))
这是 Python 等价物:
from pyspark.sql.functions import from_json
json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))
正如 zero323 指出的那样,schema_of_json()
的问题在于它检查单个字符串并从中导出模式。如果您的 JSON 数据具有不同的模式,那么您从 schema_of_json()
返回的模式将不会反映如果您要合并所有 JSON 数据的模式会得到什么数据框。使用 from_json()
解析该数据将产生大量 null
或空值,其中 schema_of_json()
返回的模式与数据不匹配。
通过使用 Spark 的能力从 JSON 字符串的 RDD 中导出一个全面的 JSON 模式,我们可以保证所有 JSON 数据都可以被解析。
示例:schema_of_json()
与 spark.read.json()
这是一个示例(在 Python 中,代码与 Scala 非常相似)来说明使用 schema_of_json()
从单个元素派生模式与使用 schema_of_json()
从所有数据派生模式之间的区别spark.read.json()
.
>>> df = spark.createDataFrame(
... [
... (1, '{"a": true}'),
... (2, '{"a": "hello"}'),
... (3, '{"b": 22}'),
... ],
... schema=['id', 'jsonData'],
... )
a
在一行中有一个布尔值,在另一行中有一个字符串值。 a
的合并模式会将其类型设置为字符串。 b
将是一个整数。
让我们看看不同方法的比较。首先,schema_of_json()
方法:
>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: boolean (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true]|
| 2| null|
| 3| []|
+---+--------+
如您所见,我们导出的 JSON 模式非常有限。 "a": "hello"
无法解析为布尔值并返回 null
,而 "b": 22
刚刚被删除,因为它不在我们的架构中。
现在 spark.read.json()
:
>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: long (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true,]|
| 2|[hello,]|
| 3| [, 22]|
+---+--------+
在这里,我们保留了所有数据,并使用一个涵盖所有数据的综合架构。 "a": true
被转换为字符串以匹配 "a": "hello"
的模式。
使用 spark.read.json()
的主要缺点是 Spark 将扫描您的所有数据以导出模式。根据您拥有的数据量,该开销可能会很大。如果您 知道 您的所有 JSON 数据都具有一致的架构,那么可以继续并仅针对单个元素使用 schema_of_json()
。如果您有架构可变性但不想扫描所有数据,您可以在调用 spark.read.json()
时将 samplingRatio
设置为小于 1.0
的值以查看数据的子集数据。
以下是 spark.read.json()
的文档:Scala API / Python API
我有一个 Cassandra table 为简单起见,它看起来像这样:
key: text
jsonData: text
blobData: blob
我可以使用 spark 和 spark-cassandra-connector 为此创建一个基本数据框:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
我正在努力将 JSON 数据扩展到其底层结构中。我最终希望能够根据 json 字符串和 return blob 数据中的属性进行过滤。类似于 jsonData.foo = "bar" 和 return blobData。目前可以吗?
Spark >= 2.4
如果需要,可以使用 schema_of_json
函数确定架构(请注意,这假定任意行是架构的有效代表)。
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
Spark >= 2.1
您可以使用from_json
函数:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark >= 1.6
您可以使用 get_json_object
,它带有一个列和一个路径:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
并将字段提取到可以进一步转换为预期类型的单个字符串。
path
参数使用点语法表示,前导 $.
表示文档根(因为上面的代码使用字符串插值 $
必须转义,因此 $$.
).
Spark <= 1.5:
Is this currently possible?
据我所知这是不可能的。您可以尝试类似的操作:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
我假设 blob
字段不能在 JSON 中表示。否则你可以忽略拆分和连接:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
另一种(更便宜,但更复杂)的方法是使用 UDF 来解析 JSON 并输出 struct
或 map
列。例如这样的事情:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)
基础 JSON 字符串是
"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
下面是过滤 JSON 并将所需数据加载到 Cassandra 的脚本。
sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
.write.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
.mode(SaveMode.Append)
.save()
from_json
函数正是您要找的。您的代码将类似于:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
//You can define whatever struct type that your json states
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, true)
))
df.withColumn("jsonData", from_json(col("jsonData"), schema))
我使用以下
(自 2.2.0 起可用,我假设您的 json 字符串列位于列索引 0)
def parse(df: DataFrame, spark: SparkSession): DataFrame = {
val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
spark.read.json(stringDf)
}
它将自动推断您 JSON 中的模式。记录在这里: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
schema_of_json()
:
import org.apache.spark.sql.functions.from_json
val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))
这是 Python 等价物:
from pyspark.sql.functions import from_json
json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))
正如 zero323 指出的那样,schema_of_json()
的问题在于它检查单个字符串并从中导出模式。如果您的 JSON 数据具有不同的模式,那么您从 schema_of_json()
返回的模式将不会反映如果您要合并所有 JSON 数据的模式会得到什么数据框。使用 from_json()
解析该数据将产生大量 null
或空值,其中 schema_of_json()
返回的模式与数据不匹配。
通过使用 Spark 的能力从 JSON 字符串的 RDD 中导出一个全面的 JSON 模式,我们可以保证所有 JSON 数据都可以被解析。
示例:schema_of_json()
与 spark.read.json()
这是一个示例(在 Python 中,代码与 Scala 非常相似)来说明使用 schema_of_json()
从单个元素派生模式与使用 schema_of_json()
从所有数据派生模式之间的区别spark.read.json()
.
>>> df = spark.createDataFrame(
... [
... (1, '{"a": true}'),
... (2, '{"a": "hello"}'),
... (3, '{"b": 22}'),
... ],
... schema=['id', 'jsonData'],
... )
a
在一行中有一个布尔值,在另一行中有一个字符串值。 a
的合并模式会将其类型设置为字符串。 b
将是一个整数。
让我们看看不同方法的比较。首先,schema_of_json()
方法:
>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: boolean (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true]|
| 2| null|
| 3| []|
+---+--------+
如您所见,我们导出的 JSON 模式非常有限。 "a": "hello"
无法解析为布尔值并返回 null
,而 "b": 22
刚刚被删除,因为它不在我们的架构中。
现在 spark.read.json()
:
>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
|-- id: long (nullable = true)
|-- jsonData: struct (nullable = true)
| |-- a: string (nullable = true)
| |-- b: long (nullable = true)
>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
| 1| [true,]|
| 2|[hello,]|
| 3| [, 22]|
+---+--------+
在这里,我们保留了所有数据,并使用一个涵盖所有数据的综合架构。 "a": true
被转换为字符串以匹配 "a": "hello"
的模式。
使用 spark.read.json()
的主要缺点是 Spark 将扫描您的所有数据以导出模式。根据您拥有的数据量,该开销可能会很大。如果您 知道 您的所有 JSON 数据都具有一致的架构,那么可以继续并仅针对单个元素使用 schema_of_json()
。如果您有架构可变性但不想扫描所有数据,您可以在调用 spark.read.json()
时将 samplingRatio
设置为小于 1.0
的值以查看数据的子集数据。
以下是 spark.read.json()
的文档:Scala API / Python API