在 Foundry 中,我如何解析具有 JSON 响应的数据框列

In Foundry, how can I parse a dataframe column that has a JSON response

我正在尝试使用外部 API 将 JIRA 数据导入 Foundry。当它通过 Magritte 进入时,数据会存储在 AVRO 中,并且有一个名为响应的列。响应列中的数据如下所示...

[{"id":"customfield_5","name":"test","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["cf[5]","test"],"schema":{"type":"user","custom":"com.atlassian.jira.plugin.system.customfieldtypes:userpicker","customId":5}},{"id":"customfield_2","name":"test2","custom":true,"orderable":true,"navigable":true,"searchable":true,"clauseNames":["test2","cf[2]"],"schema":{"type":"option","custom":"com.atlassian.jira.plugin.system.customfieldtypes:select","customId":2}}]

由于它作为 AVRO 导入,因此 Foundry 中讨论如何转换此数据的文档不起作用。如何将此数据转换为单独的列和行?

这是我尝试使用的代码:

from transforms.api import transform_df, Input, Output
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
import json
import pyspark.sql.types as T


@transform_df(
    Output("json output"),
    json_raw=Input("json input"),
)
def my_compute_function(json_raw, ctx):

    sqlContext = SQLContext(sc)

    source = json_raw.select('response').collect()  # noqa

    # Read the list into data frame
    df = sqlContext.read.json(sc.parallelize(source))

    json_schema = T.StructType([
        T.StructField("id", T.StringType(), False),
        T.StructField("name", T.StringType(), False),
        T.StructField("custom", T.StringType(), False),
        T.StructField("orderable", T.StringType(), False),
        T.StructField("navigable", T.StringType(), False),
        T.StructField("searchable", T.StringType(), False),
        T.StructField("clauseNames", T.StringType(), False),
        T.StructField("schema", T.StringType(), False)
    ])

    udf_parse_json = udf(lambda str: parse_json(str), json_schema)

    df_new = df.select(udf_parse_json(df.response).alias("response"))

    return df_new


# Function to convert JSON array string to a list
def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj:
        yield (item["a"], item["b"])

使用 F.from_json 函数可以轻松地将字符串列中的 Json 解析为结构列(然后解析为单独的列)。

对于你的情况,你需要做:

df = df.withColumn("response_parsed", F.from_json("response", json_schema))

然后您可以执行此操作或类似操作以将内容放入不同的列中:

df = df.select("response_parsed.*")

但是,这不会起作用,因为您的架构不正确,您实际上每行都有一个 json 结构列表,而不仅仅是 1 个,因此您需要一个 T.ArrayType(your_schema) 环绕整个事情,你还需要在选择之前做一个F.explode,让每个数组元素在它自己的行中。

另一个有用的函数是 F.get_json_object,它允许您从 json 字符串中获取 json 一个 json 对象。

像您所做的那样使用 UDF 是可行的,但 UDF 通常 比原生 spark 函数。

此外,AVRO 文件格式在这种情况下所做的就是将多个 json 文件合并为一个大文件,每个文件在其自己的行中,因此“Rest API”下的示例插件”- “Processing JSON in Foundry”应该可以工作,只要您跳过 'put this schema on the raw dataset' 步骤。

我使用 magritte-rest 连接器遍历搜索的分页结果:

type: rest-source-adapter2
restCalls:
  - type: magritte-paging-inc-param-call
    method: GET
    path: search
    paramToIncrease: startAt
    increaseBy: 50
    initValue: 0
    parameters:
      startAt: '{%startAt%}'
    extractor:
      - type: json
        assign:
          issues: /issues
        allowNull: true
    condition:
      type: magritte-rest-non-empty-condition
      var: issues
    maxIterationsAllowed: 4096
cacheToDisk: false
oneFilePerResponse: false

这产生了一个看起来像这样的数据集:

一旦我有了它,这个解析扩展并将返回的 JSON 问题解析成一个正确类型的 DataFrame,其中 fields 将问题的内部结构保存为一个(非常复杂的)结构:

import json
from pyspark.sql import Row
from pyspark.sql.functions import explode

def issues_enumerated(All_Issues_Paged):

    def generate_issue_row(input_row: Row) -> Row:
        """
        Generates a dataframe of each responses issue array as a single array record per-Row
        """
        d = input_row.asDict()
        resp_json = d['response']
        resp_obj = json.loads(resp_json)
        issues = list(map(json.dumps,resp_obj['issues']))

        return Row(issues=issues)
    
    # array-per-record
    unexploded_df = All_Issues_Paged.rdd.map(generate_issue_row).toDF()
    # row-per-record
    row_per_record_df = unexploded_df.select(explode(unexploded_df.issues))
    # raw JSON string per-record RDD
    issue_json_strings_rdd = row_per_record_df.rdd.map(lambda _: _.col)
    # JSON object dataframe
    issues_df = spark.read.json(issue_json_strings_rdd)
    issues_df.printSchema()
    return issues_df