PySpark.sql.filter 表现不佳
PySpark.sql.filter not performing as it should
我在执行以下代码时运行遇到了问题:
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
当我运行上面的代码时,df3是一个空的DataFrame。然而:
如果我将代码更改为以下,它会给出正确的结果(2 行的数据帧):
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
所以我的问题是:为什么我必须在这里创建一个临时数据框?
另外,如果我无法在我的项目部分获得 HiveContext,我该如何在现有数据框之上制作一个重复的数据框?
我在 Spark 2.0 中看到了此数据集的相同行为,但并非总是针对相同的操作。稍微不同的数据框工作正常。
df1 = spark.createDataFrame(
[(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
| 3| 4| b|
+---+---+---+
df2 = df1.filter(df1.id3 == 'a')
df2.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
+---+---+---+
df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 2| 2| a| 1| 2| a|
| 2| 2| a| 2| 2| a|
+---+---+---+---+---+---+
一定有bug?不过,我还没有尝试过更高版本的 spark。您可能希望将此报告为错误。
我相信你在这里遇到的问题是一个更普遍的问题的实例,其中某些类型的 DataFrame 自连接(包括 DataFrame 与自身过滤副本的连接)可能会导致产生歧义或不正确的查询计划。
有几个与此相关的 Spark JIRA;这里有一些值得注意的:
- SPARK-15063: "filtering and joining back doesn't work" 似乎是与您报告的特定实例类型最接近的匹配项。
- SPARK-17154: "Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations" 对根本原因进行了很好的讨论。
还有其他 JIRA 工单处理这些问题的不同表现形式/方面。从上面列出的票证开始,可以通过以下 JIRA "relates to" 链接链发现这些票证。
只有在通过 DataFrame 实例引用列时(通过下标,如 df["mycol"]
,或通过字段访问,如 df.mycol
),这种歧义才会出现。可以通过别名 DataFrame 并通过别名引用列来避免这种歧义。例如,以下工作正常:
>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 4| 3| b| 3| 2| a|
| 3| 2| a| 2| 1| a|
+---+---+---+---+---+---+
我在执行以下代码时运行遇到了问题:
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
当我运行上面的代码时,df3是一个空的DataFrame。然而: 如果我将代码更改为以下,它会给出正确的结果(2 行的数据帧):
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext()
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
所以我的问题是:为什么我必须在这里创建一个临时数据框? 另外,如果我无法在我的项目部分获得 HiveContext,我该如何在现有数据框之上制作一个重复的数据框?
我在 Spark 2.0 中看到了此数据集的相同行为,但并非总是针对相同的操作。稍微不同的数据框工作正常。
df1 = spark.createDataFrame(
[(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
| 3| 4| b|
+---+---+---+
df2 = df1.filter(df1.id3 == 'a')
df2.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
| 1| 2| a|
| 2| 2| a|
+---+---+---+
df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 2| 2| a| 1| 2| a|
| 2| 2| a| 2| 2| a|
+---+---+---+---+---+---+
一定有bug?不过,我还没有尝试过更高版本的 spark。您可能希望将此报告为错误。
我相信你在这里遇到的问题是一个更普遍的问题的实例,其中某些类型的 DataFrame 自连接(包括 DataFrame 与自身过滤副本的连接)可能会导致产生歧义或不正确的查询计划。
有几个与此相关的 Spark JIRA;这里有一些值得注意的:
- SPARK-15063: "filtering and joining back doesn't work" 似乎是与您报告的特定实例类型最接近的匹配项。
- SPARK-17154: "Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations" 对根本原因进行了很好的讨论。
还有其他 JIRA 工单处理这些问题的不同表现形式/方面。从上面列出的票证开始,可以通过以下 JIRA "relates to" 链接链发现这些票证。
只有在通过 DataFrame 实例引用列时(通过下标,如 df["mycol"]
,或通过字段访问,如 df.mycol
),这种歧义才会出现。可以通过别名 DataFrame 并通过别名引用列来避免这种歧义。例如,以下工作正常:
>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
| 4| 3| b| 3| 2| a|
| 3| 2| a| 2| 1| a|
+---+---+---+---+---+---+