Add all the dates (weeks) between two dates as new rows in Spark Scala
Transform this Spark dataframe:
+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |4/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |4/11/2018|700   |
+----+---------+------+
into the expected dataframe:
+----+---------+------+
|name|date     |amount|
+----+---------+------+
|Jhon|4/6/2018 |100   |
|Jhon|4/6/2018 |200   |
|Jhon|4/13/2018|100   |
|Jhon|4/13/2018|200   |
|Jhon|4/13/2018|300   |
|Jhon|4/20/2018|100   |
|Jhon|4/20/2018|200   |
|Jhon|4/20/2018|300   |
|Jhon|4/20/2018|500   |
|Lee |5/4/2018 |100   |
|Lee |5/4/2018 |200   |
|Lee |5/4/2018 |300   |
|Lee |5/11/2018|100   |
|Lee |4/11/2018|200   |
|Lee |5/11/2018|300   |
|Lee |4/11/2018|700   |
+----+---------+------+
So here 300 is the new value for 04/13/2018, and the 100 and 200 from 04/06/2018 should also appear against 04/13/2018, and the same goes for each following Friday date per name. Is there any way to do this in Spark Scala? Any help would be greatly appreciated.
My code only works for the name 'Jhon' and the Friday dates '4/6/2018' and '4/13/2018':
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
  val sc = new SparkContext(conf)
  val sqlc = new org.apache.spark.sql.SQLContext(sc)
  val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
  import ss.sqlContext.implicits._

  var df1 = sqlc.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("oldRecords.csv")
  df1.show(false)
  println("---- df1 row count ----" + df1.count())

  if (df1.count() > 0) {
    for (i <- 0 until df1.count().toInt - 1) {
      var df2 = df1.unionAll(df1)
      var w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
      var df3 = df2.withColumn("previousAmount", lag("amount", 1).over(w1))
        .withColumn("newdate", lag("date", 1).over(w1))
      var df4 = df3.filter(df3.col("newdate").isNotNull)
      var df5 = df4.select("name", "amount", "newdate").distinct()
      df5.show(false)
      df1 = df5.withColumnRenamed("newdate", "date")
    }
  }
}
From your question, if what you are trying to do is add every week up to the maximum date for each name, here is what you can do.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.joda.time.LocalDate
// input data
val dataDF = Seq(
  ("Jhon", "4/6/2018", 100),
  ("Jhon", "4/6/2018", 200),
  ("Jhon", "4/13/2018", 300),
  ("Jhon", "4/20/2018", 500),
  ("Lee", "5/4/2018", 100),
  ("Lee", "4/4/2018", 200),
  ("Lee", "5/4/2018", 300),
  ("Lee", "4/11/2018", 700)
).toDF("name", "date", "amount")
  .withColumn("date", to_date($"date", "MM/dd/yyyy"))

val window = Window.partitionBy($"name")

// find the maximum date for each name
val df = dataDF.withColumn("maxDate", max($"date").over(window))
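At this point every row carries its own date plus the latest date for its name; for the sample data that maxDate works out to 2018-04-20 for Jhon and 2018-05-04 for Lee. A quick way to eyeball it, reusing the df defined just above:
df.select("name", "date", "maxDate")
  .distinct()
  .orderBy("name", "date")
  .show(false)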
Create a UDF to find all the weekly dates between two dates:
val calculateDate = udf((min: String, max: String) => {
  // collect every weekly date from min up to max (inclusive)
  val totalDates = scala.collection.mutable.MutableList[LocalDate]()
  var start = LocalDate.parse(min)
  val end = LocalDate.parse(max)
  while (!start.isAfter(end)) {
    totalDates += start
    start = start.plusWeeks(1)
  }
  totalDates.map(_.toString("MM/dd/yyyy"))
})
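As a quick standalone check of the same loop (plain Joda-Time, no Spark; the sample values are Jhon's first and last dates, and LocalDate.parse expects the ISO yyyy-MM-dd form by default):
import org.joda.time.LocalDate

var d = LocalDate.parse("2018-04-06")   // Jhon's first date
val end = LocalDate.parse("2018-04-20") // Jhon's max date
while (!d.isAfter(end)) {
  println(d.toString("MM/dd/yyyy"))     // 04/06/2018, 04/13/2018, 04/20/2018
  d = d.plusWeeks(1)
}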
Now apply the UDF and explode the array it returns:
val finalDf = df.withColumn("date", explode(calculateDate($"date", $"maxDate")))
  .drop("maxDate")
Output:
+----+----------+------+
|name|date |amount|
+----+----------+------+
|Jhon|04/06/2018|100 |
|Jhon|04/13/2018|100 |
|Jhon|04/20/2018|100 |
|Jhon|04/06/2018|200 |
|Jhon|04/13/2018|200 |
|Jhon|04/20/2018|200 |
|Jhon|04/13/2018|300 |
|Jhon|04/20/2018|300 |
|Jhon|04/20/2018|500 |
|Lee |05/04/2018|100 |
|Lee |04/04/2018|200 |
|Lee |04/11/2018|200 |
|Lee |04/18/2018|200 |
|Lee |04/25/2018|200 |
|Lee |05/02/2018|200 |
|Lee |05/04/2018|300 |
|Lee |04/11/2018|700 |
|Lee |04/18/2018|700 |
|Lee |04/25/2018|700 |
|Lee |05/02/2018|700 |
+----+----------+------+
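On Spark 2.4+ the same expansion can also be done without a UDF, using the built-in sequence function with a one-week interval step. A minimal sketch, reusing dataDF and window from above (noUdfDf is just a name picked for this example):
import org.apache.spark.sql.functions._

// one output row per weekly date from each row's date up to the name's max date
val noUdfDf = dataDF
  .withColumn("maxDate", max($"date").over(window))
  .withColumn("date", explode(sequence($"date", $"maxDate", expr("interval 1 week"))))
  .withColumn("date", date_format($"date", "MM/dd/yyyy"))
  .drop("maxDate")

noUdfDf.show(false)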
Hope this helps!
package com.incedo.pharma

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.joda.time.LocalDate

object appendPreRowGeneral4 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
    import ss.sqlContext.implicits._

    // read the input records
    val df1 = sqlc.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("oldRecords2.csv")
    df1.show(false)

    // parse the MM/dd/yyyy strings into a real date column
    val df2 = df1.withColumn("date", to_date(unix_timestamp($"date", "MM/dd/yyyy").cast("timestamp")))
    df2.show(false)

    // maximum date per name
    val window1 = Window.partitionBy($"name")
    val df3 = df2.withColumn("maxDate", max($"date").over(window1))
    df3.show(false)

    // every weekly date from each row's date up to that name's max date
    val df4 = df3.withColumn("newdate1", findDate($"date", $"maxDate")).drop("date")
    val df5 = df4.withColumn("date", explode($"newdate1"))
    val df6 = df5.drop("maxDate", "newdate1")
    df6.show(false)

    // keep only the generated dates that actually occur in the original data
    val df7 = df6.alias("a").join(df2.alias("b"), $"a.date" === $"b.date", "left_outer")
      .select($"a.name", $"a.amount", $"a.date",
        $"b.name".alias("rt_name"), $"b.amount".alias("rt_amount"), $"b.date".alias("rt_date"))
    val df8 = df7.filter(df7.col("rt_date").isNotNull)
      .select($"name", $"date", $"amount").distinct().orderBy($"name", $"date")

    // format the date back to MM/dd/yyyy (MM = month; mm would be minutes)
    val df9 = df8.withColumn("date", from_unixtime(unix_timestamp($"date", "yyyy-MM-dd"), "MM/dd/yyyy"))
    df9.show(df9.count().toInt, false)
    println("total count ---> " + df9.count())
  }

  // UDF: collect every weekly date between the row's date and the name's max date (inclusive)
  val findDate = udf((first: String, last: String) => {
    val arrayDates = scala.collection.mutable.MutableList[LocalDate]()
    var mindate = LocalDate.parse(first)
    val enddate = LocalDate.parse(last)
    while (!mindate.isAfter(enddate)) {
      arrayDates += mindate
      mindate = mindate.plusWeeks(1)
    }
    arrayDates.map(_.toString()) // keep the ISO yyyy-MM-dd form; df9 reformats it to MM/dd/yyyy
  })
}