如何在 Spark 中处理字段名可变的 JSON
How to handle variable JSON field names in Spark
我有一个 JSON 日志文件(每行一条 JSON,以 \n 分隔),需要将其读取为 Spark 的结构类型(StructType);但在我的 txt 文件中,每条 JSON 的第一层字段名都不相同。
我该怎么做?
import org.apache.spark.sql.types.ArrayType

// Schema of one entry of the `catalog_objects` array.
val elementSchema = new StructType()
  .add("name", StringType, true)
  .add("object_type", StringType, true)
  .add("privilege", StringType, true)

// Schema of the inner audit record (the object nested under the variable top-level key).
// `catalog_objects` is a JSON array of objects in the sample records, so it is declared
// as an ArrayType of the element struct rather than as a single struct.
// NOTE(review): `authorization_failure` is a JSON boolean in the sample records but is
// declared as a string here — confirm which type is wanted.
val simpleSchema = new StructType()
  .add("authorization_failure", StringType, true)
  .add("catalog_objects", ArrayType(elementSchema), true)
  .add("impersonator", StringType, true)
  .add("network_address", StringType, true)
  .add("query_id", StringType, true)
  .add("session_id", StringType, true)
  .add("sql_statement", StringType, true)
  .add("start_time", StringType, true)
  .add("statement_type", StringType, true)
  .add("status", StringType, true)
  .add("user", StringType, true)

val anaSchema = new StructType()
  .add("saasd", StringType, true)

val config = new SparkConf()
config.set("spark.sql.shuffle.partitions", "300")

val spark = SparkSession.builder()
  .config(config)
  .master("local[2]")
  .appName("Example")
  .getOrCreate()

// Reading the file directly makes every distinct top-level key its own column.
val dataframe = spark.read
  .json("/home/ogn/denemeler/big_data/impala_audit_spark/file/testa.txt")
dataframe.printSchema()

// Pack all columns into one struct and serialize it back into a single JSON string column.
val df = dataframe.select(to_json(struct(dataframe.columns.map(col): _*)).alias("all"))
期望结果:
每个字段成为独立的一列,结构如下:
authorization_failure|catalog_objects|impersonator|network_address|query_id|session_id|sql_statement|start_time|statement_type|status|user|
testa.txt 的内容(单个文件,约有 3M 条 JSON 记录):
{"1648039261379":{"query_id":"x","session_id":"da40931781b4b8ed:978bb8edb9177dbd","start_time":"2022-03-23 15:41:01.234826","authorization_failure":false,"status":"","user":"x","impersonator":null,"statement_type":"QUERY","network_address":"x","sql_statement":"y","catalog_objects":[{"name":"_impala_builtins","object_type":"DATABASE","privilege":"VIEW_METADATA"},{"name":"s","object_type":"TABLE","privilege":"SELECT"}]}}
{"1648039261510":{"query_id":"x","session_id":"344247956fada236:7d9c0930b7c51b9a","start_time":"2022-03-23 15:41:01.507023","authorization_failure":false,"status":"","user":"x","impersonator":null,"statement_type":"USE","network_address":"x","sql_statement":"t","catalog_objects":[{"name":"g","object_type":"DATABASE","privilege":"ANY"}]}}
步骤 1:使用 textFile:
将 Json 文件作为简单文本文件读取
// Load the file as plain text: one Dataset element per input line.
val ds: Dataset[String] =
  spark.read.textFile("testa.txt")
步骤 2:使用 regexp_extract 删除第一个 Json 级别。您也可以解析 json 字符串,但我认为这种方法更快。
import spark.implicits._

// Strip the outer {"<variable key>": ... } wrapper and keep only the inner JSON object.
// - The pattern is a triple-quoted string: "\{" is not a valid escape in an ordinary
//   Scala string literal, so the single-quoted form does not compile.
// - It is anchored to the first quoted key of the line, so a ":{" occurring later inside
//   a value (for example in sql_statement) cannot be mistaken for the wrapper.
// - Lines that do not match produce an empty string (regexp_extract's no-match result).
val ds2: Dataset[String] = ds
  .withColumn(
    "value",
    regexp_extract($"value", """^\s*\{\s*"[^"]*"\s*:\s*(\{.*\})\s*\}\s*$""", 1)
  )
  .as[String]
步骤 3: 将字符串解析为数据帧:
// Parse every inner JSON string into a row; the schema is inferred from the data.
val df3: DataFrame = spark.read.json(ds2)
// Print the inferred schema explicitly: a bare `df3` expression only shows output in a REPL.
df3.printSchema()
现在有结构
root
|-- authorization_failure: boolean (nullable = true)
|-- catalog_objects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- object_type: string (nullable = true)
| | |-- privilege: string (nullable = true)
|-- impersonator: string (nullable = true)
|-- network_address: string (nullable = true)
|-- query_id: string (nullable = true)
|-- session_id: string (nullable = true)
|-- sql_statement: string (nullable = true)
|-- start_time: string (nullable = true)
|-- statement_type: string (nullable = true)
|-- status: string (nullable = true)
|-- user: string (nullable = true)
我有一个 JSON 日志文件(每行一条 JSON,以 \n 分隔),需要将其读取为 Spark 的结构类型(StructType);但在我的 txt 文件中,每条 JSON 的第一层字段名都不相同。我该怎么做?
import org.apache.spark.sql.types.ArrayType

