Pyspark 将 key-value 对转换为列
Pyspark transform key-value pairs into columns
我有一个 json 文件,其中包含如下所示的数据:
"Url": "https://sample.com", "Method": "POST", "Headers": [{"Key": "accesstoken", "Value": ["123"]}, {"Key": "id", "Value": ["abc"]}, {"Key": "context", "Value": ["sample"]}]
阅读 json 时,我将架构明确定义为:
schema = StructType(
[
StructField('Url', StringType(), True),
StructField('Method', StringType(), True),
StructField("Headers",ArrayType(StructType([
StructField('Key', StringType(), True),
StructField("Value",ArrayType(StringType()),True),
]),True),True)
]
)
目标是将 Key-Value 数据读取为列而不是行。
Url
Method
accesstoken
id
context
https://sample.com
POST
123
abc
sample
展开“Headers”列只会将其转换为多行。数据的另一个问题是,我的键值对值存储在 2 个单独的对中!
,而不是文字 key-value 对(例如“accesstoken”:“123”)
我尝试遍历值以首先创建地图,但我无法遍历“Headers”列。
df_map = df.withColumn('map', to_json(array(*[create_map(element.Key, element.Value) for element in df.Headers])))
我还尝试将“Headers”列读取为 MapType(StringType, ArrayType(StringType)),但它无法读取当时的值。当我这样做时它显示为 null。
有什么办法可以实现吗?我是否必须以纯文本和 pre-process 数据而不是数据框的形式读取数据?
您的方法是正确的,但要连接您的地图必须使用 reduce
表达式:
from pyspark.sql.types import *
import pyspark.sql.functions as f
# [...] Your dataframe initialization
df = df.select('Url', 'Method', f.explode(f.expr('REDUCE(Headers, cast(map() as map<string, array<string>>), (acc, el) -> map_concat(acc, map(el.Key, el.Value)))')))
# Transform key:value into columns
df_pivot = (df
.groupBy('Url', 'Method')
.pivot('key')
.agg(f.first('value')))
array_columns = [column for column, _type in df_pivot.dtypes if _type.startswith('array')]
df_pivot = (df_pivot
.withColumn('zip', f.explode(f.arrays_zip(*array_columns)))
.select('Url', 'Method', 'zip.*'))
df_pivot.show(truncate=False)
输出
+------------------+------+-----------+-------+---+
|Url |Method|accesstoken|context|id |
+------------------+------+-----------+-------+---+
|https://sample.com|POST |123 |sample |abc|
+------------------+------+-----------+-------+---+
我有一个 json 文件,其中包含如下所示的数据:
"Url": "https://sample.com", "Method": "POST", "Headers": [{"Key": "accesstoken", "Value": ["123"]}, {"Key": "id", "Value": ["abc"]}, {"Key": "context", "Value": ["sample"]}]
阅读 json 时,我将架构明确定义为:
schema = StructType(
[
StructField('Url', StringType(), True),
StructField('Method', StringType(), True),
StructField("Headers",ArrayType(StructType([
StructField('Key', StringType(), True),
StructField("Value",ArrayType(StringType()),True),
]),True),True)
]
)
目标是将 Key-Value 数据读取为列而不是行。
Url | Method | accesstoken | id | context |
---|---|---|---|---|
https://sample.com | POST | 123 | abc | sample |
展开“Headers”列只会将其转换为多行。数据的另一个问题是,我的键值对值存储在 2 个单独的对中!
,而不是文字 key-value 对(例如“accesstoken”:“123”)我尝试遍历值以首先创建地图,但我无法遍历“Headers”列。
df_map = df.withColumn('map', to_json(array(*[create_map(element.Key, element.Value) for element in df.Headers])))
我还尝试将“Headers”列读取为 MapType(StringType, ArrayType(StringType)),但它无法读取当时的值。当我这样做时它显示为 null。
有什么办法可以实现吗?我是否必须以纯文本和 pre-process 数据而不是数据框的形式读取数据?
您的方法是正确的,但要连接您的地图必须使用 reduce
表达式:
from pyspark.sql.types import *
import pyspark.sql.functions as f
# [...] Your dataframe initialization
df = df.select('Url', 'Method', f.explode(f.expr('REDUCE(Headers, cast(map() as map<string, array<string>>), (acc, el) -> map_concat(acc, map(el.Key, el.Value)))')))
# Transform key:value into columns
df_pivot = (df
.groupBy('Url', 'Method')
.pivot('key')
.agg(f.first('value')))
array_columns = [column for column, _type in df_pivot.dtypes if _type.startswith('array')]
df_pivot = (df_pivot
.withColumn('zip', f.explode(f.arrays_zip(*array_columns)))
.select('Url', 'Method', 'zip.*'))
df_pivot.show(truncate=False)
输出
+------------------+------+-----------+-------+---+
|Url |Method|accesstoken|context|id |
+------------------+------+-----------+-------+---+
|https://sample.com|POST |123 |sample |abc|
+------------------+------+-----------+-------+---+