使用 PySpark 的共同基金 YTD 和 MTD
YTD and MTD of Mutual Fund using PySpark
我有 NAV 历史 CSV,我必须使用 NAV 计算给定日期互惠基金的 YTD 和 MTD 表现。
我的 CSV 看起来像这样。
MutualFundName NAV Date
A 2 2022-02-03
A 2.2 2022-02-02
A 2.1 2022-02-01
B 3 2022-02-03
B 2.9 2022-02-02
B 2.7 2022-02-01
C 6 2022-02-03
C 5.5 2022-02-02
C 5.9 2022-02-01
在给定日期我有每个互惠基金的相应资产净值。
我必须计算当前日期每个共同基金的 YTD 和 MTD。
YTD 公式为
((NAV(end) - NAV(start)) / NAV(start)) * 100
其中 NAV(end)
是当前日期,NAV(start)
是当年的 1 月 1 日。
同样,对于 MTD,它将是并且 NAV(start)
将是给定月份和年份的第一天。
我必须编写一个 pyspark 作业来实现它。目前,我在 DataFrame 中有 CSV 数据。
预计 O/P 2022 年 2 月 3 日的 MTD 将为
MutualFundName MTD
A -4.761904762
B 11.11111111
C 1.694915254
过滤 Date
等于 current_date
或当月的第一天,然后按 MutualFundName
分组并应用您的公式:
from pyspark.sql import functions as F
result = df.filter(
"Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
((F.max(F.struct("Date", "NAV"))["NAV"] - F.min(F.struct("Date", "NAV"))["NAV"]) /
F.min(F.struct("Date", "NAV"))["NAV"] * 100
).alias("MTD")
)
result.show()
#+--------------+------------------+
#|MutualFundName| MTD|
#+--------------+------------------+
#| A|-4.761904761904765|
#| B|11.111111111111104|
#| C|1.6949152542372818|
#+--------------+------------------+
对于 Spark 3+,您可以使用 max_by
和 min_by
函数代替聚合中的结构排序:
result = df.filter(
"Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
((F.expr("max_by(NAV, Date)") - F.expr("min_by(NAV, Date)")) /
F.expr("min_by(NAV, Date)") * 100
).alias("MTD")
)
我有 NAV 历史 CSV,我必须使用 NAV 计算给定日期互惠基金的 YTD 和 MTD 表现。
我的 CSV 看起来像这样。
MutualFundName NAV Date
A 2 2022-02-03
A 2.2 2022-02-02
A 2.1 2022-02-01
B 3 2022-02-03
B 2.9 2022-02-02
B 2.7 2022-02-01
C 6 2022-02-03
C 5.5 2022-02-02
C 5.9 2022-02-01
在给定日期我有每个互惠基金的相应资产净值。
我必须计算当前日期每个共同基金的 YTD 和 MTD。
YTD 公式为
((NAV(end) - NAV(start)) / NAV(start)) * 100
其中 NAV(end)
是当前日期,NAV(start)
是当年的 1 月 1 日。
同样,对于 MTD,它将是并且 NAV(start)
将是给定月份和年份的第一天。
我必须编写一个 pyspark 作业来实现它。目前,我在 DataFrame 中有 CSV 数据。
预计 O/P 2022 年 2 月 3 日的 MTD 将为
MutualFundName MTD
A -4.761904762
B 11.11111111
C 1.694915254
过滤 Date
等于 current_date
或当月的第一天,然后按 MutualFundName
分组并应用您的公式:
from pyspark.sql import functions as F
result = df.filter(
"Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
((F.max(F.struct("Date", "NAV"))["NAV"] - F.min(F.struct("Date", "NAV"))["NAV"]) /
F.min(F.struct("Date", "NAV"))["NAV"] * 100
).alias("MTD")
)
result.show()
#+--------------+------------------+
#|MutualFundName| MTD|
#+--------------+------------------+
#| A|-4.761904761904765|
#| B|11.111111111111104|
#| C|1.6949152542372818|
#+--------------+------------------+
对于 Spark 3+,您可以使用 max_by
和 min_by
函数代替聚合中的结构排序:
result = df.filter(
"Date = current_date or Date = date_trunc('mm', current_date)"
).groupBy("MutualFundName").agg(
((F.expr("max_by(NAV, Date)") - F.expr("min_by(NAV, Date)")) /
F.expr("min_by(NAV, Date)") * 100
).alias("MTD")
)