AWS Glue does not give a consistent result for pyspark orderBy
When running pyspark locally I get the correct result, with the list sorted by BOOK_ID, but when the job is deployed on AWS Glue the books do not appear to be sorted.
root
|-- AUTHOR_ID: integer
|-- NAME: string
|-- BOOK_LIST: array
| |-- BOOK_ID: integer
| |-- BOOK_NAME: string
from pyspark.sql import functions as F

result = (
    df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
    # global sort, meant to control the order inside each collected list
    .orderBy(F.col("BOOK_ID").desc())
    .groupBy("AUTHOR_ID", "NAME")
    .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")).alias("BOOK_LIST"))
)
Note: I am using pyspark 3.2.1 and Glue 2.0.
Any suggestions?
Hypothesis
Even though I managed to run the job on Glue 3.0, which supports Spark 3.1, orderBy still gives a wrong result.
Migrating from AWS Glue 2.0 to AWS Glue 3.0
The workaround that seems to give a correct result is to reduce the number of workers to 2, the minimum allowed.
The explanation: a Glue job runs with many workers in parallel, so the data is split across several partitions, and the order produced by orderBy is not guaranteed to survive the shuffle that follows, whereas with a single worker (a single partition) it happens to.
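To see what happens, one illustrative sketch (my addition, not from the original post) is to print the physical plan. Depending on the Spark version, the optimizer may even drop the sort below the aggregation entirely, and even when the sort survives, the groupBy adds a hash-partitioning exchange that redistributes the sorted rows:

from pyspark.sql import functions as F

# Inspect the physical plan: the groupBy introduces an
# "Exchange hashpartitioning" shuffle, so collect_list receives rows in
# whatever order they arrive inside each hash partition; the global sort
# from orderBy is not guaranteed to survive it.
(df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
 .orderBy(F.col("BOOK_ID").desc())
 .groupBy("AUTHOR_ID", "NAME")
 .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")).alias("BOOK_LIST"))
 .explain())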
Suggested solutions
- Use the minimum number of workers (not a real solution)
- Apply .orderBy to each dataframe before the join
- Or use .coalesce(1):
result = (
    df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
    # collapse to a single partition so the sort order survives the groupBy
    .coalesce(1)
    .orderBy(F.col("BOOK_ID").desc())
    .groupBy("AUTHOR_ID", "NAME")
    .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")).alias("BOOK_LIST"))
)
This gives the correct result, but we lose the parallelism, and with it the performance.
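Not in the original post, but a partition-independent sketch that keeps the parallelism: skip the global orderBy, collect first, and sort each array afterwards. sort_array compares structs field by field, so with BOOK_ID as the first struct field each list ends up ordered by BOOK_ID:

from pyspark.sql import functions as F

result = (
    df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
    .groupBy("AUTHOR_ID", "NAME")
    .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")).alias("BOOK_LIST"))
    # order each author's list by BOOK_ID descending, independent of partitioning
    .withColumn("BOOK_LIST", F.sort_array("BOOK_LIST", asc=False))
)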
I am trying to simplify the problem, so bear with me:
Let's create a sample dataframe:
>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([
{"book_id": 1, "author_id": 1, "name": "David", "book_name": "Kill Bill"},
{"book_id": 2, "author_id": 2, "name": "Roman", "book_name": "Dying is Hard"},
{"book_id": 3, "author_id": 3, "name": "Moshe", "book_name": "Apache Kafka The Easy Way"},
{"book_id": 4, "author_id": 1, "name": "David", "book_name": "Pyspark Is Awesome"},
{"book_id": 5, "author_id": 2, "name": "Roman", "book_name": "Playing a Piano"},
{"book_id": 6, "author_id": 3, "name": "Moshe", "book_name": "Awesome Scala"}
])
Now, doing this:
(
    df
    .groupBy("author_id", "name")
    .agg(
        F.collect_list(F.struct("book_id", "book_name")).alias("data"),
        # aggregate key used only to order the resulting groups
        F.sum("book_id").alias("sorted_key"),
    )
    .orderBy(F.col("sorted_key").desc())
    .drop("sorted_key")
    .show(10, False)
)
I get exactly what you asked for:
+---------+-----+----------------------------------------------------+
|author_id|name |data                                                |
+---------+-----+----------------------------------------------------+
|3        |Moshe|[{3, Apache Kafka The Easy Way}, {6, Awesome Scala}]|
|2        |Roman|[{2, Dying is Hard}, {5, Playing a Piano}]          |
|1        |David|[{1, Kill Bill}, {4, Pyspark Is Awesome}]           |
+---------+-----+----------------------------------------------------+
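The lists happen to come out ordered by book_id here largely because the example is tiny and runs in a handful of partitions. A common pattern that pins the order inside each list explicitly (again my sketch, not part of the original answer) is collect_list over an ordered window, since rows enter the frame in the window's sort order:

from pyspark.sql import Window, functions as F

# Hypothetical variant: each author's rows are collected in the frame's
# sort order (book_id ascending), regardless of how the data is partitioned.
w = (Window.partitionBy("author_id")
     .orderBy("book_id")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

(df
 .withColumn("data", F.collect_list(F.struct("book_id", "book_name")).over(w))
 .select("author_id", "name", "data")
 .dropDuplicates(["author_id"])
 .show(10, False))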