pyspark sql: Add different Qtr start_date, end_date for exploded rows
I have a dataframe with start_date, end_date and sales_target. I have added code to work out the number of quarters between the date range, and with that I am able to split sales_target across the quarters using a UDF.
df = sqlContext.createDataFrame([("2020-01-01","2020-12-31","15"),("2020-04-01","2020-12-31","11"),("2020-07-01","2020-12-31","3")], ["start_date","end_date","sales_target"])
+----------+----------+------------+
|start_date| end_date |sales_target|
+----------+----------+------------+
|2020-01-01|2020-12-31|          15|
|2020-04-01|2020-12-31|          11|
|2020-07-01|2020-12-31|           3|
+----------+----------+------------+
Below is the dataframe after computing the number of quarters and splitting sales_target with the UDF (a hypothetical sketch of such a UDF is shown after the table below).
spark.sql('select *, round(months_between(end_date, start_date)/3) as noq from df_temp').createOrReplaceTempView("df_temp")
spark.sql("select *, st_udf(cast(sales_target as integer), cast(noq as integer)) as sales_target from df_temp").createOrReplaceTempView("df_temp")
+----------+----------+--------+---------------+
|start_date| end_date |num_qtrs|sales_target_n |
+----------+----------+--------+---------------+
|2020-01-01|2020-12-31|       4|      [4,4,4,3]|
|2020-04-01|2020-12-31|       3|        [4,4,3]|
|2020-07-01|2020-12-31|       2|          [2,1]|
+----------+----------+--------+---------------+
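The actual st_udf is not included in the post; a hypothetical implementation consistent with the arrays shown above could be registered like this (split_target is just an illustrative name):

from pyspark.sql.types import ArrayType, IntegerType

# hypothetical sketch; the question's real st_udf is not shown
def split_target(total, n):
    # spread `total` across `n` quarters; the first (total % n) quarters get one extra unit
    base, rem = divmod(total, n)
    return [base + 1 if i < rem else base for i in range(n)]

spark.udf.register("st_udf", split_target, ArrayType(IntegerType()))  # e.g. st_udf(15, 4) -> [4, 4, 4, 3]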
After exploding sales_target, I get the following result:
+----------+----------+--------+-------------+---------------+------------------+
|start_date| end_date |num_qtrs|sales_target |sales_target_n | sales_target_new |
+----------+----------+--------+-------------+---------------+------------------+
|2020-01-01|2020-12-31|       4|           15|      [4,4,4,3]|                 4|
|2020-01-01|2020-12-31|       4|           15|      [4,4,4,3]|                 4|
|2020-01-01|2020-12-31|       4|           15|      [4,4,4,3]|                 4|
|2020-01-01|2020-12-31|       4|           15|      [4,4,4,3]|                 3|
|2020-04-01|2020-12-31|       3|           11|        [4,4,3]|                 4|
|2020-04-01|2020-12-31|       3|           11|        [4,4,3]|                 4|
|2020-04-01|2020-12-31|       3|           11|        [4,4,3]|                 3|
|2020-07-01|2020-12-31|       2|            3|          [2,1]|                 2|
|2020-07-01|2020-12-31|       2|            3|          [2,1]|                 1|
+----------+----------+--------+-------------+---------------+------------------+
I need help adding different start/end dates to each row based on the num_qtrs value. I need to get a dataframe like the one below.
+----------+----------+--------+--------------+------------------+--------------+--------------+
|start_date| end_date |num_qtrs|sales_target_n| sales_target_new |new_start_date| new_end_date |
+----------+----------+--------+--------------+------------------+--------------+--------------+
|2020-01-01|2020-12-31|       4|     [4,4,4,3]|                 4|    2020-01-01|    2020-03-31|
|2020-01-01|2020-12-31|       4|     [4,4,4,3]|                 4|    2020-04-01|    2020-06-30|
|2020-01-01|2020-12-31|       4|     [4,4,4,3]|                 4|    2020-07-01|    2020-09-30|
|2020-01-01|2020-12-31|       4|     [4,4,4,3]|                 3|    2020-10-01|    2020-12-31|
|2020-04-01|2020-12-31|       3|       [4,4,3]|                 4|    2020-04-01|    2020-06-30|
|2020-04-01|2020-12-31|       3|       [4,4,3]|                 4|    2020-07-01|    2020-09-30|
|2020-04-01|2020-12-31|       3|       [4,4,3]|                 3|    2020-10-01|    2020-12-31|
|2020-07-01|2020-12-31|       2|         [2,1]|                 2|    2020-07-01|    2020-09-30|
|2020-07-01|2020-12-31|       2|         [2,1]|                 1|    2020-10-01|    2020-12-31|
+----------+----------+--------+--------------+------------------+--------------+--------------+
Can someone help me with a pyspark code example to achieve the expected result above?
Update regarding the sequence error:
Thanks
Try this -
Only start_date and end_date are needed to compute new_start_date and new_end_date.
Load the test data provided:
val data =
"""
|start_date| end_date |sales_target
|2020-01-01|2020-12-31| 15
|2020-04-01|2020-12-31| 11
|2020-07-01|2020-12-31| 3
""".stripMargin
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS)
df.show(false)
df.printSchema()
/**
* +-------------------+-------------------+------------+
* |start_date |end_date |sales_target|
* +-------------------+-------------------+------------+
* |2020-01-01 00:00:00|2020-12-31 00:00:00|15 |
* |2020-04-01 00:00:00|2020-12-31 00:00:00|11 |
* |2020-07-01 00:00:00|2020-12-31 00:00:00|3 |
* +-------------------+-------------------+------------+
*
* root
* |-- start_date: timestamp (nullable = true)
* |-- end_date: timestamp (nullable = true)
* |-- sales_target: integer (nullable = true)
*/
Compute new_start_date and new_end_date:
val processedDF = df.withColumn("new_start_date", explode(sequence(to_date($"start_date"), to_date($"end_date"),
    expr("interval 3 month"))))
  .withColumn("new_end_date",
    date_sub(coalesce(lead("new_start_date", 1)
      .over(Window.partitionBy("start_date").orderBy("new_start_date")), to_date($"end_date")), 1)
  )
processedDF.orderBy("start_date", "new_start_date").show(false)
processedDF.printSchema()
/**
* +-------------------+-------------------+------------+--------------+------------+
* |start_date |end_date |sales_target|new_start_date|new_end_date|
* +-------------------+-------------------+------------+--------------+------------+
* |2020-01-01 00:00:00|2020-12-31 00:00:00|15 |2020-01-01 |2020-03-31 |
* |2020-01-01 00:00:00|2020-12-31 00:00:00|15 |2020-04-01 |2020-06-30 |
* |2020-01-01 00:00:00|2020-12-31 00:00:00|15 |2020-07-01 |2020-09-30 |
* |2020-01-01 00:00:00|2020-12-31 00:00:00|15 |2020-10-01 |2020-12-30 |
* |2020-04-01 00:00:00|2020-12-31 00:00:00|11 |2020-04-01 |2020-06-30 |
* |2020-04-01 00:00:00|2020-12-31 00:00:00|11 |2020-07-01 |2020-09-30 |
* |2020-04-01 00:00:00|2020-12-31 00:00:00|11 |2020-10-01 |2020-12-30 |
* |2020-07-01 00:00:00|2020-12-31 00:00:00|3 |2020-07-01 |2020-09-30 |
* |2020-07-01 00:00:00|2020-12-31 00:00:00|3 |2020-10-01 |2020-12-30 |
* +-------------------+-------------------+------------+--------------+------------+
*
* root
* |-- start_date: timestamp (nullable = true)
* |-- end_date: timestamp (nullable = true)
* |-- sales_target: integer (nullable = true)
* |-- new_start_date: date (nullable = false)
* |-- new_end_date: date (nullable = true)
*/
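For the PySpark version the question asks for, a rough equivalent of the same sequence/explode approach (assuming Spark 2.4+, where sequence is available, and the original df with start_date, end_date, sales_target) could look like this:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('start_date').orderBy('new_start_date')

processed = (df
    .withColumn('new_start_date',
                F.explode(F.expr("sequence(to_date(start_date), to_date(end_date), interval 3 month)")))
    .withColumn('new_end_date',
                F.date_sub(F.coalesce(F.lead('new_start_date', 1).over(w),
                                      F.to_date('end_date')), 1)))

Like the Scala version above, this puts the last quarter's new_end_date at 2020-12-30; using F.date_add(F.to_date('end_date'), 1) as the coalesce fallback yields 2020-12-31 instead.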
package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object Qvartal extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val dataDF = Seq(
    ("2020-01-01", "2020-12-31", 4, List(4,4,4,3)),
    ("2020-04-01", "2020-12-31", 3, List(4,4,3)),
    ("2020-07-01", "2020-12-31", 2, List(2,1))
  ).toDF("start_date", "end_date", "num_qtrs", "sales_target_n")

  val listStartEnd = udf((b: String, e: String) => {
    List("2020-01-01", "2020-04-01", "2020-07-01", "2020-10-01").filter(i => i >= b && i <= e)
      .zip(List("2020-03-31", "2020-06-30", "2020-09-30", "2020-12-31").filter(i => i >= b && i <= e))
  })

  val resDF = dataDF.withColumn("new_start_end_date", lit(listStartEnd('start_date, 'end_date)))
  resDF.show(false)
// +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
// |start_date|end_date |num_qtrs|sales_target_n|new_start_end_date |
// +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
// |2020-01-01|2020-12-31|4 |[4, 4, 4, 3] |[[2020-01-01, 2020-03-31], [2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]]|
// |2020-04-01|2020-12-31|3 |[4, 4, 3] |[[2020-04-01, 2020-06-30], [2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]] |
// |2020-07-01|2020-12-31|2 |[2, 1] |[[2020-07-01, 2020-09-30], [2020-10-01, 2020-12-31]] |
// +----------+----------+--------+--------------+--------------------------------------------------------------------------------------------------------+
  val r11 = resDF
    .withColumn("sales_target_n1", explode('sales_target_n))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r12 = r11.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'sales_target_n1
  )

  val r21 = resDF
    .withColumn("new_start_end_date_1", explode('new_start_end_date))
    .withColumn("monotonically_increasing_id", monotonically_increasing_id())

  val r22 = r21.select(
    'monotonically_increasing_id,
    'start_date,
    'end_date,
    'num_qtrs,
    'new_start_end_date_1
  )

  val resultDF = r12.join(r22,
      r22.col("monotonically_increasing_id") === r12.col("monotonically_increasing_id"),
      "inner")
    .select(
      r12.col("start_date"),
      r12.col("end_date"),
      r12.col("num_qtrs"),
      r12.col("sales_target_n1").alias("sales_target_n"),
      r22.col("new_start_end_date_1")
    )
    .withColumn("new_start_date", col("new_start_end_date_1").getItem("_1"))
    .withColumn("new_end_date", col("new_start_end_date_1").getItem("_2"))
    .drop("new_start_end_date_1")

  resultDF.show(false)
// +----------+----------+--------+--------------+--------------+------------+
// |start_date|end_date |num_qtrs|sales_target_n|new_start_date|new_end_date|
// +----------+----------+--------+--------------+--------------+------------+
// |2020-01-01|2020-12-31|4 |4 |2020-01-01 |2020-03-31 |
// |2020-01-01|2020-12-31|4 |4 |2020-04-01 |2020-06-30 |
// |2020-01-01|2020-12-31|4 |4 |2020-07-01 |2020-09-30 |
// |2020-01-01|2020-12-31|4 |3 |2020-10-01 |2020-12-31 |
// |2020-04-01|2020-12-31|3 |4 |2020-04-01 |2020-06-30 |
// |2020-04-01|2020-12-31|3 |4 |2020-07-01 |2020-09-30 |
// |2020-04-01|2020-12-31|3 |3 |2020-10-01 |2020-12-31 |
// |2020-07-01|2020-12-31|2 |2 |2020-07-01 |2020-09-30 |
// |2020-07-01|2020-12-31|2 |1 |2020-10-01 |2020-12-31 |
// +----------+----------+--------+--------------+--------------+------------+
}
Consider the following as your input dataframe, after your UDF has been applied.
Input:
+----------+----------+--------+--------------+
|start_date| end_date|num_qtrs|sales_target_n|
+----------+----------+--------+--------------+
|2020-01-01|2020-12-31| 4| [4, 4, 4, 3]|
|2020-04-01|2020-12-31| 3| [4, 4, 3]|
|2020-07-01|2020-12-31| 2| [2, 1]|
+----------+----------+--------+--------------+
You can use a combination of row_number, add_months and date_add to get your desired output, as shown below:
from pyspark.sql.functions import explode, row_number, expr, desc
from pyspark.sql import Window
window = Window.partitionBy('start_date').orderBy(desc("sales_target_new"))
df.withColumn('sales_target_new', explode('sales_target_n')).\
withColumn('row_num', row_number().over(window)).\
withColumn('new_start_date', expr("add_months(start_date, (row_num-1) * 3)")).\
withColumn('new_end_date', expr("add_months(date_add(start_date, -1), row_num * 3)")).\
orderBy('start_date', 'row_num').show()
Output:
+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|start_date| end_date|num_qtrs|sales_target_n|sales_target_new|row_num|new_start_date|new_end_date|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+
|2020-01-01|2020-12-31| 4| [4, 4, 4, 3]| 4| 1| 2020-01-01| 2020-03-31|
|2020-01-01|2020-12-31| 4| [4, 4, 4, 3]| 4| 2| 2020-04-01| 2020-06-30|
|2020-01-01|2020-12-31| 4| [4, 4, 4, 3]| 4| 3| 2020-07-01| 2020-09-30|
|2020-01-01|2020-12-31| 4| [4, 4, 4, 3]| 3| 4| 2020-10-01| 2020-12-31|
|2020-04-01|2020-12-31| 3| [4, 4, 3]| 4| 1| 2020-04-01| 2020-06-30|
|2020-04-01|2020-12-31| 3| [4, 4, 3]| 4| 2| 2020-07-01| 2020-09-30|
|2020-04-01|2020-12-31| 3| [4, 4, 3]| 3| 3| 2020-10-01| 2020-12-31|
|2020-07-01|2020-12-31| 2| [2, 1]| 2| 1| 2020-07-01| 2020-09-30|
|2020-07-01|2020-12-31| 2| [2, 1]| 1| 2| 2020-10-01| 2020-12-31|
+----------+----------+--------+--------------+----------------+-------+--------------+------------+
You can modify the window as needed; a posexplode-based variant that avoids the ordering assumption is sketched below.
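If you would rather not rely on ordering by sales_target_new (which only works here because each array happens to be non-increasing), a variant using posexplode keeps the array position and derives the dates from it directly:

from pyspark.sql.functions import posexplode, expr

df.select('start_date', 'end_date', 'num_qtrs', 'sales_target_n',
          posexplode('sales_target_n').alias('pos', 'sales_target_new'))\
  .withColumn('new_start_date', expr("add_months(start_date, pos * 3)"))\
  .withColumn('new_end_date', expr("add_months(date_add(start_date, -1), (pos + 1) * 3)"))\
  .orderBy('start_date', 'pos').show()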