在 Spark 2.x (Scala) 中使用平面图分解 Cassandra UDT
Explode Cassandra UDT with flatmap in Spark 2.x (Scala)
我在 Cassandra (3.11.2) 中有数据,这也是我的 df :
Cassandra 中的数据:
id | some_data
-- | ---------
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]
df 详情:
df.printSchema()
//| |-- id: integer (nullable = true)
//| |-- some_data: array (nullable = true)
//| | |-- element: struct (containsNull = true)
//| | | |-- s1: string (nullable = true)
//| | | |-- s2: string (nullable = true)
此处 Cassandra 模式定义为:
id : String
some_data : list frozen test_udt created as -->
CREATE TYPE test.test_udt (
s1 text,
s2 text
);
我正在使用 spark-cassandra-connector 2.0 从 Cassandra 中提取数据以在 Spark 2.2.1 上进行处理。
需要输出
输出是 df 的分解形式
id | some_data | s1 | s2
-- | ---------------------------------------------------| ----- | ----
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34
我过去的做法
我已经使用了 spark-cassandra-connector 1.6 和 Spark 1.6,我有一个解决上述问题的巧妙方法:
import org.apache.spark.sql.functions._
case class my_data(s1 : String, s2 : String)
val flatData = df.explode(df("some_data")){
case Row(x : Seq[Row]) =>
x.map(x =>
my_data(
x.apply(0).asInstanceOf[String],
x.apply(1).asInstanceOf[String]
))
}
flatData.show()
升级到 2.x 后,我在使用 explode
功能时遇到错误。火花文档说 explode
已弃用。建议 flatMap
替代 explode
。
问题:
- 如何在 Scala 中分解 Dataframe 以获得与以前相同的结果?
- 如何使用
flatmap
翻译我的旧代码?
您可以使用 explode
function,这也被建议作为 explode
方法的替代方法。 getItem
用于通过名称从 struct
中获取字段。
df.withColumn("exploded" , explode($"some_data"))
.withColumn("s1" , $"exploded".getItem("s1"))
.withColumn("s2" , $"exploded".getItem("s2"))
.drop("exploded")
.show(false)
//+---+------------------------------+-----+-----+
//|id |some_data |s1 |s2 |
//+---+------------------------------+-----+-----+
//|1 |[[str11,str12], [str13,str14]]|str11|str12|
//|1 |[[str11,str12], [str13,str14]]|str13|str14|
//|2 |[[str21,str22], [str23,str24]]|str21|str22|
//|2 |[[str21,str22], [str23,str24]]|str23|str24|
//|3 |[[str31,str32], [str33,str44]]|str31|str32|
//|3 |[[str31,str32], [str33,str44]]|str33|str44|
//+---+------------------------------+-----+-----+
我在 Cassandra (3.11.2) 中有数据,这也是我的 df :
Cassandra 中的数据:
id | some_data
-- | ---------
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]
df 详情:
df.printSchema()
//| |-- id: integer (nullable = true)
//| |-- some_data: array (nullable = true)
//| | |-- element: struct (containsNull = true)
//| | | |-- s1: string (nullable = true)
//| | | |-- s2: string (nullable = true)
此处 Cassandra 模式定义为:
id : String
some_data : list frozen test_udt created as --> CREATE TYPE test.test_udt ( s1 text, s2 text );
我正在使用 spark-cassandra-connector 2.0 从 Cassandra 中提取数据以在 Spark 2.2.1 上进行处理。
需要输出
输出是 df 的分解形式
id | some_data | s1 | s2
-- | ---------------------------------------------------| ----- | ----
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1 | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2 | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3 | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str34
我过去的做法
我已经使用了 spark-cassandra-connector 1.6 和 Spark 1.6,我有一个解决上述问题的巧妙方法:
import org.apache.spark.sql.functions._
case class my_data(s1 : String, s2 : String)
val flatData = df.explode(df("some_data")){
case Row(x : Seq[Row]) =>
x.map(x =>
my_data(
x.apply(0).asInstanceOf[String],
x.apply(1).asInstanceOf[String]
))
}
flatData.show()
升级到 2.x 后,我在使用 explode
功能时遇到错误。火花文档说 explode
已弃用。建议 flatMap
替代 explode
。
问题:
- 如何在 Scala 中分解 Dataframe 以获得与以前相同的结果?
- 如何使用
flatmap
翻译我的旧代码?
您可以使用 explode
function,这也被建议作为 explode
方法的替代方法。 getItem
用于通过名称从 struct
中获取字段。
df.withColumn("exploded" , explode($"some_data"))
.withColumn("s1" , $"exploded".getItem("s1"))
.withColumn("s2" , $"exploded".getItem("s2"))
.drop("exploded")
.show(false)
//+---+------------------------------+-----+-----+
//|id |some_data |s1 |s2 |
//+---+------------------------------+-----+-----+
//|1 |[[str11,str12], [str13,str14]]|str11|str12|
//|1 |[[str11,str12], [str13,str14]]|str13|str14|
//|2 |[[str21,str22], [str23,str24]]|str21|str22|
//|2 |[[str21,str22], [str23,str24]]|str23|str24|
//|3 |[[str31,str32], [str33,str44]]|str31|str32|
//|3 |[[str31,str32], [str33,str44]]|str33|str44|
//+---+------------------------------+-----+-----+