Update an existing column in pyspark without changing older values

I am trying to update an existing column in PySpark, but the values set by my first update are being wiped out, even though those rows do not match the second condition. This is the output I get:

+-----+-----+-----+-----+-----+----+
|cntry|cde_1|cde_2|rsn_1|rsn_2|FLAG|
+-----+-----+-----+-----+-----+----+
|   MY|    A|     |    1|    2|null|
|   MY|    G|     |    1|    2|null|
|   MY|     |    G|    1|    2|null|
|   TH|    A|     |   16|    2|null|
|   TH|    B|     |    1|   16|   1|
|   TH|     |    W|   16|    2|   1|
+-----+-----+-----+-----+-----+----+
from pyspark.sql import functions as F

df = sc.parallelize([ ["MY","A","","1","2"], ["MY","G","","1","2"], ["MY","","G","1","2"], ["TH","A","","16","2"], ["TH","B","","1","16"], ["TH","","W","16","2"] ]).toDF(("cntry", "cde_1", "cde_2", "rsn_1", "rsn_2"))

# First pass: flag MY rows with code G and reason 1
df = df.withColumn('FLAG', F.when((df.cntry == "MY") & ((df.cde_1.isin("G")) | (df.cde_2.isin("G"))) & ((df.rsn_1 == "1") | (df.rsn_2 == "1")), 1))

# Second pass: flag TH rows with code B or W and reason 16
df = df.withColumn('FLAG', F.when((df.cntry == "TH") & ((df.cde_1.isin("B", "W")) | (df.cde_2.isin("B", "W"))) & ((df.rsn_1 == "16") | (df.rsn_2 == "16")), 1))

You need to combine your conditions with a boolean OR, like this:

from pyspark.sql import functions as F

df = sc.parallelize([ ["MY","A","","1","2"], ["MY","G","","1","2"], ["MY","","G","1","2"], ["TH","A","","16","2"], ["TH","B","","1","16"], ["TH","","W","16","2"] ]).toDF(("cntry", "cde_1", "cde_2", "rsn_1", "rsn_2"))

cond1 = (df.cntry == "MY") & ((df.cde_1.isin("G")) | (df.cde_2.isin("G"))) & ((df.rsn_1 == "1") | (df.rsn_2 == "1"))
cond2 = (df.cntry == "TH") & ((df.cde_1.isin("B", "W")) | (df.cde_2.isin("B", "W"))) & ((df.rsn_1 == "16") | (df.rsn_2 == "16"))
df.withColumn("FLAG", F.when(cond1 | cond2, 1)).show()
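
With the combined condition, the two MY rows containing G keep their flag alongside the TH matches, so show() should print:

+-----+-----+-----+-----+-----+----+
|cntry|cde_1|cde_2|rsn_1|rsn_2|FLAG|
+-----+-----+-----+-----+-----+----+
|   MY|    A|     |    1|    2|null|
|   MY|    G|     |    1|    2|   1|
|   MY|     |    G|    1|    2|   1|
|   TH|    A|     |   16|    2|null|
|   TH|    B|     |    1|   16|   1|
|   TH|     |    W|   16|    2|   1|
+-----+-----+-----+-----+-----+----+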

In your last line you overwrite the FLAG column, because you never reference its previous state. F.when without an otherwise clause returns null for every row that does not match the condition, so the values set by your first withColumn call are wiped out.
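
If you prefer to keep two separate withColumn calls, the second one has to carry the existing value through its otherwise branch. A minimal sketch, assuming the first withColumn has already created FLAG and cond2 is defined as above:

# Keep the previous FLAG value for rows that do not match cond2
df = df.withColumn('FLAG', F.when(cond2, 1).otherwise(F.col('FLAG')))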

Instead of combining the expressions, you can also use when(cond1, 1).otherwise(when(cond2, 1)). That is a stylistic choice.
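
Written out, the nested form looks like this and yields the same FLAG column as the combined condition above:

df.withColumn("FLAG", F.when(cond1, 1).otherwise(F.when(cond2, 1))).show()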