Can't figure out how to insert keys and values of nested JSON data into SQL rows with NiFi

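I'm working on a personal project and am very new to JSON, NiFi, SQL, etc. (learning as I go), so please forgive any confusing language or potentially very obvious solutions here. I can clarify as needed.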

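I need to take the JSON output of an API call to a website and insert it into a table on a local MariaDB server I've set up. The problem is that the JSON data is nested, and two of the pieces of data I need to insert appear as (variable) object keys rather than as values, so I don't know how to extract them and put them into the database table. Essentially, I think I need to pick out the different parts of the JSON and insert them as values, but I don't know how to do that.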

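I have specifically tried the EvaluateJsonPath, SplitJson, and FlattenJson processors, but I couldn't get it to work. All I could get was a single result for the whole expression, not a separate result for each part of it.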

```json
{"5381":{"wind_speed":4.0,"tm_st_snp":26.0,"tm_off_snp":74.0,"tm_def_snp":63.0,"temperature":58.0,"st_snp":8.0,"punts":4.0,"punt_yds":178.0,"punt_lng":55.0,"punt_in_20":1.0,"punt_avg":44.5,"humidity":47.0,"gp":1.0,"gms_active":1.0},

"1023":{"wind_speed":4.0,"tm_st_snp":26.0,"tm_off_snp":82.0,"tm_def_snp":56.0,"temperature":74.0,"off_snp":82.0,"humidity":66.0,"gs":1.0,"gp":1.0,"gms_active":1.0},

"5300":{"wind_speed":17.0,"tm_st_snp":27.0,"tm_off_snp":80.0,"tm_def_snp":64.0,"temperature":64.0,"st_snp":21.0,"pts_std":9.0,"pts_ppr":9.0,"pts_half_ppr":9.0,"idp_tkl_solo":4.0,"idp_tkl_loss":1.0,"idp_tkl":4.0,"idp_sack":1.0,"idp_qb_hit":2.0,"humidity":100.0,"gp":1.0,"gms_active":1.0,"def_snp":23.0},

"608":{"wind_speed":6.0,"tm_st_snp":20.0,"tm_off_snp":53.0,"tm_def_snp":79.0,"temperature":88.0,"st_snp":4.0,"pts_std":5.5,"pts_ppr":5.5,"pts_half_ppr":5.5,"idp_tkl_solo":4.0,"idp_tkl_loss":1.0,"idp_tkl_ast":1.0,"idp_tkl":5.0,"humidity":78.0,"gs":1.0,"gp":1.0,"gms_active":1.0,"def_snp":56.0},

"3396":{"wind_speed":6.0,"tm_st_snp":20.0,"tm_off_snp":60.0,"tm_def_snp":70.0,"temperature":63.0,"st_snp":19.0,"off_snp":13.0,"humidity":100.0,"gp":1.0,"gms_active":1.0}}
```

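This is a snapshot of several thousand lines of output. Each numeric key you see above (5381, 1023, 5300, etc.) is the player ID for the stats nested under it. I have a table with three columns: Player ID, Stat ID, and Stat Value. For example, the first snippet needs to go into my table like this: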

Player ID        Stat ID        Stat Value
5381             wind_speed     4.0
5381             tm_st_snp      26.0
5381             tm_off_snp     74.0

and so on for every piece of data. But I can't figure out how to get NiFi to pick out the right data and insert it into the right columns.
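To make the desired mapping concrete, here is a minimal Groovy sketch (Groovy being the language of the script further down) that runs outside NiFi and turns the nested object into one (player ID, stat ID, stat value) row per inner key. `flattenStats` is a hypothetical helper name, and the sample input is a trimmed copy of the first player's entry above.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not part of NiFi): flattens
// {"<playerId>": {"<statId>": <value>, ...}, ...}
// into a list of [playerId, statId, statValue] rows.
List<List> flattenStats(String json) {
    Map players = new JsonSlurper().parseText(json) as Map
    List<List> rows = []
    players.each { playerId, stats ->
        // the outer key is the player ID; each inner key/value pair is one stat
        (stats as Map).each { statId, statValue ->
            rows << [playerId, statId, statValue]
        }
    }
    return rows
}

// Trimmed copy of the first player's entry from the question
def sample = '{"5381":{"wind_speed":4.0,"tm_st_snp":26.0,"tm_off_snp":74.0}}'
def rows = flattenStats(sample)
rows.each { println it.join('\t') }
```

Each printed line is one tab-separated row in the same three-column layout as the table above; the point is that the player ID comes from the outer key and the stat ID from the inner key, so no JSONPath expression on values alone can recover them.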

I believe you can use Jolt to transform your JSON into the following format:

```
[
  {"playerId":"5381", "statId":"wind_speed", "statValue": 0.123},
  {"playerId":"5381", "statId":"tm_st_snp", "statValue": 0.456},
  ...
]
```

and then use PutDatabaseRecord with a JSON record reader (for example JsonTreeReader).
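The Jolt spec itself isn't given here, so as a rough stand-in (plain Groovy, not a Jolt spec) the sketch below builds the same flat record array from the nested input. It shows what the transform has to produce before PutDatabaseRecord reads it. `toRecordArray` is a hypothetical helper, and the field names playerId/statId/statValue are simply the ones from the example format above; presumably they would need to line up with your table's column names.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Not a Jolt spec: a plain-Groovy sketch of the transformation the Jolt step
// would need to perform, producing one record per (player, stat) pair.
String toRecordArray(String nestedJson) {
    Map players = new JsonSlurper().parseText(nestedJson) as Map
    def records = players.collectMany { playerId, stats ->
        // one record per inner key/value pair of this player
        (stats as Map).collect { statId, statValue ->
            [playerId: playerId, statId: statId, statValue: statValue]
        }
    }
    return JsonOutput.toJson(records)
}

def input = '{"5381":{"wind_speed":4.0,"tm_st_snp":26.0},"1023":{"gp":1.0}}'
println JsonOutput.prettyPrint(toRecordArray(input))
```

Because every record carries all three fields, a record reader can treat the array as a flat table, which is why this shape works with PutDatabaseRecord while the original nested object does not.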


Another option is to use the ExecuteGroovyScript processor.

Add a new property named SQL.mydb to it and set its value to your DBCP controller service.

Then use the following script as the Script Body property:

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder

def ff = session.get()
if (!ff) return

// read the flow file content and parse it as JSON
def body = ff.read().withReader("UTF-8") { reader ->
    new JsonSlurper().parse(reader)
}

def results = []
// use the SQL.mydb connection (from the DBCP service) to run a batched insert in one transaction
SQL.mydb.withTransaction {
    // adjust the table and column names to match your own table
    def cmd = 'insert into mytable(playerId, statId, statValue) values(?,?,?)'
    // withBatch(100, ...) sends the inserts to the database in batches of 100
    results = SQL.mydb.withBatch(100, cmd) { statement ->
        // outer key = player ID; inner key/value = stat ID and stat value
        body.each { pid, stats ->
            stats.each { k, v ->
                statement.addBatch(pid, k, v)
            }
        }
    }
}

// write the batch update counts as the new flow file content
ff.write("UTF-8") { writer ->
    new JsonBuilder(results).writeTo(writer)
}
// transfer to success
REL_SUCCESS << ff
```