Conditionally replace value in a row from another row value in the same column based on value in another column in Pyspark?
There are variants of this question on the web, but none does exactly what I'm after.
I have a dataframe like this:
+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL |New_LL |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160 |0 |1 |0 |26.1184 |23.2954 |
|790026|13509 |0 |0 |1 |Infinity|-Infinity|
|790026|9162 |0 |0 |0 |25.03535|23.48585 |
|790026|13510 |0 |0 |1 |Infinity|-Infinity|
|790048|9162 |0 |0 |0 |33.5 |30.5 |
|790048|13509 |0 |0 |1 |Infinity|-Infinity|
|790048|13510 |0 |0 |0 |NaN |NaN |
|790048|9160 |0 |1 |0 |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+
I want to replace the New_UL and New_LL values (where use_golden_limit is 1) with the values from the row where is_golden_limit is 1, for each SEQ_ID. So in this case the expected result would be:
+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL |New_LL |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160 |0 |1 |0 |26.1184 |23.2954 |
|790026|13509 |0 |0 |1 |26.1184 |23.2954 |
|790026|9162 |0 |0 |0 |25.03535|23.48585 |
|790026|13510 |0 |0 |1 |26.1184 |23.2954 |
|790048|9162 |0 |0 |0 |33.5 |30.5 |
|790048|13509 |0 |0 |1 |33.94075|30.75925 |
|790048|13510 |0 |0 |0 |NaN |NaN |
|790048|9160 |0 |1 |0 |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+
Is this possible?
As per the requirement, only the first is_golden_limit value per ID is taken.
Creating the dataframe
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import *
import numpy as np

# Sample data; renamed from `list` to avoid shadowing the Python built-in
data = [[790026, 9160, 0, 1, 0, 26.1184, 23.2954],
        [790026, 13509, 0, 0, 1, np.inf, -np.inf],
        [790026, 9162, 0, 0, 0, 25.03535, 23.48585],
        [790026, 13510, 0, 0, 1, np.inf, -np.inf],
        [790048, 9162, 0, 0, 0, 33.5, 30.5],
        [790048, 13509, 0, 0, 1, np.inf, -np.inf],
        [790048, 13510, 0, 0, 0, np.nan, np.nan],  # np.nan (np.NaN was removed in NumPy 2.0)
        [790048, 9160, 0, 1, 0, 33.94075, 30.75925]]

df = spark.createDataFrame(data, ['SEQ_ID', 'TOOL_ID', 'isfleetlevel', 'is_golden_limit',
                                  'use_golden_limit', 'New_UL', 'New_LL'])
df.show()
+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit| New_UL| New_LL|
+------+-------+------------+---------------+----------------+--------+---------+
|790026| 9160| 0| 1| 0| 26.1184| 23.2954|
|790026| 13509| 0| 0| 1|Infinity|-Infinity|
|790026| 9162| 0| 0| 0|25.03535| 23.48585|
|790026| 13510| 0| 0| 1|Infinity|-Infinity|
|790048| 9162| 0| 0| 0| 33.5| 30.5|
|790048| 13509| 0| 0| 1|Infinity|-Infinity|
|790048| 13510| 0| 0| 0| NaN| NaN|
|790048| 9160| 0| 1| 0|33.94075| 30.75925|
+------+-------+------------+---------------+----------------+--------+---------+
Selecting a new dataframe to use for the self-join, taking the first occurrence of the is_golden_limit value for each ID
w = Window().partitionBy("SEQ_ID").orderBy("SEQ_ID")
df1 = (df.filter(F.col("is_golden_limit") == 1)  # golden rows only; first one per SEQ_ID via row_number()
         .select(F.col("is_golden_limit").alias("use_golden_limit"), F.col("New_UL").alias("New_UL1"), F.col("New_LL").alias("New_LL1"), "SEQ_ID")
         .withColumn("row_num", F.row_number().over(w)).filter(F.col("row_num") == 1).drop("row_num"))
+----------------+--------+--------+------+
|use_golden_limit| New_UL1| New_LL1|SEQ_ID|
+----------------+--------+--------+------+
| 1| 26.1184| 23.2954|790026|
| 1|33.94075|30.75925|790048|
+----------------+--------+--------+------+
Joining and creating the new columns conditionally
df1 will naturally be a much smaller dataframe, so it is better to use a broadcast join here (the small dataframe is broadcast to all nodes, giving better co-location during the join).
df2 = df.join(df1.hint("broadcast"), on=['use_golden_limit', 'SEQ_ID'], how='left')
df3 = (df2.withColumn("New_UL_Final", F.when(F.col("use_golden_limit") == 1, F.col("New_UL1")).otherwise(F.col("New_UL")))
          .withColumn("New_LL_Final", F.when(F.col("use_golden_limit") == 1, F.col("New_LL1")).otherwise(F.col("New_LL")))
          .orderBy("SEQ_ID").drop("New_UL", "New_LL", "New_LL1", "New_UL1"))
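As a side note, the same broadcast behaviour can also be requested through the functions API instead of the hint string; a minimal, equivalent sketch of the join line above:

import pyspark.sql.functions as F

# Equivalent to df1.hint("broadcast"): marks the small side of the join for broadcasting
df2 = df.join(F.broadcast(df1), on=['use_golden_limit', 'SEQ_ID'], how='left')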
Selecting the final dataframe and .show()
df4 = df3.select("SEQ_ID", "TOOL_ID", "isfleetlevel", "is_golden_limit", "use_golden_limit",
                 F.col("New_UL_Final").alias("New_UL"), F.col("New_LL_Final").alias("New_LL"))
df4.show()
Final dataframe:
+------+-------+------------+---------------+----------------+--------+--------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit| New_UL| New_LL|
+------+-------+------------+---------------+----------------+--------+--------+
|790026| 13510| 0| 0| 1| 26.1184| 23.2954|
|790026| 9162| 0| 0| 0|25.03535|23.48585|
|790026| 13509| 0| 0| 1| 26.1184| 23.2954|
|790026| 9160| 0| 1| 0| 26.1184| 23.2954|
|790048| 13509| 0| 0| 1|33.94075|30.75925|
|790048| 9160| 0| 1| 0|33.94075|30.75925|
|790048| 9162| 0| 0| 0| 33.5| 30.5|
|790048| 13510| 0| 0| 0| NaN| NaN|
+------+-------+------------+---------------+----------------+--------+--------+
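For what it's worth, the same result can also be obtained without the self-join, by pulling the golden row's limits across each SEQ_ID partition with a window aggregate. A minimal sketch of that alternative, assuming at most one is_golden_limit == 1 row per SEQ_ID (with several golden rows, max() would pick the largest limit rather than the first occurrence):

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w_seq = Window.partitionBy("SEQ_ID")

# New_UL/New_LL of the golden row, propagated to every row of the same SEQ_ID
# (when() yields NULL on non-golden rows, and max() ignores NULLs)
golden_ul = F.max(F.when(F.col("is_golden_limit") == 1, F.col("New_UL"))).over(w_seq)
golden_ll = F.max(F.when(F.col("is_golden_limit") == 1, F.col("New_LL"))).over(w_seq)

result = (df.withColumn("New_UL", F.when(F.col("use_golden_limit") == 1, golden_ul).otherwise(F.col("New_UL")))
            .withColumn("New_LL", F.when(F.col("use_golden_limit") == 1, golden_ll).otherwise(F.col("New_LL"))))
result.orderBy("SEQ_ID").show()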