Scala schema_of_json function fails in spark structured streaming
I created a function that reads the JSON value as a string together with its schema, and I use that function in Spark Structured Streaming. When I do so I get an error. The same block works when I first create the schema and then parse with that schema, but it does not work when done in a single line. How can I resolve this?
def processBatch(microBatchOutputDF: DataFrame, batchId: Long) {
  TOPICS.split(',').foreach(topic => {
    var TableName = topic.split('.').last.toUpperCase
    var df = microBatchOutputDF
    /*var schema = schema_of_json(df
      .select($"value")
      .filter($"topic".contains(topic))
      .as[String]
    )*/
    var jsonDataDf = df.filter($"topic".contains(topic))
      .withColumn("jsonData", from_json($"value", schema_of_json(lit($"value".as[String])), scala.collection.immutable.Map[String, String]().asJava))
    var srcTable = jsonDataDf
      .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")
    srcTable
      .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
      .write
      .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)
    spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
  })
}
Spark streaming code:
import org.apache.spark.sql.streaming.Trigger

val StreamingQuery = InputDf
  .select("*")
  .writeStream.outputMode("update")
  .option("queryName", "StreamingQuery")
  .foreachBatch(processBatch _)
  .start()
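The question does not show how InputDf is built. As a minimal sketch only, a Kafka source producing the topic, value, offset and timestamp columns referenced in processBatch might look like this; the broker address and the TOPICS value are placeholders, not values from the original post:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Placeholder topic list in the same "server.db.table" style the question implies
val TOPICS = "server1.db.table1,server1.db.table2"

val InputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // assumed broker address
  .option("subscribe", TOPICS)                         // comma-separated list of topics
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("topic", "CAST(value AS STRING) AS value", "offset", "timestamp")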
Error:
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json/schema_of_csv functions instead of schema_of_json(value)
The error above points to how from_json() is being called.

Syntax: from_json(jsonStr, schema[, options]) - returns a struct value parsed from the given jsonStr using the given schema.

See the following examples:
> SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
{"a":1,"b":0.8}
> SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
{"time":2015-08-26 00:00:00}
Reference: https://docs.databricks.com/sql/language-manual/functions/from_json.html
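The same constraint applies in the DataFrame API: the schema argument of from_json must be a DDL-formatted string literal or the output of schema_of_json applied to a constant (foldable) string. A minimal sketch, assuming a DataFrame df with a string value column; the column layout below is only an illustrative assumption, not the real payload:

import org.apache.spark.sql.functions.{from_json, schema_of_json, lit, col}

// Option 1: schema given as a DDL string literal (the columns here are only an example)
val withDdlSchema = df.withColumn("jsonData",
  from_json(col("value"), "a INT, b DOUBLE", Map.empty[String, String]))

// Option 2: schema inferred from a hard-coded sample record; lit() makes it a
// constant (foldable) expression, so the analyzer accepts it
val sampleJson = """{"a": 1, "b": 0.8}"""
val withInferredSchema = df.withColumn("jsonData",
  from_json(col("value"), schema_of_json(lit(sampleJson))))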
This is how I solved it.

I created a filtered DataFrame from the Kafka output DataFrame and applied all of the existing logic to it. The problem with inferring the schema while reading is that from_json does not know which single row out of all the rows in the DataFrame it should use.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import scala.collection.JavaConverters._
import spark.implicits._

def processBatch(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  TOPICS.split(',').foreach(topic => {
    val TableName = topic.split('.').last.toUpperCase
    // Keep only the rows for the current topic
    val df = microBatchOutputDF.where(col("topic") === topic)
    // Infer the schema up front from one sample record of this topic
    // (assumes the micro-batch contains at least one record for the topic)
    val schema = schema_of_json(df
      .select($"value")
      .as[String]
      .first()
    )
    val jsonDataDf = df.withColumn("jsonData",
      from_json($"value", schema, scala.collection.immutable.Map[String, String]().asJava))
    val srcTable = jsonDataDf
      .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")
    srcTable
      .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
      .write
      .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)
    spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
  })
}
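Not part of the original answer, but a self-contained batch-mode sketch may make the difference clearer: schema_of_json must receive a constant (foldable) string, so passing the value column itself fails analysis, while sampling one record and passing it as a literal works. The sample payload below is an assumption shaped like the payload.after messages in the question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, schema_of_json, lit}

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Assumed sample payload, not taken from the original post
val df = Seq("""{"payload":{"after":{"id":1,"name":"a"}}}""").toDF("value")

// Fails with the AnalysisException from the question: the schema argument is per-row, not foldable
// df.withColumn("jsonData", from_json($"value", schema_of_json($"value")))

// Works: take one concrete record, infer the schema from it, then parse every row with that schema
val sample = df.select($"value").as[String].first()
val parsed = df.withColumn("jsonData", from_json($"value", schema_of_json(lit(sample))))
parsed.select($"jsonData.payload.after.*").show()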