Databricks Pyspark - Group related rows
I am parsing EDI files in Azure Databricks. Lines in the input file are related to other lines based on the order in which they appear. What I need is a way to group the related lines together.
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
data = [
    ('book000book title',),
    ('auth001first author',),
    ('auth002second author',),
    ('book003another book',),
    ('auth004third author',),
]
schema = T.StructType([T.StructField('Line', T.StringType())])
books = spark.createDataFrame(data, schema)
books = (books
    .withColumn('RecordType', F.substring(F.col('Line'), 1, 4))
    .withColumn('Sequence', F.substring(F.col('Line'), 5, 3))
    .withColumn('Title', F.when(F.col('RecordType') == 'book', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
    .withColumn('Author', F.when(F.col('RecordType') == 'auth', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
    .drop('Line')
)
window = Window.orderBy('Sequence')
books = (books
    .withColumn('BookID', F.row_number().over(window))
)
books.show()
After a book record, the subsequent lines are that book's authors, so those authors' BookID should be the same as the last book record that precedes them.
The output of the code above is:
+----------+--------+------------+-------------+------+
|RecordType|Sequence| Title| Author|BookID|
+----------+--------+------------+-------------+------+
| book| 000| book title| null| 1|
| auth| 001| null| first author| 2|
| auth| 002| null|second author| 3|
| book| 003|another book| null| 4|
| auth| 004| null| third author| 5|
+----------+--------+------------+-------------+------+
I need to assign the correct BookID to each author so that I can group them. The output I am looking for is:
+----------+--------+------------+-------------+------+
|RecordType|Sequence| Title| Author|BookID|
+----------+--------+------------+-------------+------+
| book| 000| book title| null| 1|
| auth| 001| null| first author| 1|
| auth| 002| null|second author| 1|
| book| 003|another book| null| 2|
| auth| 004| null| third author| 2|
+----------+--------+------------+-------------+------+
I haven't been able to figure this out. Any help would be greatly appreciated.
You can use a conditional sum aggregation over a window ordered by Sequence, like this:
books = (books
    # Running count of 'book' rows up to the current row. F.sum ignores the
    # nulls that F.when produces for 'auth' rows, so author rows inherit the
    # count of the last preceding 'book' record.
    .withColumn('BookID', F.sum(F.when(F.col("RecordType") == "book", 1)).over(window))
)
books.show()
#+----------+--------+------------+-------------+------+
#|RecordType|Sequence| Title| Author|BookID|
#+----------+--------+------------+-------------+------+
#| book| 000| book title| null| 1|
#| auth| 001| null| first author| 1|
#| auth| 002| null|second author| 1|
#| book| 003|another book| null| 2|
#| auth| 004| null| third author| 2|
#+----------+--------+------------+-------------+------+
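With the BookID in place, a minimal sketch of the grouping step (assuming you simply want one row per book carrying its title and the list of its authors; column names taken from your example) could look like this:

grouped = (books
    .groupBy('BookID')
    .agg(
        F.first('Title', ignorenulls=True).alias('Title'),   # only the 'book' row has a non-null Title
        F.collect_list('Author').alias('Authors'),           # collect_list skips the null Author on the 'book' row
    )
)
grouped.show(truncate=False)

This should give one row per BookID with the title and an array of its authors; adjust the aggregation to whatever shape you actually need downstream.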