AWS Glue does not give consistent results for pyspark orderBy

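When I run this locally with pyspark, I get the correct result, with each list sorted by BOOK_ID. When I deploy it as an AWS Glue job, however, the books do not appear to be sorted.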

root
 |-- AUTHORID: integer
 |-- NAME: string
 |-- BOOK_LIST: array
 |    |-- BOOK_ID: integer
 |    |-- BOOK_NAME: string

    from pyspark.sql import functions as F

    result = (df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
              .orderBy(F.col("BOOK_ID").desc())
              .groupBy("AUTHOR_ID", "NAME")
              .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")))
              )

Note: I am using pyspark 3.2.1 and Glue 2.0

Any suggestions?

Hypothesis

Even though I managed to run the job on Glue 3.0, which supports Spark 3.1, orderBy still gives wrong results.

Migrating from AWS Glue 2.0 to AWS Glue 3.0

The one thing that seems to give good results is reducing the number of workers to 2, which is the minimum allowed.

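The explanation: a Glue job may run on many workers in parallel, so the orderBy placed before the groupBy is not guaranteed to carry through, unlike the case where a single worker processes everything. The groupBy shuffles rows between workers, and collect_list simply collects rows in whatever order they arrive after that shuffle.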

Suggested solutions

  • Use the minimum number of workers (not a relevant solution)
  • Apply .orderBy to each dataframe before the join
  • Or use .coalesce(1):

    result = (df_authors.join(df_books, on=["AUTHOR_ID"], how="left")
              .coalesce(1)
              .orderBy(F.col("BOOK_ID").desc())
              .groupBy("AUTHOR_ID", "NAME")
              .agg(F.collect_list(F.struct("BOOK_ID", "BOOK_NAME")))
              )

This gives the correct result, but we lose performance, because everything is forced into a single partition.

I am trying to simplify the problem, so bear with me:

Let's create a sample dataframe:

from pyspark.sql import functions as F

# `spark` is the SparkSession that the pyspark shell provides.
df = spark.createDataFrame([
    {"book_id": 1, "author_id": 1, "name": "David", "book_name": "Kill Bill"},
    {"book_id": 2, "author_id": 2, "name": "Roman", "book_name": "Dying is Hard"},
    {"book_id": 3, "author_id": 3, "name": "Moshe", "book_name": "Apache Kafka The Easy Way"},
    {"book_id": 4, "author_id": 1, "name": "David", "book_name": "Pyspark Is Awesome"},
    {"book_id": 5, "author_id": 2, "name": "Roman", "book_name": "Playing a Piano"},
    {"book_id": 6, "author_id": 3, "name": "Moshe", "book_name": "Awesome Scala"},
])

Now, do this:

# Collect each author's books, then order the author rows by the sum of their book ids.
(
    df
    .groupBy("author_id", "name")
    .agg(
        F.collect_list(F.struct("book_id", "book_name")).alias("data"),
        F.sum("book_id").alias("sorted_key"),
    )
    .orderBy(F.col("sorted_key").desc())
    .drop("sorted_key")
    .show(10, False)
)

I get exactly what you asked for:

+---------+-----+----------------------------------------------------+
|author_id|name |data                                                |
+---------+-----+----------------------------------------------------+
|3        |Moshe|[{3, Apache Kafka The Easy Way}, {6, Awesome Scala}]|
|2        |Roman|[{2, Dying is Hard}, {5, Playing a Piano}]          |
|1        |David|[{1, Kill Bill}, {4, Pyspark Is Awesome}]           |
+---------+-----+----------------------------------------------------+