
Scala Dataframe/SQL: Dynamic Column Selection for Reporting

Context

We have a model with 5 flows.

Problem

Approach:

Issue:

Is there any way we can handle this (dynamic column selection in SQL) in Scala/SparkSQL? Note: the S3 files always contain only the fields required for each run. The issue is not reading the dynamic fields from S3 (the DataFrame handles that automatically); the issue is how we write the SQL (SparkSQL/DataFrame API) code so that it handles them.

Example/Scenario-1:

--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
       df1.percent/df2.percent -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country
****How can we handle the new field 2nd_buy dynamically here?****

--Sample SQL code (need to convert to sparkSQL/dataframe API)
SELECT df1.1st_buy,
       df1.2nd_buy,
       df1.1st_buy/df2.percent, -- DERIVED FIELD
       df1.2nd_buy/df2.percent  -- DERIVED FIELD
FROM df1,df2
WHERE df1.country=df2.country

Example/Scenario-2:

--Sample SQL
SELECT country,sum(df1.1st_buy)
FROM df1
GROUP BY country

--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy").show()


****How can we handle the new field 2nd_buy dynamically here?****

--Sample SQL
SELECT country,sum(df1.1st_buy),sum(df1.2nd_buy)
FROM df1
GROUP BY country

--Dataframe API/SparkSQL
df1.groupBy("country").sum("1st_buy","2nd_buy").show() 
There are several ways to handle this:

1. Build the SQL string dynamically and run it with spark.sql:

  val sqlScript = "select col1, col2, .... from ... "
  // the string itself can be built at runtime
  val df = spark.sql(sqlScript)
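
For instance, a minimal sketch for Scenario-1 (assuming df1 and df2 have been registered as temp views, and that every df1 column other than country is a buy column to be divided by df2.percent; names like 1st_buy must be backtick-quoted in Spark SQL because they start with a digit):

  // assumes df1 and df2 were registered with createOrReplaceTempView
  val buyCols = spark.table("df1").columns.filterNot(_ == "country")
  val selectList = (buyCols.map(c => s"df1.`$c`") ++
    buyCols.map(c => s"df1.`$c` / df2.percent AS `${c}_percent`")).mkString(", ")
  val df = spark.sql(
    s"SELECT df1.country, $selectList FROM df1 JOIN df2 ON df1.country = df2.country")

Because buyCols is discovered at runtime, a new field such as 2nd_buy is picked up automatically when it appears in the data.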
2. Define a schema, e.g.

  val schema = StructType(Seq(
    StructField("id", LongType, true),
    ....
  ))

  and then use schema.fieldNames, or build a val cols: List[Column] = ... and pass it as df.select(cols: _*).
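
A sketch of that variant with an inline schema (df, the source path, and the field names here are assumptions for illustration):

  import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

  // df stands in for the DataFrame read for the current run (hypothetical source)
  val df = spark.read.parquet("s3://bucket/path")
  val schema = StructType(Seq(
    StructField("id", LongType, true),
    StructField("name", StringType, true)
  ))
  // select only the columns the report schema names
  val cols = schema.fieldNames.toList.map(df(_))
  val reportDF = df.select(cols: _*)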
3. Get the schema (the list of fields) from a JSON file:

package spark

import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}

import scala.io.Source

object DFFieldsWithJson extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  case class TestData (
    id:         Int,
    firstName:  String,
    lastName:   String,
    descr:      String
  )

  val dataTestDF = Seq(
    TestData(1, "First Name 1", "Last Name 1", "Description 1"),
    TestData(2, "First Name 2", "Last Name 2", "Description 2"),
    TestData(3, "First Name 3", "Last Name 3", "Description 3")
  ).toDF()

  dataTestDF.show(false)
//  +---+------------+-----------+-------------+
//  |id |firstName   |lastName   |descr        |
//  +---+------------+-----------+-------------+
//  |1  |First Name 1|Last Name 1|Description 1|
//  |2  |First Name 2|Last Name 2|Description 2|
//  |3  |First Name 3|Last Name 3|Description 3|
//  +---+------------+-----------+-------------+

  val schemaJson =
    """{ "type" : "struct",
      |"fields" : [
      |{
      |    "name" : "id",
      |    "type" : "integer",
      |    "nullable" : true,
      |    "metadata" : { }
      |  },
      |  {
      |    "name" : "firstName",
      |    "type" : "string",
      |    "nullable" : true,
      |    "metadata" : {}
      |  },
      |  {
      |    "name" : "lastName",
      |    "type" : "string",
      |    "nullable" : true,
      |    "metadata" : {}
      |  }
      |  ]}""".stripMargin

  // parse the JSON schema description into a StructType
  val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]

  println(schemaFromJson)
//  StructType(StructField(id,IntegerType,true), StructField(firstName,StringType,true), StructField(lastName,StringType,true))


  val cols: List[String] = schemaFromJson.fieldNames.toList
  val col: List[Column] = cols.map(dataTestDF(_))
  val df = dataTestDF.select(col: _*)


  df.printSchema()

//  root
//  |-- id: integer (nullable = false)
//  |-- firstName: string (nullable = true)
//  |-- lastName: string (nullable = true)

  df.show(false)
//  +---+------------+-----------+
//  |id |firstName   |lastName   |
//  +---+------------+-----------+
//  |1  |First Name 1|Last Name 1|
//  |2  |First Name 2|Last Name 2|
//  |3  |First Name 3|Last Name 3|
//  +---+------------+-----------+
}



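One caveat with the JSON-driven selection: if the JSON lists a field that a given run's file does not contain, the select will fail. A minimal sketch of a guard (reusing schemaFromJson and dataTestDF from the example above) that keeps only the fields actually present:

  // intersect the report schema with the columns that really arrived
  val presentCols = schemaFromJson.fieldNames.filter(dataTestDF.columns.contains).toList
  val dfSafe = dataTestDF.select(presentCols.map(dataTestDF(_)): _*)
  dfSafe.printSchema()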

Example:

package spark

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, column, sum}

object DynamicColumnSelection extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  // 1st_buy / 2nd_buy from the question are modeled as st_buy / nd_buy,
  // since Scala identifiers cannot start with a digit
  case class c1(
     country: String,
     st_buy: Double,
     nd_buy: Double
  )

  case class c2(
     country: String,
     percent: Double
  )

  val df1 = Seq(
    c1("UA", 2, 4),
    c1("PL", 3, 6),
    c1("GB", 4, 8)
  ).toDF()

  df1.show(false)
//  +-------+------+------+
//  |country|st_buy|nd_buy|
//  +-------+------+------+
//  |UA     |2.0   |4.0   |
//  |PL     |3.0   |6.0   |
//  |GB     |4.0   |8.0   |
//  +-------+------+------+

  val df2 = Seq(
    c2("UA", 2.21),
    c2("PL", 3.26)
  )
    .toDF()
  df2.show(false)
//  +-------+-------+
//  |country|percent|
//  +-------+-------+
//  |UA     |2.21   |
//  |PL     |3.26   |
//  +-------+-------+


  // Inner Join
  val df = df1.join(df2, df1.col("country") === df2.col("country"), "inner")
      .select(
        df1.col("country"),
        df1.col("st_buy"),
        df1.col("nd_buy"),
        df2.col("percent")
      )
  df.show(false)
//  +-------+------+------+-------+
//  |country|st_buy|nd_buy|percent|
//  +-------+------+------+-------+
//  |UA     |2.0   |4.0   |2.21   |
//  |PL     |3.0   |6.0   |3.26   |
//  +-------+------+------+-------+


  val res1DF = df.withColumn("st_buy_percent", 'st_buy/'percent)
    .withColumn("nd_buy_percent", 'nd_buy/'percent)

  res1DF.show(false)
//  +-------+------+------+-------+------------------+------------------+
//  |country|st_buy|nd_buy|percent|st_buy_percent    |nd_buy_percent    |
//  +-------+------+------+-------+------------------+------------------+
//  |UA     |2.0   |4.0   |2.21   |0.9049773755656109|1.8099547511312217|
//  |PL     |3.0   |6.0   |3.26   |0.9202453987730062|1.8404907975460123|
//  +-------+------+------+-------+------------------+------------------+
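
  // The two withColumn calls above are still hardcoded. To make the derived
  // fields fully dynamic (Scenario-1 with any number of *_buy columns), the
  // same step can be written as a fold over the columns discovered at runtime.
  // A minimal sketch, assuming every column other than country and percent is a buy column:
  val buyCols = df.columns.filterNot(Set("country", "percent"))
  val resDynamicDF = buyCols.foldLeft(df) { (acc, c) =>
    acc.withColumn(s"${c}_percent", col(c) / col("percent"))
  }
  resDynamicDF.show(false)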


  // GroupBy + sum
  val data = Seq(
    c1("UA", 2, 4),
    c1("PL", 3, 6),
    c1("UA", 5, 10),
    c1("PL", 6, 12),
    c1("GB", 4, 8)
  )
    .toDF()

  val resGroupByDF = data
    .groupBy("country")
    .agg(sum("st_buy").alias("sum_st_buy")
    ,sum("nd_buy").alias("sum_nd_buy"))

  resGroupByDF.show(false)
//  +-------+----------+----------+
//  |country|sum_st_buy|sum_nd_buy|
//  +-------+----------+----------+
//  |UA     |7.0       |14.0      |
//  |PL     |9.0       |18.0      |
//  |GB     |4.0       |8.0       |
//  +-------+----------+----------+


  // sum() with no arguments aggregates every numeric column
  val resGroupByDF1 = data.groupBy($"country").sum()
  resGroupByDF1.show(false)
//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+


  // mapping sum over ALL columns also sums the string column country, which yields null:
  val exprs = data.columns.map(sum(_))
  val resGroupByDF2 = data.groupBy($"country").agg(exprs.head, exprs.tail: _*)
  resGroupByDF2.show(false)
//  +-------+------------+-----------+-----------+
//  |country|sum(country)|sum(st_buy)|sum(nd_buy)|
//  +-------+------------+-----------+-----------+
//  |UA     |null        |7.0        |14.0       |
//  |PL     |null        |9.0        |18.0       |
//  |GB     |null        |4.0        |8.0        |
//  +-------+------------+-----------+-----------+

  val exprs3 = List("st_buy", "nd_buy").map(sum(_))
  val resGroupByDF3 = data.groupBy($"country").agg(exprs3.head, exprs3.tail: _*)
  resGroupByDF3.show(false)
//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+


  // filtering out the grouping column and summing the rest picks up new
  // columns like 2nd_buy automatically
  val exprs4 = data.columns.toList.filter(_ != "country").map(sum(_))
  val resGroupByDF4 = data.groupBy($"country").agg(exprs4.head, exprs4.tail: _*)
  resGroupByDF4.show(false)

//  +-------+-----------+-----------+
//  |country|sum(st_buy)|sum(nd_buy)|
//  +-------+-----------+-----------+
//  |UA     |7.0        |14.0       |
//  |PL     |9.0        |18.0       |
//  |GB     |4.0        |8.0        |
//  +-------+-----------+-----------+
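
  // Yet another runtime-built variant: the Map-based agg overload maps each
  // column name to an aggregate function name and produces the same
  // sum(st_buy) / sum(nd_buy) columns as above
  val aggMap = data.columns.filter(_ != "country").map(_ -> "sum").toMap
  val resGroupByDF5 = data.groupBy($"country").agg(aggMap)
  resGroupByDF5.show(false)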

  // Select
  val cols = data.columns.toSeq
  val selectDF1 = data.select(cols.head, cols.tail:_*)
  selectDF1.show(false)
//  +-------+------+------+
//  |country|st_buy|nd_buy|
//  +-------+------+------+
//  |UA     |2.0   |4.0   |
//  |PL     |3.0   |6.0   |
//  |UA     |5.0   |10.0  |
//  |PL     |6.0   |12.0  |
//  |GB     |4.0   |8.0   |
//  +-------+------+------+
}
