Joining a dataframe against a filtered version of itself
I have two dataframes, left and right. The latter, right, is a subset of left, so left contains every row that right does. I want to use right to remove the redundant rows from left by doing a simple "left_anti" join.

I've found that the join doesn't work if I use a filtered version of left as the right-hand side. It only works if I rebuild the right dataframe from scratch.
- What is going on here?
- Is there a workaround that doesn't involve re-creating the right dataframe?
from pyspark.sql import Row, SparkSession
import pyspark.sql.types as t

schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)

data = [
    # Row that conflicts w/ range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]

def join_files(left_side, right_side):
    join_condition = [
        (
            (right_side.lower_street_number.isNotNull())
            & (right_side.upper_street_number.isNotNull())
            & (right_side.lower_street_number <= left_side.street_number)
            & (right_side.upper_street_number >= left_side.street_number)
        )
    ]
    return left_side.join(right_side, join_condition, "left_anti")

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame(data, schema)

right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count()  # Returns 2 - both rows still present

right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count()  # Returns 1 - the "left_anti" join worked as expected
You can alias the DFs:
import pyspark.sql.functions as F

def join_files(left_side, right_side):
    join_condition = [
        (
            (F.col("right_side.lower_street_number").isNotNull())
            & (F.col("right_side.upper_street_number").isNotNull())
            & (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
            & (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
        )
    ]
    return left_side.join(right_side, join_condition, "left_anti")

spark = SparkSession.builder.getOrCreate()

# `data` and `schema` are the same as in the question.
left = spark.createDataFrame(data, schema).alias("left_side")

right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count())  # Returns 1 - the filtered right side now resolves correctly

right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count()  # Returns 1 - the "left_anti" join worked as expected
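The aliases matter because right_fail is derived from left, so Column objects like right_side.lower_street_number point at the same underlying column references as left's, and the condition effectively compares left with itself. Qualified names via F.col("alias.column") let the analyzer tell the two sides apart. If you would rather not alias, here is a minimal sketch (not from the original answer) of another workaround: give the right-hand columns fresh names before joining, so their references no longer coincide with left's. The names r_lower and r_upper are made up for illustration.

# Sketch of an alternative: rename the right-hand columns before the anti-join.
right_renamed = (
    left.filter("lower_street_number IS NOT NULL AND upper_street_number IS NOT NULL")
    .select(
        F.col("lower_street_number").alias("r_lower"),
        F.col("upper_street_number").alias("r_upper"),
    )
)
# The null checks are already handled by the filter above.
condition = (F.col("r_lower") <= F.col("street_number")) & (
    F.col("r_upper") >= F.col("street_number")
)
left.join(right_renamed, condition, "left_anti").count()  # Expected: 1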
Not sure which pyspark version you're using, but with pyspark==3.0.1 I get the following explanatory error:
AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;
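The last line of that error mentions spark.sql.analyzer.failAmbiguousSelfJoin. On versions before 3.0, or with that check disabled, the same join likely runs silently and just returns the wrong count of 2, which would explain the behaviour in the question. A quick way to check your environment (a sketch, assuming the spark session from above):

print(spark.version)
print(spark.conf.get("spark.sql.analyzer.failAmbiguousSelfJoin", "<not set>"))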