Structured streaming schema from Kafka JSON - query error
I'm using Spark 3.2 to consume a JSON stream from Kafka 2.12-3.0.0.
I get a query error after parsing the JSON.
JSON streamed on the Kafka topic:
b'{"pmu_id": 2, "time": 1642771653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 49.99, "rocof": 1}'
b'{"pmu_id": 2, "time": 1642734653.06, "stream_id": 2,"analog": [], "digital": 0, "frequency": 50.00, "rocof": -1}'
DataFrame schema:
stream01Schema = StructType() \
    .add("pmu_id", ByteType()) \
    .add("time", TimestampType()) \
    .add("stream_id", ByteType()) \
    .add("analog", StringType()) \
    .add("digital", ByteType()) \
    .add("frequency", FloatType()) \
    .add("rocof", ByteType())
Building a streaming DataFrame that reads from the topic:
stream01DF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load() \
    .select(col("key").cast("string"), from_json(col("value").cast("string").alias("pmudata"), stream01Schema))
Printing the resulting schema:
root
|-- key: string (nullable = true)
|-- from_json(CAST(value AS STRING) AS pmudata): struct (nullable = true)
| |-- pmu_id: byte (nullable = true)
| |-- time: timestamp (nullable = true)
| |-- stream_id: byte (nullable = true)
| |-- analog: string (nullable = true)
| |-- digital: byte (nullable = true)
| |-- frequency: float (nullable = true)
| |-- rocof: byte (nullable = true)
Test query:
testQuery = stream01DF.groupBy("pmudata.rocof").count()
testQuery.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", False) \
.start() \
.awaitTermination()
Error received:
pyspark.sql.utils.AnalysisException: cannot resolve 'pmudata.rocof' given input columns: [from_json(CAST(value AS STRING) AS pmudata), key];
It seems you're looking for this, since you're trying to alias the from_json() column to a name you can select/group by later (check your parentheses):
from_json(col("value").cast("string"), stream01Schema).alias("pmudata")
Full usage is in the end-to-end example in this Databricks post.