pyspark - 在列之间

pyspark - isin between columns

我正在尝试使用 isin 函数来检查 pyspark 数据帧列的值是否出现在另一列的同一行上。

+---+-------------+----+------------+--------+
| ID|         date| loc|   main_list|  GOAL_f|
+---+-------------+----+------------+--------+
|ID1|   2017-07-01|  L1|        [L1]|       1|
|ID1|   2017-07-02|  L1|        [L1]|       1|
|ID1|   2017-07-03|  L2|        [L1]|       0|
|ID1|   2017-07-04|  L2|     [L1,L2]|       1|
|ID1|   2017-07-05|  L1|     [L1,L2]|       1|
|ID1|   2017-07-06|  L3|     [L1,L2]|       0|
|ID1|   2017-07-07|  L3|  [L1,L2,L3]|       1|
+---+-------------+----+------------+--------+

但是我在尝试收集 main_list 进行比较时遇到错误。 这是我尝试失败的方法:

df.withColumn('GOAL_f', F.col('loc').isin(F.col('main_list').collect())

合并代码:

w = Window.partitionBy('id').orderBy('date').rowsBetween(Window.unboundedPreceeding,-1)
df.withColumn('main_list', F.collect_set('loc').over(w))
  .withColumn('GOAL_f', F.col('loc').isin(F.col('main_list').collect())

您可以反转查询,不是询问值是否在某物中,而是询问某物是否包含该值。

示例:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    data = [
        {"loc": "L1", "main_list": ["L1", "L2"]},
        {"loc": "L1", "main_list": ["L2"]},
    ]
    df = spark.createDataFrame(data=data)
    df = df.withColumn(
        "GOAL_f",
        F.when(F.array_contains(F.col("main_list"), F.col("loc")), 1).otherwise(0),
    )

结果:

+---+---------+------+
|loc|main_list|GOAL_f|
+---+---------+------+
|L1 |[L1, L2] |1     |
|L1 |[L2]     |0     |
+---+---------+------+