使用 PySpark 的共同基金 YTD 和 MTD

YTD and MTD of Mutual Fund using PySpark

我有 NAV 历史 CSV,我必须使用 NAV 计算给定日期互惠基金的 YTD 和 MTD 表现。

我的 CSV 看起来像这样。

MutualFundName  NAV Date
A               2   2022-02-03
A               2.2 2022-02-02
A               2.1 2022-02-01
B               3   2022-02-03
B               2.9 2022-02-02
B               2.7 2022-02-01
C               6   2022-02-03
C               5.5 2022-02-02
C               5.9 2022-02-01

在给定日期我有每个互惠基金的相应资产净值。

我必须计算当前日期每个共同基金的 YTD 和 MTD。

YTD 公式为

((NAV(end) - NAV(start)) / NAV(start)) * 100

其中 NAV(end) 是当前日期,NAV(start) 是当年的 1 月 1 日。 同样,对于 MTD,它将是并且 NAV(start) 将是给定月份和年份的第一天。

我必须编写一个 pyspark 作业来实现它。目前,我在 DataFrame 中有 CSV 数据。

预计 O/P 2022 年 2 月 3 日的 MTD 将为

MutualFundName  MTD
A               -4.761904762
B               11.11111111
C               1.694915254

过滤 Date 等于 current_date 或当月的第一天,然后按 MutualFundName 分组并应用您的公式:

from pyspark.sql import functions as F

result = df.filter(
    "Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
    ((F.max(F.struct("Date", "NAV"))["NAV"] - F.min(F.struct("Date", "NAV"))["NAV"]) /
     F.min(F.struct("Date", "NAV"))["NAV"] * 100
     ).alias("MTD")
)

result.show()
#+--------------+------------------+
#|MutualFundName|               MTD|
#+--------------+------------------+
#|             A|-4.761904761904765|
#|             B|11.111111111111104|
#|             C|1.6949152542372818|
#+--------------+------------------+

对于 Spark 3+,您可以使用 max_bymin_by 函数代替聚合中的结构排序:

result = df.filter(
    "Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
    ((F.expr("max_by(NAV, Date)") - F.expr("min_by(NAV, Date)")) /
     F.expr("min_by(NAV, Date)") * 100
     ).alias("MTD")
)