// Schema of one entry of the `catalog_objects` array.
val elementSchema = new StructType()
  .add("name", StringType, true)
  .add("object_type", StringType, true)
  .add("privilege", StringType, true)

// Schema of the inner audit record (the object nested under the variable top-level key).
// `catalog_objects` is a JSON array of objects in the sample records, so it is declared
// as an ArrayType of the element struct rather than as a single struct.
// NOTE(review): `authorization_failure` is a JSON boolean in the sample records but is
// declared as a string here — confirm which type is wanted.
val simpleSchema = new StructType()
  .add("authorization_failure", StringType, true)
  .add("catalog_objects", ArrayType(elementSchema), true)
  .add("impersonator", StringType, true)
  .add("network_address", StringType, true)
  .add("query_id", StringType, true)
  .add("session_id", StringType, true)
  .add("sql_statement", StringType, true)
  .add("start_time", StringType, true)
  .add("statement_type", StringType, true)
  .add("status", StringType, true)
  .add("user", StringType, true)

val anaSchema = new StructType()
  .add("saasd", StringType, true)

val config = new SparkConf()
config.set("spark.sql.shuffle.partitions", "300")

val spark = SparkSession.builder()
  .config(config)
  .master("local[2]")
  .appName("Example")
  .getOrCreate()

// Reading the file directly makes every distinct top-level key its own column.
val dataframe = spark.read
  .json("/home/ogn/denemeler/big_data/impala_audit_spark/file/testa.txt")
dataframe.printSchema()

// Pack all columns into one struct and serialize it back into a single JSON string column.
val df = dataframe.select(to_json(struct(dataframe.columns.map(col): _*)).alias("all"))
期望结果:
每个字段成为独立的一列,结构如下:
authorization_failure|catalog_objects|impersonator|network_address|query_id|session_id|sql_statement|start_time|statement_type|status|user|
testa.txt 的内容(单个文件,约有 3M 条 JSON 记录):
{"1648039261379":{"query_id":"x","session_id":"da40931781b4b8ed:978bb8edb9177dbd","start_time":"2022-03-23 15:41:01.234826","authorization_failure":false,"status":"","user":"x","impersonator":null,"statement_type":"QUERY","network_address":"x","sql_statement":"y","catalog_objects":[{"name":"_impala_builtins","object_type":"DATABASE","privilege":"VIEW_METADATA"},{"name":"s","object_type":"TABLE","privilege":"SELECT"}]}}
{"1648039261510":{"query_id":"x","session_id":"344247956fada236:7d9c0930b7c51b9a","start_time":"2022-03-23 15:41:01.507023","authorization_failure":false,"status":"","user":"x","impersonator":null,"statement_type":"USE","network_address":"x","sql_statement":"t","catalog_objects":[{"name":"g","object_type":"DATABASE","privilege":"ANY"}]}}
步骤 1:使用 textFile:
将 Json 文件作为简单文本文件读取
val ds: Dataset[String] = spark.read.textFile("testa.txt")
步骤 2:使用 regexp_extract 删除第一个 Json 级别。您也可以解析 json 字符串,但我认为这种方法更快。
import spark.implicits._

// Strip the outer {"<variable key>": ... } wrapper and keep only the inner JSON object.
// - The pattern is a triple-quoted string: "\{" is not a valid escape in an ordinary
//   Scala string literal, so the single-quoted form does not compile.
// - It is anchored to the first quoted key of the line, so a ":{" occurring later inside
//   a value (for example in sql_statement) cannot be mistaken for the wrapper.
// - Lines that do not match produce an empty string (regexp_extract's no-match result).
val ds2: Dataset[String] = ds
  .withColumn(
    "value",
    regexp_extract($"value", """^\s*\{\s*"[^"]*"\s*:\s*(\{.*\})\s*\}\s*$""", 1)
  )
  .as[String]
步骤 3: 将字符串解析为数据帧:
// Parse every inner JSON string into a row; the schema is inferred from the data.
val df3: DataFrame = spark.read.json(ds2)
// Print the inferred schema explicitly: a bare `df3` expression only shows output in a REPL.
df3.printSchema()
现在有结构
root
|-- authorization_failure: boolean (nullable = true)
|-- catalog_objects: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- object_type: string (nullable = true)
| | |-- privilege: string (nullable = true)
|-- impersonator: string (nullable = true)
|-- network_address: string (nullable = true)
|-- query_id: string (nullable = true)
|-- session_id: string (nullable = true)
|-- sql_statement: string (nullable = true)
|-- start_time: string (nullable = true)
|-- statement_type: string (nullable = true)
|-- status: string (nullable = true)
|-- user: string (nullable = true)