Spark PySpark output JSON with one column value as key
Suppose I have a simple table in a dataframe with the schema:
a string, b string, c string
For example:
a b c
cat 3-3 78-b
cat 3-3 89-0
cat 4-4 78-n
dog 4-4 89-b
and so on.
I want to partition this table by column a and save each partition as a separate JSON file.
Additionally, I want each partition to be a single JSON file with the values of column b as keys. For example:
File cat.json:
{
"3-3": {"b": "3-3", "c": "78-b"},
"3-3": {"b": "3-3", "c": "89-0"},
"4-4": {"b": "4-4", "c": "78-n"}
}
File dog.json:
{
"4-4": {"b": 4-4, "c": "89-b"}
}
Is there a way to do this in pyspark? Thanks.
Just add row mapping logic for the DataFrame and it will work; see the inline explanations in the code below.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

object CatDog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = List(("cat", "3-3", "78-b"),
      ("cat", "3-3", "89-0"),
      ("cat", "4-4", "78-n"),
      ("dog", "4-4", "89-b")).toDF("a", "b", "c")
    // df.show()

    // A Dataset cannot be referenced inside another Dataset's transformations,
    // so collect the distinct values of column a to the driver and write the
    // rows of each group as JSON, one output directory per value.
    df.select("a").distinct().collect().foreach { a =>
      val aVal = a.getString(0)
      df.filter(col("a") === aVal)
        .map(row => parseDF(row))
        .write.json(s"src/main/resources/$aVal.json")
    }
  }

  // Row mapping logic: key each record by its b value
  def parseDF(row: Row): Map[String, Map[String, String]] = {
    val b = row.getString(1)
    val c = row.getString(2)
    Map(b -> Map("b" -> b, "c" -> c))
  }
}
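The code above is Scala, while the question asks for PySpark. A minimal sketch of the same idea in PySpark, assuming each group fits on the driver and writing one local JSON file named after each value of column a:

import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("cat", "3-3", "78-b"), ("cat", "3-3", "89-0"),
     ("cat", "4-4", "78-n"), ("dog", "4-4", "89-b")],
    ["a", "b", "c"])

# Collect the distinct values of column a, then build one JSON object per
# value, keyed by column b. Note that duplicate b values (e.g. "3-3" for cat)
# collapse into a single key, because Python dict keys are unique.
for row in df.select("a").distinct().collect():
    a_val = row["a"]
    payload = {r["b"]: {"b": r["b"], "c": r["c"]}
               for r in df.filter(F.col("a") == a_val).collect()}
    with open(f"{a_val}.json", "w") as f:
        json.dump(payload, f)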
Try the solution below.
1. Load the data
// spark-shell style: assumes a SparkSession named `spark` is in scope
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import spark.implicits._

val data =
  """
    |a | b | c
    |cat | 3-3 | 78-b
    |cat | 3-3 | 89-0
    |cat | 4-4 | 78-n
    |dog | 4-4 | 89-b
  """.stripMargin

val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv(stringDS)
df.show(false)
df.printSchema()
df.show(false)
df.printSchema()
/**
* +---+---+----+
* |a |b |c |
* +---+---+----+
* |cat|3-3|78-b|
* |cat|3-3|89-0|
* |cat|4-4|78-n|
* |dog|4-4|89-b|
* +---+---+----+
*
* root
* |-- a: string (nullable = true)
* |-- b: string (nullable = true)
* |-- c: string (nullable = true)
*/
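For a PySpark reader, the equivalent input DataFrame can be created directly, skipping the text-parsing step above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("cat", "3-3", "78-b"), ("cat", "3-3", "89-0"),
     ("cat", "4-4", "78-n"), ("dog", "4-4", "89-b")],
    ["a", "b", "c"])
df.show()
df.printSchema()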
2. Create the map as required
val processedDF = df
.groupBy("a")
.agg(
collect_list(struct(col("b"), col("c"))).as("value"),
collect_list(col("b")).as("key")
)
.withColumn("map", map_from_arrays(col("key"), col("value")))
processedDF.show(false)
processedDF.printSchema()
/**
* +---+---------------------------------------+---------------+------------------------------------------------------------+
* |a |value |key |map |
* +---+---------------------------------------+---------------+------------------------------------------------------------+
* |cat|[[3-3, 78-b], [3-3, 89-0], [4-4, 78-n]]|[3-3, 3-3, 4-4]|[3-3 -> [3-3, 78-b], 3-3 -> [3-3, 89-0], 4-4 -> [4-4, 78-n]]|
* |dog|[[4-4, 89-b]] |[4-4] |[4-4 -> [4-4, 89-b]] |
* +---+---------------------------------------+---------------+------------------------------------------------------------+
*
* root
* |-- a: string (nullable = true)
* |-- value: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- b: string (nullable = true)
* | | |-- c: string (nullable = true)
* |-- key: array (nullable = true)
* | |-- element: string (containsNull = true)
* |-- map: map (nullable = true)
* | |-- key: string
* | |-- value: struct (valueContainsNull = true)
* | | |-- b: string (nullable = true)
* | | |-- c: string (nullable = true)
*/
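The same aggregation translates almost one-to-one to PySpark. One caveat: since Spark 3.0 building a map with duplicate keys (such as the two "3-3" entries for cat) raises an error by default, so spark.sql.mapKeyDedupPolicy must be set to LAST_WIN, which then keeps only the last value per duplicated key:

from pyspark.sql import functions as F

# Spark 3.x defaults to EXCEPTION on duplicate map keys; LAST_WIN keeps the
# last value for each duplicated key instead of failing.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

processed_df = (df
    .groupBy("a")
    .agg(
        F.collect_list(F.struct(F.col("b"), F.col("c"))).alias("value"),
        F.collect_list(F.col("b")).alias("key"))
    .withColumn("map", F.map_from_arrays(F.col("key"), F.col("value"))))
processed_df.show(truncate=False)
processed_df.printSchema()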
3. Save the DataFrame
processedDF.select(col("a"), to_json(col("map"))).write
.mode(SaveMode.Overwrite)
.partitionBy("a")
.text("/Users/sokale/models/run_2")
/**
* File directory and content of file
* a=cat
* |- {"3-3":{"b":"3-3","c":"78-b"},"3-3":{"b":"3-3","c":"89-0"},"4-4":{"b":"4-4","c":"78-n"}}
* a=dog
* |- {"4-4":{"b":"4-4","c":"89-b"}}
*/
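The corresponding write in PySpark (the output path is taken from the Scala snippet above; adjust it for your environment). Note that partitionBy produces one directory per value of a (a=cat, a=dog) containing part files rather than a single cat.json; a common trick to get one file per key is to repartition("a") first so each directory receives a single part file, then rename the part files afterwards.

(processed_df
    .select(F.col("a"), F.to_json(F.col("map")))
    .write
    .mode("overwrite")
    .partitionBy("a")
    .text("/Users/sokale/models/run_2"))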