在 Foundry 中,我如何解析具有 JSON 响应的数据框列
In Foundry, how can I parse a dataframe column that has a JSON response
我正在尝试使用外部 API 将 JIRA 数据导入 Foundry。当它通过 Magritte 进入时,数据会存储在 AVRO 中,并且有一个名为响应的列。响应列中的数据如下所示...
[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]
由于它作为 AVRO 导入,因此 Foundry 中讨论如何转换此数据的文档不起作用。如何将此数据转换为单独的列和行?
这是我尝试使用的代码:
from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T
@transform_df(
Output("json output"),
json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):
sqlContext = SQLContext(sc)
source = json_raw.select('response').collect() # noqa
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
json_schema = T.StructType([
T.StructField("id", T.StringType(), False),
T.StructField("name", T.StringType(), False),
T.StructField("custom", T.StringType(), False),
T.StructField("orderable", T.StringType(), False),
T.StructField("navigable", T.StringType(), False),
T.StructField("searchable", T.StringType(), False),
T.StructField("clauseNames", T.StringType(), False),
T.StructField("schema", T.StringType(), False)
])
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
df_new = df.select(udf_parse_json(df.response).alias("response"))
return df_new
# Function to convert JSON array string to a list
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
使用 F.from_json 函数可以轻松地将字符串列中的 Json 解析为结构列(然后解析为单独的列)。
对于你的情况,你需要做:
df = df.withColumn("response_parsed", F.from_json("response", json_schema))
然后您可以执行此操作或类似操作以将内容放入不同的列中:
df = df.select("response_parsed.*")
但是,这不会起作用,因为您的架构不正确,您实际上每行都有一个 json 结构列表,而不仅仅是 1 个,因此您需要一个 T.ArrayType(your_schema)
环绕整个事情,你还需要在选择之前做一个F.explode,让每个数组元素在它自己的行中。
另一个有用的函数是 F.get_json_object,它允许您从 json 字符串中获取 json 一个 json 对象。
像您所做的那样使用 UDF 是可行的,但 UDF 通常 比原生 spark 函数。
此外,AVRO 文件格式在这种情况下所做的就是将多个 json 文件合并为一个大文件,每个文件在其自己的行中,因此“Rest API”下的示例插件”- “Processing JSON in Foundry”应该可以工作,只要您跳过 'put this schema on the raw dataset' 步骤。
我使用 magritte-rest 连接器遍历搜索的分页结果:
type: rest-source-adapter2
restCalls:
- type: magritte-paging-inc-param-call
method: GET
path: search
paramToIncrease: startAt
increaseBy: 50
initValue: 0
parameters:
startAt: '{%startAt%}'
extractor:
- type: json
assign:
issues: /issues
allowNull: true
condition:
type: magritte-rest-non-empty-condition
var: issues
maxIterationsAllowed: 4096
cacheToDisk: false
oneFilePerResponse: false
这产生了一个看起来像这样的数据集:
一旦我有了它,这个解析扩展并将返回的 JSON 问题解析成一个正确类型的 DataFrame,其中 fields
将问题的内部结构保存为一个(非常复杂的)结构:
import json
from pyspark.sql import Row
from pyspark.sql.functions import explode
def issues_enumerated(All_Issues_Paged):
def generate_issue_row(input_row: Row) -> Row:
"""
Generates a dataframe of each responses issue array as a single array record per-Row
"""
d = input_row.asDict()
resp_json = d['response']
resp_obj = json.loads(resp_json)
issues = list(map(json.dumps,resp_obj['issues']))
return Row(issues=issues)
# array-per-record
unexploded_df = All_Issues_Paged.rdd.map(generate_issue_row).toDF()
# row-per-record
row_per_record_df = unexploded_df.select(explode(unexploded_df.issues))
# raw JSON string per-record RDD
issue_json_strings_rdd = row_per_record_df.rdd.map(lambda _: _.col)
# JSON object dataframe
issues_df = spark.read.json(issue_json_strings_rdd)
issues_df.printSchema()
return issues_df
我正在尝试使用外部 API 将 JIRA 数据导入 Foundry。当它通过 Magritte 进入时,数据会存储在 AVRO 中,并且有一个名为响应的列。响应列中的数据如下所示...
[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]
由于它作为 AVRO 导入,因此 Foundry 中讨论如何转换此数据的文档不起作用。如何将此数据转换为单独的列和行?
这是我尝试使用的代码:
from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T
@transform_df(
Output("json output"),
json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):
sqlContext = SQLContext(sc)
source = json_raw.select('response').collect() # noqa
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
json_schema = T.StructType([
T.StructField("id", T.StringType(), False),
T.StructField("name", T.StringType(), False),
T.StructField("custom", T.StringType(), False),
T.StructField("orderable", T.StringType(), False),
T.StructField("navigable", T.StringType(), False),
T.StructField("searchable", T.StringType(), False),
T.StructField("clauseNames", T.StringType(), False),
T.StructField("schema", T.StringType(), False)
])
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
df_new = df.select(udf_parse_json(df.response).alias("response"))
return df_new
# Function to convert JSON array string to a list
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
使用 F.from_json 函数可以轻松地将字符串列中的 Json 解析为结构列(然后解析为单独的列)。
对于你的情况,你需要做:
df = df.withColumn("response_parsed", F.from_json("response", json_schema))
然后您可以执行此操作或类似操作以将内容放入不同的列中:
df = df.select("response_parsed.*")
但是,这不会起作用,因为您的架构不正确,您实际上每行都有一个 json 结构列表,而不仅仅是 1 个,因此您需要一个 T.ArrayType(your_schema)
环绕整个事情,你还需要在选择之前做一个F.explode,让每个数组元素在它自己的行中。
另一个有用的函数是 F.get_json_object,它允许您从 json 字符串中获取 json 一个 json 对象。
像您所做的那样使用 UDF 是可行的,但 UDF 通常
此外,AVRO 文件格式在这种情况下所做的就是将多个 json 文件合并为一个大文件,每个文件在其自己的行中,因此“Rest API”下的示例插件”- “Processing JSON in Foundry”应该可以工作,只要您跳过 'put this schema on the raw dataset' 步骤。
我使用 magritte-rest 连接器遍历搜索的分页结果:
type: rest-source-adapter2
restCalls:
- type: magritte-paging-inc-param-call
method: GET
path: search
paramToIncrease: startAt
increaseBy: 50
initValue: 0
parameters:
startAt: '{%startAt%}'
extractor:
- type: json
assign:
issues: /issues
allowNull: true
condition:
type: magritte-rest-non-empty-condition
var: issues
maxIterationsAllowed: 4096
cacheToDisk: false
oneFilePerResponse: false
这产生了一个看起来像这样的数据集:
一旦我有了它,这个解析扩展并将返回的 JSON 问题解析成一个正确类型的 DataFrame,其中 fields
将问题的内部结构保存为一个(非常复杂的)结构:
import json
from pyspark.sql import Row
from pyspark.sql.functions import explode
def issues_enumerated(All_Issues_Paged):
def generate_issue_row(input_row: Row) -> Row:
"""
Generates a dataframe of each responses issue array as a single array record per-Row
"""
d = input_row.asDict()
resp_json = d['response']
resp_obj = json.loads(resp_json)
issues = list(map(json.dumps,resp_obj['issues']))
return Row(issues=issues)
# array-per-record
unexploded_df = All_Issues_Paged.rdd.map(generate_issue_row).toDF()
# row-per-record
row_per_record_df = unexploded_df.select(explode(unexploded_df.issues))
# raw JSON string per-record RDD
issue_json_strings_rdd = row_per_record_df.rdd.map(lambda _: _.col)
# JSON object dataframe
issues_df = spark.read.json(issue_json_strings_rdd)
issues_df.printSchema()
return issues_df