从 pyspark 中的文本文件读取键值对
read key value pairs from a text file in pyspark
给定 superset
和 superset2
输入文本文件。 Superset
拥有所有必需的 headers (keys
)。
superset2
文件中的行可能缺少 header 中某些列的值,需要用 $
个字符填充缺失值。
一个。超集:
a|b|c|d|e|f|g|h|i|j
乙。超集 2:
a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4
预期输出:
a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$ |3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4
将 2 个文件读入 Dataframes 并:
- 获取第一个数据帧的键(列)列表
- 对包含数据的第二个数据帧进行一些转换,首先使用
,
拆分值,然后使用 transform
和 map_from_entries
的组合将值拆分为 :
将每一行转换为映射列的函数
- 最终在键列表 select 列和
fillna
上使用列表推导式将空值替换为 $
:
from pyspark.sql import functions as F
keys = spark.read.csv(keys_file_path, sep="|", header=True).columns
data = spark.read.text(data_file_path)
df = data.withColumn(
"value",
F.map_from_entries(
F.expr("""transform(
split(value , ','),
x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
)""")
)
).select(*[
F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")
df.show(truncate=False)
#+---+---+---+---+---+---+---+---+---+---+
#|a |b |c |d |e |f |g |h |i |j |
#+---+---+---+---+---+---+---+---+---+---+
#|1 |1 |$ |1 |1 |$ |$ |1 |$ |$ |
#|2 |$ |$ |2 |2 |2 |$ |2 |$ |$ |
#|$ |$ |3 |3 |3 |3 |$ |3 |$ |$ |
#|4 |4 |4 |$ |4 |4 |$ |4 |4 |4 |
#+---+---+---+---+---+---+---+---+---+---+
给定 superset
和 superset2
输入文本文件。 Superset
拥有所有必需的 headers (keys
)。
superset2
文件中的行可能缺少 header 中某些列的值,需要用 $
个字符填充缺失值。
一个。超集:
a|b|c|d|e|f|g|h|i|j
乙。超集 2:
a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4
预期输出:
a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$ |3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4
将 2 个文件读入 Dataframes 并:
- 获取第一个数据帧的键(列)列表
- 对包含数据的第二个数据帧进行一些转换,首先使用
,
拆分值,然后使用transform
和map_from_entries
的组合将值拆分为:
将每一行转换为映射列的函数 - 最终在键列表 select 列和
fillna
上使用列表推导式将空值替换为$
:
from pyspark.sql import functions as F
keys = spark.read.csv(keys_file_path, sep="|", header=True).columns
data = spark.read.text(data_file_path)
df = data.withColumn(
"value",
F.map_from_entries(
F.expr("""transform(
split(value , ','),
x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
)""")
)
).select(*[
F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")
df.show(truncate=False)
#+---+---+---+---+---+---+---+---+---+---+
#|a |b |c |d |e |f |g |h |i |j |
#+---+---+---+---+---+---+---+---+---+---+
#|1 |1 |$ |1 |1 |$ |$ |1 |$ |$ |
#|2 |$ |$ |2 |2 |2 |$ |2 |$ |$ |
#|$ |$ |3 |3 |3 |3 |$ |3 |$ |$ |
#|4 |4 |4 |$ |4 |4 |$ |4 |4 |4 |
#+---+---+---+---+---+---+---+---+---+---+