根据自身的过滤版本加入数据框

Joining a dataframe against a filtered version of itself

我有两个数据帧,leftright。后者 rightleft 的子集,因此 left 包含 right 包含的所有行。我想使用 right 通过执行简单的“left_anti”连接从 left 中删除冗余行。

我发现如果我在右侧使用 left 的过滤版本,连接将不起作用。只有当我从头开始重建正确的数据帧时它才有效。

from pyspark.sql import Row, SparkSession

import pyspark.sql.types as t

schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)
data =  [
    # Row that conflicts w/ range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]


def join_files(left_side, right_side):
    join_condition = [
      (
        (right_side.lower_street_number.isNotNull())
        & (right_side.upper_street_number.isNotNull())
        & (right_side.lower_street_number <= left_side.street_number)
        & (right_side.upper_street_number >= left_side.street_number)
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)

right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count() # Returns 2 - both rows still present


right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

您可以为 DF 起别名:


import pyspark.sql.functions as F


def join_files(left_side, right_side):
    join_condition = [
      (
        (F.col("right_side.lower_street_number").isNotNull())
        & (F.col("right_side.upper_street_number").isNotNull())
        & (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
        & (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema).alias("left_side")


right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count()) # Returns 2 - both rows still present


right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

不知道您使用的是哪个 pyspark 版本,但 pyspark==3.0.1,我收到以下解释性错误。

AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;