pyspark collect_list with groupby and row_number issue: order of rows changes each time I call show()
Is the behavior shown below expected, or is it a bug?
Create a DF:
data_list = [
['Blue', 2, 3, 1],
['Green', 1, 5, 4],
['Green', 4, 1, 3],
['Blue', 2, 4, 1],
['Green', 1, 5, 2]
]
all_cols = ['COLOR','COL1','COL2','COL3']
df = sqlContext.createDataFrame(data_list, all_cols)
df.show()
+-----+----+----+----+
|COLOR|COL1|COL2|COL3|
+-----+----+----+----+
| Blue| 2| 3| 1|
|Green| 1| 5| 4|
|Green| 4| 1| 3|
| Blue| 2| 4| 1|
|Green| 1| 5| 2|
+-----+----+----+----+
Add a ROW_ID:
df.createOrReplaceTempView('df')
df = spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df')
df.printSchema()
root
|-- ROW_ID: integer (nullable = true)
|-- COLOR: string (nullable = true)
|-- COL1: long (nullable = true)
|-- COL2: long (nullable = true)
|-- COL3: long (nullable = true)
df.show()
+------+-----+----+----+----+
|ROW_ID|COLOR|COL1|COL2|COL3|
+------+-----+----+----+----+
| 1|Green| 4| 1| 3|
| 2| Blue| 2| 4| 1|
| 3|Green| 1| 5| 2|
| 4| Blue| 2| 3| 1|
| 5|Green| 1| 5| 4|
+------+-----+----+----+----+
Create another DF by applying a groupby to the first one:
from pyspark.sql.functions import collect_list
grp_df = df.groupby('COLOR').agg(collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
+-----+---------+
|COLOR| IDX_VAL|
+-----+---------+
|Green|[1, 3, 5]|
| Blue| [2, 4]|
+-----+---------+
grp_df.printSchema()
root
|-- COLOR: string (nullable = true)
|-- IDX_VAL: array (nullable = true)
| |-- element: integer (containsNull = true)
If I then run grp_df.show() again, see below: the list elements in the IDX_VAL column have changed!!!
grp_df.show()
+-----+---------+
|COLOR| IDX_VAL|
+-----+---------+
|Green|[2, 3, 5]|
| Blue| [1, 4]|
+-----+---------+
As noted, the problem in the grouped DataFrame is really a side effect of the seemingly random ordering of the original DataFrame. Notice that in your example, ROW_ID does not sort the DataFrame by COLOR, even though that is what you told it to do. Or does it?
The problem is that you have quotes around "COLOR" in order by "COLOR". That makes it the string literal "COLOR" rather than the column COLOR, so every row gets the same constant sort key and the resulting order is arbitrary.
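Incidentally, this pitfall is easier to avoid with the DataFrame API, because a window spec takes column references rather than a SQL string. A minimal sketch, assuming your existing df and an active SparkSession:
from pyspark.sql import Window
import pyspark.sql.functions as f

# Window.orderBy('COLOR') resolves 'COLOR' as a column reference,
# so there is no way to accidentally order by a string literal.
w = Window.orderBy('COLOR')
df_with_id = df.withColumn('ROW_ID', f.row_number().over(w))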
Consider the difference in the execution plans:
spark.sql('select row_number() over (order by "COLOR") as ROW_ID, * from df').explain()
#== Physical Plan ==
#*Project [ROW_ID#350, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
#+- Window [row_number() windowspecdefinition(COLOR ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#350], [COLOR ASC NULLS FIRST]
# +- *Sort [COLOR ASC NULLS FIRST], false, 0
# +- Exchange SinglePartition
# +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]
spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df').explain()
#== Physical Plan ==
#*Project [ROW_ID#342, COLOR#326, COL1#327L, COL2#328L, COL3#329L]
#+- Window [row_number() windowspecdefinition(COLOR#326 ASC NULLS FIRST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS ROW_ID#342], [COLOR#326 ASC NULLS FIRST]
# +- *Sort [COLOR#326 ASC NULLS FIRST], false, 0
# +- Exchange SinglePartition
# +- Scan ExistingRDD[COLOR#326,COL1#327L,COL2#328L,COL3#329L]
You can see that the second (correct) one has COLOR#326 in the window spec definition, which shows it is actually ordering by the column. In the first plan, COLOR appears with no expression ID, i.e. as a string literal.
So if you remove the quotes, you should see more consistent results:
df = spark.sql('select row_number() over (order by COLOR) as ROW_ID, * from df')
df.show()
#+------+-----+----+----+----+
#|ROW_ID|COLOR|COL1|COL2|COL3|
#+------+-----+----+----+----+
#| 1| Blue| 2| 3| 1|
#| 2| Blue| 2| 4| 1|
#| 3|Green| 1| 5| 4|
#| 4|Green| 4| 1| 3|
#| 5|Green| 1| 5| 2|
#+------+-----+----+----+----+
Notice that the DataFrame is now actually sorted by COLOR.
grp_df = df.groupby('COLOR').agg(f.collect_list('ROW_ID').alias('IDX_VAL'))
grp_df.show()
#+-----+---------+
#|COLOR| IDX_VAL|
#+-----+---------+
#| Blue| [1, 2]|
#|Green|[3, 4, 5]|
#+-----+---------+
However, you may still get inconsistent ordering within the same COLOR, because you have not specified how to break ties after sorting by COLOR. That doesn't affect the grouped values in this particular case, but it's good practice to make sure your ordering is deterministic in all cases.
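If all you ultimately need is a deterministic array, one option (a sketch, not part of the original fix) is to sort the collected list itself, so its contents no longer depend on row order at all:
# sort_array makes the array contents independent of row order;
# assumes pyspark.sql.functions is imported as f, as above.
grp_df = df.groupby('COLOR').agg(
    f.sort_array(f.collect_list('ROW_ID')).alias('IDX_VAL')
)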
Alternatively, for your data you can make the ordering itself deterministic:
df = spark.sql(
'select row_number() over (order by COLOR, COL1, COL2, COL3) as ROW_ID, * from df'
)
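Since the four columns together uniquely identify every row in this data, ordering by (COLOR, COL1, COL2, COL3) is a total order, so row_number() assigns the same IDs on every run. A sketch of the equivalent with the DataFrame API, reusing Window and f from above:
w = Window.orderBy('COLOR', 'COL1', 'COL2', 'COL3')
df = df.withColumn('ROW_ID', f.row_number().over(w))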