PySpark 2 - 合并来自多行的记录

PySpark 2 - Combine Records from multiple rows

我有一个文本文件,里面有如下记录:

<BR>Datetime:2018.06.30^
Name:ABC^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>
<BR>Datetime:2018.05.30-EDT^
Name:DEF^
Se:4^
Machine:XXXXXXX^
InnerTrace:^
AdditionalInfo:^
<ER>

我正在尝试将其读入 spark 并处理文件,以便结果为:

Datetime     Name  Se  Machine  InnerTrace  AdditionalInfo
2018.06.30   ABC   4   XXXXXXX      
2018.05.30   DEF   4   XXXXXXX

当我尝试使用

将文件读入 spark 时
sparkSession.read.csv("filename") 

我将每一行分开,这使得很难将
之间的所有行放在一起。有什么简单的解决方法吗?

我正在使用 PySpark 2 执行此操作。

这种文件格式不是 spark 友好的格式。如果您无法修改该文件,则必须进行大量处理才能按照您想要的方式进行处理。

这是一种可能适合您的方法-

读取文件

假设您有以下 DataFrame:

df = spark.read.csv(path="filename", quote='')
df.show(truncate=False)
#+-----------------------------+
#|_c0                          |
#+-----------------------------+
#|"<BR>Datetime:2018.06.30^    |
#|Name:ABC^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#|"<BR>Datetime:2018.05.30-EDT^|
#|Name:DEF^                    |
#|Se:4^                        |
#|Machine:XXXXXXX^             |
#|InnerTrace:^                 |
#|AdditionalInfo:^             |
#|<ER>"                        |
#+-----------------------------+

添加一列将行分隔成记录组

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy("id").rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn("group", f.col("_c0").rlike('^"<BR>.+').cast("int"))
df = df.withColumn("id", f.monotonically_increasing_id())
df = df.withColumn("group", f.sum("group").over(w)).drop("id")
df.show(truncate=False)
#+-----------------------------+-----+
#|_c0                          |group|
#+-----------------------------+-----+
#|"<BR>Datetime:2018.06.30^    |1    |
#|Name:ABC^                    |1    |
#|Se:4^                        |1    |
#|Machine:XXXXXXX^             |1    |
#|InnerTrace:^                 |1    |
#|AdditionalInfo:^             |1    |
#|<ER>"                        |1    |
#|"<BR>Datetime:2018.05.30-EDT^|2    |
#|Name:DEF^                    |2    |
#|Se:4^                        |2    |
#|Machine:XXXXXXX^             |2    |
#|InnerTrace:^                 |2    |
#|AdditionalInfo:^             |2    |
#|<ER>"                        |2    |
#+-----------------------------+-----+

使用正则表达式清理和拆分字符串

df = df.select(
    "group",
    f.regexp_replace(pattern=r'(^"<BR>|<ER>"$|\^$)', replacement='', str="_c0").alias("col")
).where(f.col("col") != '')
df = df.select("group", f.split("col", ":").alias("split"))

df.show(truncate=False)
#+-----+--------------------------+
#|group|split                     |
#+-----+--------------------------+
#|1    |[Datetime, 2018.06.30]    |
#|1    |[Name, ABC]               |
#|1    |[Se, 4]                   |
#|1    |[Machine, XXXXXXX]        |
#|1    |[InnerTrace, ]            |
#|1    |[AdditionalInfo, ]        |
#|2    |[Datetime, 2018.05.30-EDT]|
#|2    |[Name, DEF]               |
#|2    |[Se, 4]                   |
#|2    |[Machine, XXXXXXX]        |
#|2    |[InnerTrace, ]            |
#|2    |[AdditionalInfo, ]        |
#+-----+--------------------------+

从数组、groupby 和 pivot 中提取元素

df = df.select(
        "group",
        f.col("split").getItem(0).alias("key"),
        f.col("split").getItem(1).alias("value")
    )\
    .groupBy("group").pivot("key").agg(f.first("value"))\
    .drop("group")
df.show(truncate=False)
#+--------------+--------------+----------+-------+----+---+
#|AdditionalInfo|Datetime      |InnerTrace|Machine|Name|Se |
#+--------------+--------------+----------+-------+----+---+
#|              |2018.06.30    |          |XXXXXXX|ABC |4  |
#|              |2018.05.30-EDT|          |XXXXXXX|DEF |4  |
#+--------------+--------------+----------+-------+----+---+