Fetch start and end between two dates inclusive in pyspark
I have been trying to get the month ranges between two given dates, but it is not working as expected.
For example:
- start_date (dd-mm-yyyy) = 12-01-2022
- end_date (dd-mm-yyyy) = 03-06-2022
Expected output:
Valid_From | Valid_To |
---|---|
2022-01-12 | 2022-01-31 |
2022-02-01 | 2022-02-28 |
2022-03-01 | 2022-03-31 |
2022-04-01 | 2022-04-30 |
2022-05-01 | 2022-05-31 |
2022-06-01 | 2022-06-03 |
My code:
var_forecast_start_date = datetime.datetime(2022, 1, 12)
var_forecast_end_date = datetime.datetime(2022, 6, 2)
df_datetime = pandas_to_spark(
    df_datetime(start=var_forecast_start_date, end=var_forecast_end_date)
)
df_datetime = df_datetime.withColumn(
    "DateID", date_format(df_datetime.Date, "yyyyMMdd").cast(IntegerType())
).withColumn("FiscalDate", date_format(df_datetime.Date, "yyyy-MM-dd"))
df_datetime = df_datetime.selectExpr(
    "add_months(date_add(last_day(Date),1),-1) AS Valid_From",
    "last_day(Date) AS Valid_To",
).distinct()
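Note that both expressions in that selectExpr depend only on the month of Date: add_months(date_add(last_day(Date),1),-1) always lands on the first day of the month and last_day(Date) on the last day, so the partial first and last months (2022-01-12 and the end date) collapse to full-month boundaries. A minimal sketch illustrating this on a single mid-month date (using a throwaway one-column probe DataFrame, not the original df_datetime):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("local").getOrCreate()

# hypothetical one-row probe, only to show what the selectExpr yields for a mid-month date
probe = spark.createDataFrame([("2022-01-12",)], ["Date"]).selectExpr(
    "CAST(Date AS DATE) AS Date"
)
probe.selectExpr(
    "add_months(date_add(last_day(Date),1),-1) AS Valid_From",  # 2022-01-01, not 2022-01-12
    "last_day(Date) AS Valid_To",                                # 2022-01-31
).show()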
Try the following approach:
import findspark
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

findspark.init()
spark = SparkSession.builder.appName("local").getOrCreate()

columns = ["start_date", "end_date"]
data = [("12-01-2022", "03-06-2022")]
df = spark.createDataFrame(data).toDF(*columns)

df = (
    df.withColumn(
        "start_date", F.to_date(F.col("start_date"), "dd-MM-yyyy").cast("DATE")
    )
    .withColumn(
        "end_date", F.to_date(F.col("end_date"), "dd-MM-yyyy").cast("DATE")
    )
    # number of months spanned by the range (rounds ~4.7 up to 5 here)
    .withColumn(
        "months_between",
        F.round(
            F.months_between(F.col("end_date"), F.col("start_date"), True)
        ).cast("Integer"),
    )
    # explode into one row per month in the range
    .withColumn(
        "months_between_seq", F.sequence(F.lit(1), F.col("months_between"))
    )
    .withColumn("months_between_seq", F.explode(F.col("months_between_seq")))
    # last day of the n-th month of the range
    .withColumn(
        "end_of_month",
        F.expr("LAST_DAY(ADD_MONTHS(start_date, months_between_seq - 1))"),
    )
    # first day of the following month (last day + 1)
    .withColumn(
        "begin_of_month",
        F.expr("LAST_DAY(ADD_MONTHS(start_date, months_between_seq - 1)) + 1"),
    )
)
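At this point each input row has been exploded into one row per month: with the sample dates, months_between rounds to 5, end_of_month is the last day of months one through five, and begin_of_month is the first day of months two through six. To inspect the intermediate:

# expected shape (derived by hand from the expressions above, not captured output):
# seq 1 -> 2022-01-31 / 2022-02-01, ..., seq 5 -> 2022-05-31 / 2022-06-01
df.select("months_between_seq", "end_of_month", "begin_of_month").show()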
# all Valid_From values: the original start_date plus each begin_of_month
start_window_agg = Window.partitionBy().orderBy("Valid_From")
start_union_sdf = (
    df.select(F.col("start_date").alias("Valid_From"))
    .unionByName(df.select(F.col("begin_of_month").alias("Valid_From")))
    .drop_duplicates()
    .withColumn("row_number", F.row_number().over(start_window_agg))
)
# all Valid_To values: each end_of_month plus the original end_date
end_window_agg = Window.partitionBy().orderBy("Valid_To")
end_union_sdf = (
    df.select(F.col("end_date").alias("Valid_To"))
    .unionByName(df.select(F.col("end_of_month").alias("Valid_To")))
    .drop_duplicates()
    .withColumn("row_number", F.row_number().over(end_window_agg))
)
# pair the i-th Valid_From with the i-th Valid_To
join_sdf = (
    end_union_sdf.join(start_union_sdf, how="inner", on=["row_number"])
    .drop("row_number")
    .withColumn("Valid_To", F.col("Valid_To").cast("DATE"))
    .withColumn("Valid_From", F.col("Valid_From").cast("DATE"))
    .select("Valid_From", "Valid_To")
    .orderBy("Valid_From")
)

join_sdf.show()
It returns:
+----------+----------+
|Valid_From| Valid_To|
+----------+----------+
|2022-01-12|2022-01-31|
|2022-02-01|2022-02-28|
|2022-03-01|2022-03-31|
|2022-04-01|2022-04-30|
|2022-05-01|2022-05-31|
|2022-06-01|2022-06-03|
+----------+----------+
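For completeness, the same table can be produced more compactly by generating one row per calendar month with sequence() over a one-month interval and clamping the first and last rows back to the input bounds with greatest()/least(). This is only a sketch (it assumes Spark 2.4+, where sequence accepts date bounds with an interval step), not part of the solution above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("local").getOrCreate()

bounds = spark.createDataFrame(
    [("12-01-2022", "03-06-2022")], ["start_date", "end_date"]
).select(
    F.to_date("start_date", "dd-MM-yyyy").alias("start_date"),
    F.to_date("end_date", "dd-MM-yyyy").alias("end_date"),
)

compact = (
    bounds.select(
        "start_date",
        "end_date",
        # one row per calendar month touched by the range
        F.explode(
            F.sequence(
                F.trunc("start_date", "month"),
                F.trunc("end_date", "month"),
                F.expr("interval 1 month"),
            )
        ).alias("month_start"),
    )
    .select(
        # clamp the first month to start_date and the last month to end_date
        F.greatest("month_start", "start_date").alias("Valid_From"),
        F.least(F.last_day("month_start"), "end_date").alias("Valid_To"),
    )
    .orderBy("Valid_From")
)
compact.show()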