PySpark UDF to explode records based on date range
I am a newbie to Python and PySpark. I need to explode one row per patient into yearly date ranges, so that each patient has one row per year.

I wrote a Python function (below) and registered it as a PySpark UDF (having read many articles here). My problem is that when I apply it to my PySpark dataframe, it fails. My function returns 4 values, but I don't know how to use/apply this UDF with .withColumn or any other method/function.

I also can't work out what data type to declare as the return type when registering the Python function as a UDF. I'm sure I've messed up somewhere, but I don't know where.

Please help.
```python
import sys
import pyspark.sql.functions as f
import pyspark.sql as t
from pyspark.sql.types import *
import datetime
import dateutil


def process_data(identifier, eff_date, end_date):
    eff_year = int(eff_date.split("/")[2])
    end_year = int(end_date.split("/")[2])
    current_year = eff_year
    while current_year <= end_year:
        if str(current_year) in eff_date:
            first_date_in_year = eff_date
        else:
            first_date_in_year = "%02d/%02d/%4d" % (1, 1, current_year)
        if str(current_year) in end_date:
            last_date_in_year = end_date
        else:
            last_date_in_year = "%02d/%02d/%4d" % (12, 31, current_year)
        print("|".join([identifier, first_date_in_year, last_date_in_year]))
        current_year += 1
    return identifier, first_date_in_year, last_date_of_year, current_year


def main():
    with open('date.input') as f:
        for line in f.read().splitlines():
            identifier, eff_date, end_date = line.split('|')
            process_data(identifier, eff_date, end_date)


process_data_UDF = f.udf(process_data, StringType())
df_explode = df.withColumn("NEWCOLS", process_data_UDF("INDV_ID", "EFF_DATE", "END_DATE"))
df_explode.show()
```
Input data

| Identifier | Eff_Date | End_Date   |
|------------|----------|------------|
| A0001      | 2/1/2019 | 3/31/2021  |
| B0001      | 6/1/2020 | 11/30/2020 |
| C0001      | 6/1/2020 | 1/31/2021  |
| D0001      | 6/1/2020 | 12/31/2021 |
| E0001      | 1/1/2019 | 6/30/2019  |
| E0001      | 1/1/2020 | 12/31/2020 |
Output data

| Identifier | New_Eff_Date | New_End_Date |
|------------|--------------|--------------|
| A0001      | 2/1/2019     | 12/31/2019   |
| A0001      | 01/01/2020   | 12/31/2020   |
| A0001      | 01/01/2021   | 3/31/2021    |
| B0001      | 6/1/2020     | 11/30/2020   |
| C0001      | 6/1/2020     | 12/31/2020   |
| C0001      | 01/01/2021   | 1/31/2021    |
| D0001      | 6/1/2020     | 12/31/2020   |
| D0001      | 01/01/2021   | 12/31/2021   |
| E0001      | 1/1/2019     | 6/30/2019    |
| E0001      | 1/1/2020     | 12/31/2020   |
My input dataframe, based on your data:
```python
from datetime import date

b = "Identifier Eff_Date End_Date".split()
a = [
    ["A0001", date(2019, 2, 1), date(2021, 3, 31)],
]
df = spark.createDataFrame(a, b)

df.printSchema()
root
 |-- Identifier: string (nullable = true)
 |-- Eff_Date: date (nullable = true)
 |-- End_Date: date (nullable = true)

df.show()
+----------+----------+----------+
|Identifier|  Eff_Date|  End_Date|
+----------+----------+----------+
|     A0001|2019-02-01|2021-03-31|
+----------+----------+----------+
```
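Note that this assumes Eff_Date and End_Date are already DateType. If, as in your sample, they arrive as strings like 2/1/2019, you would first need to parse them; a minimal sketch using to_date with a M/d/yyyy pattern (column names assumed from your data):

```python
from pyspark.sql import functions as F

# Sketch: parse M/d/yyyy strings into proper DateType columns first
df = (
    df.withColumn("Eff_Date", F.to_date("Eff_Date", "M/d/yyyy"))
    .withColumn("End_Date", F.to_date("End_Date", "M/d/yyyy"))
)
```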
Here is the function you need:
```python
def generate_dates(start, end):
    out = []
    for i in range((end.year - start.year) + 1):
        s = date(start.year + i, 1, 1)
        e = date(start.year + i, 12, 31)
        # Clamp each full calendar year to the overall [start, end] window
        out.append((max(s, start), min(e, end)))
    return out
```
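A quick sanity check on the driver, in plain Python with no Spark involved, shows the clamping behaviour on the A0001 row:

```python
from datetime import date

# Each calendar year is clamped to the row's own effective window
for s, e in generate_dates(date(2019, 2, 1), date(2021, 3, 31)):
    print(s, e)
# 2019-02-01 2019-12-31
# 2020-01-01 2020-12-31
# 2021-01-01 2021-03-31
```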
Let's turn it into a UDF and use it in the dataframe with explode; you should get the expected result:
```python
from pyspark.sql import functions as F, types as T

schema = T.ArrayType(
    T.StructType(
        [
            T.StructField("New_Eff_Date", T.DateType()),
            T.StructField("New_End_Date", T.DateType()),
        ]
    )
)

generate_dates_udf = F.udf(generate_dates, schema)

df.withColumn(
    "new_dates", generate_dates_udf(F.col("Eff_Date"), F.col("End_Date"))
).withColumn("new_date", F.explode("new_dates")).select(
    "Identifier",
    "new_date.New_Eff_Date",
    "new_date.New_End_Date",
).show()
+----------+------------+------------+
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
```
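This also answers your return-type question: the UDF returns a list of tuples, so the declared return type is an `ArrayType` of `StructType` with one field per tuple element. `explode` then turns the array into one row per struct, and dot notation (`new_date.New_Eff_Date`) extracts the struct fields into columns.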
The same version, but in full Spark (no UDF):
```python
df = (
    df.withColumn("year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date"))))
    .withColumn(
        "s",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("01"), F.lit("01")).cast(
            "date"
        ),
    )
    .withColumn(
        "e",
        F.concat_ws("-", F.col("year").cast("string"), F.lit("12"), F.lit("31")).cast(
            "date"
        ),
    )
    .select(
        "Identifier",
        F.greatest(F.col("Eff_Date"), F.col("s")).alias("New_Eff_Date"),
        F.least(F.col("End_Date"), F.col("e")).alias("New_End_Date"),
    )
)

df.show()
+----------+------------+------------+
|Identifier|New_Eff_Date|New_End_Date|
+----------+------------+------------+
|     A0001|  2019-02-01|  2019-12-31|
|     A0001|  2020-01-01|  2020-12-31|
|     A0001|  2021-01-01|  2021-03-31|
+----------+------------+------------+
```
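As a possible simplification, assuming Spark 3.0+ (where F.make_date is available), the string concatenation and cast for the s and e columns could be replaced with make_date; a sketch, same output:

```python
from pyspark.sql import functions as F

# Sketch: build the year-start and year-end dates directly (Spark 3.0+)
df = (
    df.withColumn("year", F.explode(F.sequence(F.year("Eff_Date"), F.year("End_Date"))))
    .select(
        "Identifier",
        F.greatest("Eff_Date", F.make_date("year", F.lit(1), F.lit(1))).alias("New_Eff_Date"),
        F.least("End_Date", F.make_date("year", F.lit(12), F.lit(31))).alias("New_End_Date"),
    )
)
```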