将列添加到数据框
Adding column to dataFrame
我需要根据 json 来自其他列的数据向 DataFrame (DynamicFrame) 添加新列,最好的方法是什么?
架构:
'id' 'name' 'customJson'
--------------------------
1 ,John, {'key':'lastName','value':'Smith'}
之后:
'id' 'name' 'lastName' 'customJson'
-----------------------------------
1, John, Smith, {'key':'lastName','value':'Smith'}
尝试使用 withColumn 但不确定如何根据 json 值
获取和计算数据
DynamicFrames 不会让您进行这种级别的转换,因此您需要使用 .toDF()
方法将其转换为 PySpark DataFrame,然后在转换后,.fromDF()
.
下面是一个关于如何使用 PySpark DataFrame 解析它的例子:
创建一个 DataFrame 作为例子
from pyspark.sql import DataFrame , SparkSession
spark = spark = SparkSession.builder \
.master("local") \
.appName("Parsing JSON") \
.getOrCreate()
df = spark.createDataFrame([(1 ,"John", "{'key':'lastName','value':'Smith'}")],['id','name','customJson'])
现在正在解析 JSON 列
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json
schema = StructType([StructField('key', StringType()),StructField('value', StringType())])
df = df.select(
df.id,
df.name,
from_json(df.customJson, schema).value.alias('lastName'),
df.customJson
)
如果你愿意,欢迎运行this notebook。
这是一些文档:
https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.from_json.html
[编辑]
如果您的 Json 表示一个元素数组(如下面的评论),您需要在模式中包含 ArrayType
,然后使用 explode
函数来展平您需要的值。
以创建 DataFrame 为例
from pyspark.sql import DataFrame , SparkSession
spark = spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.getOrCreate()
df = spark.createDataFrame([(1 ,"John", "[{'key':'lastName','value':'Smith'},{'key':'lastName','value':'Silva'}]")],['id','name','customJson'])
正在解析 Json 表示元素数组
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import from_json, explode
schema = ArrayType(StructType([StructField('key', StringType()),StructField('value', StringType())]))
df = df.select(
df.id,
df.name,
explode(
from_json(df.customJson, schema)
).alias('parsedJson'),
df.customJson
)
df.select(
df.id,
df.name,
df.parsedJson.value.alias("lastName"),
).toPandas().to_markdown()
# to_markdown properly formats the DataFrame as markdown to print below as table
id
name
lastName
0
1
John
Smith
1
1
John
Silva
我需要根据 json 来自其他列的数据向 DataFrame (DynamicFrame) 添加新列,最好的方法是什么?
架构:
'id' 'name' 'customJson'
--------------------------
1 ,John, {'key':'lastName','value':'Smith'}
之后:
'id' 'name' 'lastName' 'customJson'
-----------------------------------
1, John, Smith, {'key':'lastName','value':'Smith'}
尝试使用 withColumn 但不确定如何根据 json 值
获取和计算数据DynamicFrames 不会让您进行这种级别的转换,因此您需要使用 .toDF()
方法将其转换为 PySpark DataFrame,然后在转换后,.fromDF()
.
下面是一个关于如何使用 PySpark DataFrame 解析它的例子:
创建一个 DataFrame 作为例子
from pyspark.sql import DataFrame , SparkSession
spark = spark = SparkSession.builder \
.master("local") \
.appName("Parsing JSON") \
.getOrCreate()
df = spark.createDataFrame([(1 ,"John", "{'key':'lastName','value':'Smith'}")],['id','name','customJson'])
现在正在解析 JSON 列
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json
schema = StructType([StructField('key', StringType()),StructField('value', StringType())])
df = df.select(
df.id,
df.name,
from_json(df.customJson, schema).value.alias('lastName'),
df.customJson
)
如果你愿意,欢迎运行this notebook。
这是一些文档: https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.from_json.html
[编辑]
如果您的 Json 表示一个元素数组(如下面的评论),您需要在模式中包含 ArrayType
,然后使用 explode
函数来展平您需要的值。
以创建 DataFrame 为例
from pyspark.sql import DataFrame , SparkSession
spark = spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.getOrCreate()
df = spark.createDataFrame([(1 ,"John", "[{'key':'lastName','value':'Smith'},{'key':'lastName','value':'Silva'}]")],['id','name','customJson'])
正在解析 Json 表示元素数组
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import from_json, explode
schema = ArrayType(StructType([StructField('key', StringType()),StructField('value', StringType())]))
df = df.select(
df.id,
df.name,
explode(
from_json(df.customJson, schema)
).alias('parsedJson'),
df.customJson
)
df.select(
df.id,
df.name,
df.parsedJson.value.alias("lastName"),
).toPandas().to_markdown()
# to_markdown properly formats the DataFrame as markdown to print below as table
id | name | lastName | |
---|---|---|---|
0 | 1 | John | Smith |
1 | 1 | John | Silva |