从 pyspark 中的文本文件读取键值对

read key value pairs from a text file in pyspark

给定 supersetsuperset2 输入文本文件。 Superset 拥有所有必需的 headers (keys)。

superset2 文件中的行可能缺少 header 中某些列的值,需要用 $ 个字符填充缺失值。

一个。超集:

a|b|c|d|e|f|g|h|i|j

乙。超集 2:

a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4

预期输出:

a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$ |3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4

将 2 个文件读入 Dataframes 并:

  1. 获取第一个数据帧的键(列)列表
  2. 对包含数据的第二个数据帧进行一些转换,首先使用 , 拆分值,然后使用 transformmap_from_entries 的组合将值拆分为 :将每一行转换为映射列的函数
  3. 最终在键列表 select 列和 fillna 上使用列表推导式将空值替换为 $:
from pyspark.sql import functions as F


keys = spark.read.csv(keys_file_path, sep="|", header=True).columns
data = spark.read.text(data_file_path)

df = data.withColumn(
    "value",
    F.map_from_entries(
        F.expr("""transform(
                        split(value , ','), 
                        x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
        )""")
    )
).select(*[
    F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")

df.show(truncate=False)
#+---+---+---+---+---+---+---+---+---+---+
#|a  |b  |c  |d  |e  |f  |g  |h  |i  |j  |
#+---+---+---+---+---+---+---+---+---+---+
#|1  |1  |$  |1  |1  |$  |$  |1  |$  |$  |
#|2  |$  |$  |2  |2  |2  |$  |2  |$  |$  |
#|$  |$  |3  |3  |3  |3  |$  |3  |$  |$  |
#|4  |4  |4  |$  |4  |4  |$  |4  |4  |4  |
#+---+---+---+---+---+---+---+---+---+---+