PySpark:withColumn() 有两个条件和三个结果

PySpark: withColumn() with two conditions and three outcomes

我正在使用 Spark 和 PySpark。我正在尝试实现等效于以下伪代码的结果:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)

我正尝试在 PySpark 中执行此操作,但我不确定语法。任何指针?我查看了 expr(),但无法正常工作。

请注意 dfpyspark.sql.dataframe.DataFrame

您需要使用如下的 udf

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def func(fruit1, fruit2):
    if fruit1 == None or fruit2 == None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0

func_udf = udf(func, IntegerType())
df = df.withColumn('new_column',func_udf(df['fruit1'], df['fruit2']))

有几种有效的方法可以实现这一点。让我们从必需的导入开始:

from pyspark.sql.functions import col, expr, when

您可以在 expr:

中使用 Hive IF 函数
new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

when + otherwise:

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)

最后你可以使用以下技巧:

from pyspark.sql.functions import coalesce, lit

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))

示例数据:

df = sc.parallelize([
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])

您可以按如下方式使用它:

(df
    .withColumn("new_column_1", new_column_1)
    .withColumn("new_column_2", new_column_2)
    .withColumn("new_column_3", new_column_3))

结果是:

+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple|           0|           0|           0|
|  kiwi|  null|           3|           3|           3|
|  null|banana|           3|           3|           3|
| mango| mango|           1|           1|           1|
|  null|  null|           3|           3|           3|
+------+------+------------+------------+------------+

pyspark 中的 withColumn 函数使您能够创建一个带有条件的新变量,添加 whenotherwise 函数,您就拥有了一个正常工作的 if then else 结构。

对于所有这些,您需要导入 sparksql 函数,因为您将看到如果没有 col() 函数,以下代码将无法工作。

第一位,我们声明一个新的列-'new column',然后给出when函数中包含的条件(即fruit1==fruit2)如果条件为真则给出1,如果不为真则控制转到 otherwise,然后使用 isNull() 函数处理第二个条件(fruit1 或 fruit2 为 Null),如果返回 true 3,如果为 false,则再次检查 otherwise,给出 0 作为答案。

from pyspark.sql import functions as F

df=df.withColumn('new_column', 
    F.when(F.col('fruit1')==F.col('fruit2'), 1)
    .otherwise(F.when((F.col('fruit1').isNull()) | (F.col('fruit2').isNull()), 3))
    .otherwise(0))