Scala Dataframe/SQL: Dynamic Column Selection for Reporting
Context:
We have a model with 5 flows.
- Flow-1: input data from different sources, which feeds into Flow-2
- Flow-2, Flow-3 and Flow-4: ML models, each dumping a few fields to s3
- Flow-5: a reporting layer containing data from the outputs of Flow-2, Flow-3 and Flow-4
- Overall data volume is small
Problem:
- Flow-5 is a reporting layer based on several SQLs, with input data from Flow-2, Flow-3 and Flow-4.
Flow-2, Flow-3 and Flow-4 share one common join field; the remaining fields differ.
We can create a SQL joining the Flow-2/3/4 data, stored in three different tables, with a few calculations/aggregations. But the number of output fields from Flow-2/3/4 may vary from run to run.
- Issue 1: every time the s3 file (Flow-2/3/4) structure changes, this causes problems during COPY, because the target table schema differs from the s3 file structure (to fix it, fields must be manually added/deleted in the target table to align with the s3 data)
- Issue 2: for any additions/deletions in the s3 files, the reports need changes by adding/removing columns
Approaches:
SQL way - standardize the s3 dump / target table / report SQL, i.e. standardize the number of possible columns in each flow's (2, 3, 4) output and in the target table, so that if any field is unavailable it is simply loaded as NULL/blank during the s3 dump and COPYed as blank. Standardize the target table structure aligned to the s3 template, and standardize the reporting SQL as well.
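(A rough Spark sketch of this padding idea; the template column list and the double cast are illustrative assumptions, not the real flow schemas:)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
val template = Seq("country", "1st_buy", "2nd_buy") // hypothetical standardized column set for one flow
def alignToTemplate(src: DataFrame, template: Seq[String]): DataFrame = {
  // add any template column the dump lacks as NULL, then select in template order
  val padded = template.foldLeft(src)((acc, c) =>
    if (acc.columns.contains(c)) acc else acc.withColumn(c, lit(null).cast("double")))
  padded.select(template.head, template.tail: _*)
}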
SCALA/SPARK: currently exploring this option. Did a PoC: created two s3 dumps, created two dataframes in Scala on top of them, and tried both dataframe joins and Spark SQL joins. I am still not sure whether we can select new columns dynamically, i.e. make the Spark code generic.
By creating dataframes pointing directly to s3, we can solve the problem of COPYing data (with dynamic fields) into the target table.
However, the reporting SQL problem still remains (or at least I don't know a way yet; I need to find one to handle it).
Question:
Is there any way we can handle this issue (dynamic column selection in SQL) in Scala/SparkSQL?
Note: the s3 files always contain only the required fields for each run. The issue is not reading dynamic fields from s3 (the dataframe handles that automatically); the issue is how to make the SQL (SparkSQL / DataFrame API) code handle it.
Example/Scenario-1:
Run-1
df1 = country, 1st_buy (dataframe points directly to s3, with only the required attributes)
df2 = country, percent (dataframe points directly to s3, with only the required attributes)
--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
df1.1st_buy/df2.percent -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country
Run-2 (here an extra column is added to df1)
df1 = country, 1st_buy, 2nd_buy (dataframe points directly to s3, with only the required attributes)
df2 = country, percent (dataframe points directly to s3, with only the required attributes)
****how can we handle this part of new field 2nd_buy dynamically****
--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
df1.2nd_buy,
df1.1st_buy/df2.percent, -- DERIVED FIELD
df1.2nd_buy/df2.percent -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country
Example/Scenario-2:
Run-1
- df1 = country, 1st_buy (dataframe points directly to s3, with only the required attributes)
--Sample SQL
SELECT country,sum(df1.1st_buy)
FROM df1
GROUP BY country
--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy").show()
Run-2 (here an extra column is added to df1)
- df1 = country, 1st_buy, 2nd_buy (dataframe points directly to s3, with only the required attributes)
****how can we handle this part of new field 2nd_buy dynamically****
--Sample SQL
SELECT country,sum(df1.1st_buy),sum(df1.2nd_buy)
FROM df1
GROUP BY country
--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy","2nd_buy").show()
{
1.
val sqlScript = "select col1, col2, .... from ... "
// the string can be built dynamically
val df = spark.sql(sqlScript)
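For example (a sketch; assumes df1 has been registered as a temp view):
df1.createOrReplaceTempView("df1")
// build the projection from whatever columns the dataframe actually carries this run
// (backticks are needed for names like 1st_buy that start with a digit)
val projection = df1.columns.map(c => s"df1.`$c`").mkString(", ")
val dynamicDF = spark.sql(s"SELECT $projection FROM df1")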
2. try using a schema:
val schema = StructType(Seq(
StructField("id", LongType, true),
....
))
// and then use schema.fieldNames ... or
val cols: List[Column] = ...
// in df.select(cols: _*)
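For example (a sketch; keeps only the schema fields actually present in this run's data):
val wanted = schema.fieldNames.filter(df.columns.contains).toList
val selected = df.select(wanted.head, wanted.tail: _*)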
3. get the schema (the list of fields) from a json file:
package spark
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source
object DFFieldsWithJson extends App {
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
case class TestData (
id: Int,
firstName: String,
lastName: String,
descr: String
)
val dataTestDF = Seq(
TestData(1, "First Name 1", "Last Name 1", "Description 1"),
TestData(2, "First Name 2", "Last Name 2", "Description 2"),
TestData(3, "First Name 3", "Last Name 3", "Description 3")
).toDF()
dataTestDF.show(false)
// +---+------------+-----------+-------------+
// |id |firstName |lastName |descr |
// +---+------------+-----------+-------------+
// |1 |First Name 1|Last Name 1|Description 1|
// |2 |First Name 2|Last Name 2|Description 2|
// |3 |First Name 3|Last Name 3|Description 3|
// +---+------------+-----------+-------------+
val schemaJson =
"""{ "type" : "struct",
|"fields" : [
|{
| "name" : "id",
| "type" : "integer",
| "nullable" : true,
| "metadata" : { }
| },
| {
| "name" : "firstName",
| "type" : "string",
| "nullable" : true,
| "metadata" : {}
| },
| {
| "name" : "lastName",
| "type" : "string",
| "nullable" : true,
| "metadata" : {}
| }
| ]}""".stripMargin
// with a real json file, read it with e.g. Source.fromFile("<path>").mkString instead
val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]
println(schemaFromJson)
// StructType(StructField(id,IntegerType,true), StructField(firstName,StringType,true), StructField(lastName,StringType,true))
val cols: List[String] = schemaFromJson.fieldNames.toList
val col: List[Column] = cols.map(dataTestDF(_))
val df = dataTestDF.select(col: _*)
df.printSchema()
// root
// |-- id: integer (nullable = false)
// |-- firstName: string (nullable = true)
// |-- lastName: string (nullable = true)
df.show(false)
// +---+------------+-----------+
// |id |firstName |lastName |
// +---+------------+-----------+
// |1 |First Name 1|Last Name 1|
// |2 |First Name 2|Last Name 2|
// |3 |First Name 3|Last Name 3|
// +---+------------+-----------+
}
}
{ Example:
package spark
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, sum}
object DynamicColumnSelection extends App {
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
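// note: the question's 1st_buy / 2nd_buy are modeled as st_buy / nd_buy here,
// since Scala identifiers cannot start with a digit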
case class c1(
country: String,
st_buy: Double,
nd_buy: Double
)
case class c2(
country: String,
percent: Double
)
val df1 = Seq(
c1("UA", 2, 4),
c1("PL", 3, 6),
c1("GB", 4, 8)
)
.toDF()
df1.show(false)
// +-------+------+------+
// |country|st_buy|nd_buy|
// +-------+------+------+
// |UA |2.0 |4.0 |
// |PL |3.0 |6.0 |
// |GB |4.0 |8.0 |
// +-------+------+------+
val df2 = Seq(
c2("UA", 2.21),
c2("PL", 3.26)
)
.toDF()
df2.show(false)
// +-------+-------+
// |country|percent|
// +-------+-------+
// |UA |2.21 |
// |PL |3.26 |
// +-------+-------+
// Inner Join
val df = df1.join(df2, df1.col("country") === df2.col("country"), "inner")
.select(
df1.col("country"),
df1.col("st_buy"),
df1.col("nd_buy"),
df2.col("percent")
)
df.show(false)
// +-------+------+------+-------+
// |country|st_buy|nd_buy|percent|
// +-------+------+------+-------+
// |UA |2.0 |4.0 |2.21 |
// |PL |3.0 |6.0 |3.26 |
// +-------+------+------+-------+
val res1DF = df.withColumn("st_buy_percent", 'st_buy/'percent)
.withColumn("nd_buy_percent", 'nd_buy/'percent)
res1DF.show(false)
// +-------+------+------+-------+------------------+------------------+
// |country|st_buy|nd_buy|percent|st_buy_percent |nd_buy_percent |
// +-------+------+------+-------+------------------+------------------+
// |UA |2.0 |4.0 |2.21 |0.9049773755656109|1.8099547511312217|
// |PL |3.0 |6.0 |3.26 |0.9202453987730062|1.8404907975460123|
// +-------+------+------+-------+------------------+------------------+
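// The two ratios above are hardcoded. To derive one ratio per measure column,
// whatever df1 happens to contain this run, the measures can be discovered from
// the schema (a sketch; assumes "country" is the only key and every other df1
// column should be divided by percent):
val measures = df1.columns.filterNot(_ == "country").toSeq
val ratioCols = measures.map(c => (col(c) / col("percent")).alias(s"${c}_percent"))
val allCols = Seq(col("country")) ++ measures.map(col) ++ Seq(col("percent")) ++ ratioCols
val dynamicRatiosDF = df.select(allCols: _*)
dynamicRatiosDF.show(false)
// gives the same columns as res1DF above, but discovered at runtime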
// GroupBy + sum
val data = Seq(
c1("UA", 2, 4),
c1("PL", 3, 6),
c1("UA", 5, 10),
c1("PL", 6, 12),
c1("GB", 4, 8)
)
.toDF()
val resGroupByDF = data
.groupBy("country")
.agg(sum("st_buy").alias("sum_st_buy")
,sum("nd_buy").alias("sum_nd_buy"))
resGroupByDF.show(false)
// +-------+----------+----------+
// |country|sum_st_buy|sum_nd_buy|
// +-------+----------+----------+
// |UA |7.0 |14.0 |
// |PL |9.0 |18.0 |
// |GB |4.0 |8.0 |
// +-------+----------+----------+
val resGroupByDF1 = data.groupBy($"country").sum()
resGroupByDF1.show(false)
// +-------+-----------+-----------+
// |country|sum(st_buy)|sum(nd_buy)|
// +-------+-----------+-----------+
// |UA |7.0 |14.0 |
// |PL |9.0 |18.0 |
// |GB |4.0 |8.0 |
// +-------+-----------+-----------+
val exprs = data.columns.map(sum(_))
val resGroupByDF2 = data.groupBy($"country").agg(exprs.head, exprs.tail: _*)
resGroupByDF2.show(false)
// +-------+------------+-----------+-----------+
// |country|sum(country)|sum(st_buy)|sum(nd_buy)|
// +-------+------------+-----------+-----------+
// |UA |null |7.0 |14.0 |
// |PL |null |9.0 |18.0 |
// |GB |null |4.0 |8.0 |
// +-------+------------+-----------+-----------+
val exprs3 = List("st_buy", "nd_buy").map(sum(_))
val resGroupByDF3 = data.groupBy($"country").agg(exprs3.head, exprs3.tail: _*)
resGroupByDF3.show(false)
// +-------+-----------+-----------+
// |country|sum(st_buy)|sum(nd_buy)|
// +-------+-----------+-----------+
// |UA |7.0 |14.0 |
// |PL |9.0 |18.0 |
// |GB |4.0 |8.0 |
// +-------+-----------+-----------+
val exprs4 = data.columns.toList.filter(_ != "country").map(sum(_))
val resGroupByDF4 = data.groupBy($"country").agg(exprs4.head, exprs4.tail: _*)
resGroupByDF4.show(false)
// +-------+-----------+-----------+
// |country|sum(st_buy)|sum(nd_buy)|
// +-------+-----------+-----------+
// |UA |7.0 |14.0 |
// |PL |9.0 |18.0 |
// |GB |4.0 |8.0 |
// +-------+-----------+-----------+
// Select
val cols = data.columns.toSeq
val selectDF1 = data.select(cols.head, cols.tail:_*)
selectDF1.show(false)
// +-------+------+------+
// |country|st_buy|nd_buy|
// +-------+------+------+
// |UA |2.0 |4.0 |
// |PL |3.0 |6.0 |
// |UA |5.0 |10.0 |
// |PL |6.0 |12.0 |
// |GB |4.0 |8.0 |
// +-------+------+------+
}
}