PySpark: using a Window to create a field from a previously created field's value
I am trying to use a Window to create a new column called indexCP in my df. I want to take the previous value of indexCP * (current_df['return'] + 1), and if there is no previous indexCP, use 100 * (current_df['return'] + 1).
column_list = ["id","secname"]
windowval = (Window.partitionBy(column_list)
             .orderBy(col('calendarday').cast("timestamp").cast("long"))
             .rangeBetween(Window.unboundedPreceding, 0))
spark_df = spark_df.withColumn('indexCP',
    when(spark_df["PreviousYearUnique"] == spark_df["yearUnique"], 100 * (current_df['return'] + 1))
    .otherwise(last('indexCP').over(windowval) * (current_df['return'] + 1)))
When I run the code above I get the error "AnalysisException: cannot resolve 'indexCP' given input columns:". I believe this means you cannot reference a column that has not been created yet, but I am not sure how to fix it.
Starting Data Frame
## +---+-----------+-------+------+
## | id|calendarday|secName|return|
## +---+-----------+-------+------+
## |  1| 2015-01-01|      1|0.0076|
## |  1| 2015-01-02|      1|0.0026|
## |  1| 2015-01-01|      2|0.0016|
## |  1| 2015-01-02|      2|0.0006|
## |  2| 2015-01-01|      3|0.0012|
## |  2| 2015-01-02|      3|0.0014|
## +---+-----------+-------+------+
New Data Frame with IndexCP added
## +---+-----------+-------+------+----------+
## | id|calendarday|secName|return|   IndexCP|
## +---+-----------+-------+------+----------+
## |  1| 2015-01-01|      1|0.0076|    100.76| (1st: 100 * (return + 1))
## |  1| 2015-01-02|      1|0.0026|101.021976| (2nd: 100.76 * (return + 1))
## |  2| 2015-01-01|      2|0.0016|    100.16| (1st: 100 * (return + 1))
## |  2| 2015-01-02|      2|0.0006|100.220096| (2nd: 100.16 * (return + 1))
## |  3| 2015-01-01|      3|0.0012|    100.12| (1st: 100 * (return + 1))
## |  3| 2015-01-02|      3|0.0014|100.260168| (2nd: 100.12 * (return + 1))
## +---+-----------+-------+------+----------+
Edit: this should be the final answer; I have extended the example with an additional row per secName group.

What you are looking for is a rolling product function that applies your IndexCP * (current_return + 1) formula.

First you need to collect all existing returns into an ArrayType column and then fold over it. This can be done with the Spark SQL aggregate function, for example:
import pyspark.sql.functions as f
from pyspark.sql import Window

column_list = ["id","secname"]
windowval = (
    Window.partitionBy(column_list)
    .orderBy(f.col('calendarday').cast("timestamp"))
    .rangeBetween(Window.unboundedPreceding, 0)
)
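Here df1 is assumed to already hold the sample data shown below. For reference, a minimal way to recreate it for a quick test (assuming an active SparkSession named spark; this setup is not part of the original answer) would be:

# Hypothetical setup so the example can be reproduced end to end
data = [
    (1, "2015-01-01", 1, 0.0076), (1, "2015-01-02", 1, 0.0026), (1, "2015-01-03", 1, 0.0014),
    (2, "2015-01-01", 2, 0.0016), (2, "2015-01-02", 2, 0.0006), (2, "2015-01-03", 2, 0.0),
    (3, "2015-01-01", 3, 0.0012), (3, "2015-01-02", 3, 0.0014),
]
df1 = spark.createDataFrame(data, ["id", "calendarday", "secName", "return"])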
df1.show()
+---+-----------+-------+------+
| id|calendarday|secName|return|
+---+-----------+-------+------+
| 1| 2015-01-01| 1|0.0076|
| 1| 2015-01-02| 1|0.0026|
| 1| 2015-01-03| 1|0.0014|
| 2| 2015-01-01| 2|0.0016|
| 2| 2015-01-02| 2|6.0E-4|
| 2| 2015-01-03| 2| 0.0|
| 3| 2015-01-01| 3|0.0012|
| 3| 2015-01-02| 3|0.0014|
+---+-----------+-------+------+
# f.collect_list(...) gets all your returns - this must be windowed
# cast(1 as double) is your base of 1 to begin with
# (acc, x) -> acc * (1 + x) is your formula translated to Spark SQL,
# where acc is the accumulated value and x is the incoming value
df1.withColumn(
    "rolling_returns",
    f.collect_list("return").over(windowval)
).withColumn(
    "IndexCP",
    100 * f.expr("""
        aggregate(
            rolling_returns,
            cast(1 as double),
            (acc, x) -> acc * (1 + x))
    """)
).orderBy("id", "calendarday").show(truncate=False)
+---+-----------+-------+------+------------------------+------------------+
|id |calendarday|secName|return|rolling_returns |IndexCP |
+---+-----------+-------+------+------------------------+------------------+
|1 |2015-01-01 |1 |0.0076|[0.0076] |100.76 |
|1 |2015-01-02 |1 |0.0026|[0.0076, 0.0026] |101.021976 |
|1 |2015-01-03 |1 |0.0014|[0.0076, 0.0026, 0.0014]|101.16340676640002|
|2 |2015-01-01 |2 |0.0016|[0.0016] |100.16000000000001|
|2 |2015-01-02 |2 |6.0E-4|[0.0016, 6.0E-4] |100.220096 |
|2 |2015-01-03 |2 |0.0 |[0.0016, 6.0E-4, 0.0] |100.220096 |
|3 |2015-01-01 |3 |0.0012|[0.0012] |100.12 |
|3 |2015-01-02 |3 |0.0014|[0.0012, 0.0014] |100.26016800000002|
+---+-----------+-------+------+------------------------+------------------+
Explanation: the starting value must be 1 and the 100 multiplier must stay outside the aggregate expression; otherwise the factor of 100 gets compounded into the fold and the values drift far above the expected returns.
I have verified that the values now follow your formula, e.g. for secName == 1 and id == 1:

100 * ((0.0076 + 1) * (0.0026 + 1) * (0.0014 + 1)) = 101.1634067664

which is indeed what the formula (acc, x) -> acc * (1 + x) produces. Hope this helps!
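For completeness, the same check can be reproduced with a few lines of plain Python (a minimal sketch, not part of the original answer) that mirrors what the Spark SQL aggregate does within one id/secName partition:

# Stand-alone re-check of the rolling product for id == 1, secName == 1
returns = [0.0076, 0.0026, 0.0014]   # ordered by calendarday
acc = 1.0                            # base of 1, as in cast(1 as double)
for x in returns:
    acc *= (1 + x)                   # (acc, x) -> acc * (1 + x)
index_cp = 100 * acc                 # the 100 multiplier stays outside the fold
print(index_cp)                      # ~101.1634067664, matching IndexCP above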