Databricks Pyspark - Group related rows

I'm parsing EDI files in Azure Databricks. Rows in the input file are related to other rows based on the order in which they appear. What I need is a way to group the related rows together.

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    ('book000book title',),
    ('auth001first author',),
    ('auth002second author',),
    ('book003another book',),
    ('auth004third author',),
]

schema = T.StructType([T.StructField('Line', T.StringType())])
books = spark.createDataFrame(data, schema)

# Parse the fixed-width fields: record type in chars 1-4, sequence in
# chars 5-7, and a 20-character payload from char 8 that holds either
# the title or the author name, depending on the record type.
books = (books
        .withColumn('RecordType', F.substring(F.col('Line'), 1, 4))
        .withColumn('Sequence', F.substring(F.col('Line'), 5, 3))
        .withColumn('Title', F.when(F.col('RecordType') == 'book', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .withColumn('Author', F.when(F.col('RecordType') == 'auth', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .drop('Line')
        )

# A global window (no partitionBy) ordered by the record sequence.
window = Window.orderBy('Sequence')
books = (books
         .withColumn('BookID', F.row_number().over(window))
        )

books.show()

After a book record, the subsequent rows are that book's authors, so those authors should get the same BookID as the last book record that precedes them.

The output of this is:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     2|
|      auth|     002|        null|second author|     3|
|      book|     003|another book|         null|     4|
|      auth|     004|        null| third author|     5|
+----------+--------+------------+-------------+------+

I need to assign the correct BookID to each author so that I can group them. The output I'm looking for is:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     1|
|      auth|     002|        null|second author|     1|
|      book|     003|another book|         null|     2|
|      auth|     004|        null| third author|     2|
+----------+--------+------------+-------------+------+

I haven't been able to figure this out. Any help would be greatly appreciated.

You can use a conditional sum aggregation over the window ordered by Sequence, as shown below. Because sum ignores nulls, the running total only increments on book rows, so every author row inherits the count of the most recent book record before it:

books = (books
         .withColumn('BookID', F.sum(F.when(F.col("RecordType") == "book", 1)).over(window))
         )

books.show()
#+----------+--------+------------+-------------+------+
#|RecordType|Sequence|       Title|       Author|BookID|
#+----------+--------+------------+-------------+------+
#|      book|     000|  book title|         null|     1|
#|      auth|     001|        null| first author|     1|
#|      auth|     002|        null|second author|     1|
#|      book|     003|another book|         null|     2|
#|      auth|     004|        null| third author|     2|
#+----------+--------+------------+-------------+------+
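
With BookID corrected, the grouping you're after follows directly. As a minimal sketch (assuming the column names above; "grouped" is just an illustrative name), you can collect each book's authors into a list. collect_list skips nulls, so the book rows themselves contribute no empty entries:

# Group authors under their book: first(..., ignorenulls=True) picks the
# title from the single book row in each group, and collect_list gathers
# the non-null Author values. The order within the collected list is not
# guaranteed after a shuffle.
grouped = (books
           .groupBy('BookID')
           .agg(F.first('Title', ignorenulls=True).alias('Title'),
                F.collect_list('Author').alias('Authors'))
           )

grouped.show(truncate=False)

One caveat: a window with no partitionBy, as used here, moves all rows to a single partition, so for very large EDI files you may want to partition by file or another natural key first.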