Add all the dates (weekly) between two dates as new rows in Spark Scala

Convert this Spark dataframe:

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |4/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |4/11/2018|700   |
+----+---------+------+

to this expected dataframe:

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|100   |
|Jhon|4/13/2018|200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|100   |
|Jhon|4/20/2018|200   |
|Jhon|4/20/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |5/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |5/11/2018|100   |
|Lee |4/11/2018|200   |
|Lee |5/11/2018|300   |
|Lee |4/11/2018|700   |
+----+---------+------+

So here 300 is the new value for 04/13/2018, and the 100 and 200 from 04/06/2018 should also appear against 04/13/2018; the same carry-forward applies to the following Friday dates for each name. Is there any way to do this in Spark Scala? Any help would be appreciated.

My code below only works for the name 'Jhon' and for the Friday dates '4/6/2018' and '4/13/2018':

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlc = new org.apache.spark.sql.SQLContext(sc)
  val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
  import ss.sqlContext.implicits._

  var df1 = sqlc.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("oldRecords.csv")
  df1.show(false)
  println("---- df1 row count ----" + df1.count())

  if (df1.count() > 0) {
    for (i <- 0 until df1.count().toInt - 1) {
      // duplicate the current rows, then shift amount/date back by one row
      var df2 = df1.unionAll(df1)
      var w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
      var df3 = df2.withColumn("previousAmount", lag("amount", 1).over(w1))
                   .withColumn("newdate", lag("date", 1).over(w1))
      // keep only the rows that actually picked up a previous date
      var df4 = df3.filter(df3.col("newdate").isNotNull)
      var df5 = df4.select("name", "amount", "newdate").distinct()
      df5.show(false)
      df1 = df5.withColumnRenamed("newdate", "date")
    }
  }
}

From your question, it looks like you want to add a row for every week between each date and the maximum date for that name. Here is what you can do.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.joda.time.LocalDate
// assuming a SparkSession named `spark` is in scope (as in spark-shell);
// its implicits are needed for toDF and the $ column syntax
import spark.implicits._
// input data 
val dataDF  = Seq(
  ("Jhon", "4/6/2018", 100),
  ("Jhon", "4/6/2018", 200),
  ("Jhon", "4/13/2018", 300),
  ("Jhon", "4/20/2018", 500),
  ("Lee", "5/4/2018", 100),
  ("Lee", "4/4/2018", 200),
  ("Lee", "5/4/2018", 300),
  ("Lee", "4/11/2018", 700)
).toDF("name", "date", "amount")
  .withColumn("date", to_date($"date", "MM/dd/yyyy"))

val window = Window.partitionBy($"name")

//find the maximum date of each name
val df = dataDF.withColumn("maxDate", max($"date").over(window))

Create a UDF to find all the weekly dates between the two dates:

val calculateDate = udf((min: String, max: String) => {
  // to collect all the dates
  val totalDates = scala.collection.mutable.MutableList[LocalDate]()
  var start = LocalDate.parse(min)
  val end = LocalDate.parse(max)
  while ( {
    !start.isAfter(end)
  }) {
    totalDates += start
    start = start.plusWeeks(1)
  }
  totalDates.map(_.toString("MM/dd/yyyy"))
})
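
For intuition, the same week-stepping loop can be run outside Spark on one pair of dates from the sample input (a standalone illustration using Jhon's first and last dates, not part of the answer's code):

import org.joda.time.LocalDate

// step from the row's date to the name's max date, one week at a time
var start = LocalDate.parse("2018-04-06")
val end   = LocalDate.parse("2018-04-20")
val weeks = scala.collection.mutable.MutableList[String]()
while (!start.isAfter(end)) {
  weeks += start.toString("MM/dd/yyyy")
  start = start.plusWeeks(1)
}
println(weeks.mkString(", "))   // 04/06/2018, 04/13/2018, 04/20/2018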

Now apply the UDF and explode the array it returns:
val finalDf = df.withColumn("date", explode(calculateDate($"date", $"maxDate")))
                .drop("maxDate")

Output:

+----+----------+------+
|name|date      |amount|
+----+----------+------+
|Jhon|04/06/2018|100   |
|Jhon|04/13/2018|100   |
|Jhon|04/20/2018|100   |
|Jhon|04/06/2018|200   |
|Jhon|04/13/2018|200   |
|Jhon|04/20/2018|200   |
|Jhon|04/13/2018|300   |
|Jhon|04/20/2018|300   |
|Jhon|04/20/2018|500   |
|Lee |05/04/2018|100   |
|Lee |04/04/2018|200   |
|Lee |04/11/2018|200   |
|Lee |04/18/2018|200   |
|Lee |04/25/2018|200   |
|Lee |05/02/2018|200   |
|Lee |05/04/2018|300   |
|Lee |04/11/2018|700   |
|Lee |04/18/2018|700   |
|Lee |04/25/2018|700   |
|Lee |05/02/2018|700   |
+----+----------+------+

Hope this helps!
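
As a side note, on Spark 2.4+ you could skip the joda-time UDF and generate the weekly dates with the built-in sequence function and a one-week interval step. A minimal sketch, assuming the same df with the maxDate column from above (finalDf2 is just an illustrative name):

// sketch assuming Spark 2.4+, where sequence() supports date ranges with interval steps
import org.apache.spark.sql.functions.{expr, explode, date_format}

val finalDf2 = df
  .withColumn("date", explode(expr("sequence(date, maxDate, interval 1 week)")))
  .withColumn("date", date_format($"date", "MM/dd/yyyy"))
  .drop("maxDate")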

package com.incedo.pharma
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.to_date
import org.joda.time.LocalDate
object appendPreRowGeneral4 {
   def main(args: Array[String]){
    val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
    import ss.sqlContext.implicits._
    val df1 = sqlc.read.format("com.databricks.spark.csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .load("oldRecords2.csv")
    println(df1.show(false)+"df1 ---------")
    val df2 = df1.withColumn("date", to_date(unix_timestamp($"date", "MM/dd/yyyy").cast("timestamp")))
    println("df2---"+df2.show(false))
    val window1 = Window.partitionBy($"name")
    val df3 = df2.withColumn("maxDate", max($"date").over(window1))
    println(df3.show(false)+"df3 ---------")
    val df4 = df3.withColumn("newdate1", findDate($"date", $"maxDate")).drop("date")
    println("df4---"+df4.show(false))
    val df5 = df4.withColumn("date", explode($"newdate1"))
    println("df5 -----"+df5.show(false))
    val df6 = df5.drop("maxDate","newdate1")
    println("df6 -----"+df6.show(false))
    val df7 = df6.alias("a").join(df2.alias("b"),$"a.date" === $"b.date","left_outer")
              .select($"a.name",$"a.amount",$"a.date" , ($"b.name").alias("rt_name"),($"b.amount").alias("rt_amount"),($"b.date").alias("rt_date"))
    println("df7----"+df7.show(false))
    val df8 = df7.filter(df7.col("rt_date").isNotNull).select($"name", $"date", $"amount").distinct().orderBy($"name", $"date")
    println("df8----"+df8.show(false))
    val df9 = df8.withColumn("date", from_unixtime(unix_timestamp($"date", "yyyy-MM-dd"), "MM/dd/yyyy")) // use MM for month; lowercase mm means minutes
    println("df9 ---"+df9.show(df9.count().toInt,false))
    println("total count --->"+df9.count())
   }

    val findDate = udf((first: String, last: String) => {
      // to collect all the dates
      val arrayDates = scala.collection.mutable.MutableList[LocalDate]()
      var mindate = LocalDate.parse(first)
      println("mindate -----"+mindate)
      val enddate = LocalDate.parse(last)
      println("enddate -----"+enddate)
      println("boolean ----"+mindate.isAfter(enddate))
      while ( {
        !mindate.isAfter(enddate)
      }) {
        arrayDates += mindate
        println("arrayDates --->"+arrayDates)
        mindate = mindate.plusWeeks(1)
        println("mindate inside ---"+mindate)
        //start.plusMonths(1)
      }
      arrayDates.map(_.toString())//arrayDates.map(_.toString("MM/dd/yyyy"))
  })
  /**val convertDateUDF = udf((indate: String) => {
    var ret = indate
    s"${ret}"
  })*/
}