Spark Structured Streaming application reading from Kafka returns only null values
I plan to use Spark Structured Streaming to pull data from Kafka, but all I get back is null values.
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_csv, from_json
from pyspark.sql.types import StringType, StructType

if __name__ == '__main__':

    spark = SparkSession \
        .builder \
        .appName("pyspark_structured_streaming_kafka") \
        .getOrCreate()

    df_raw = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers","52.81.249.81:9092") \
        .option("subscribe","product") \
        .option("kafka.ssl.endpoint.identification.algorithm","") \
        .option("kafka.isolation.level","read_committed") \
        .load()

    df_raw.printSchema()

    product_schema = StructType() \
        .add("product_name", StringType()) \
        .add("product_factory", StringType()) \
        .add("yield_num", StringType()) \
        .add("yield_time", StringType())

    df_1 = df_raw.selectExpr("CAST(value AS STRING)") \
        .select(from_json("value", product_schema).alias("data")) \
        .select("data.*") \
        .write \
        .format("console") \
        .save()
My test data looks like this:
{
"product_name": "X Laptop",
"product_factory": "B-3231",
"yield_num": 899,
"yield_time": "20210201 22:00:01"
}
But the result is not what I expected:
./spark-submit ~/Documents/3-Playground/kbatch.py
+------------+---------------+---------+----------+
|product_name|product_factory|yield_num|yield_time|
+------------+---------------+---------+----------+
| null| null| null| null|
| null| null| null| null|
The command used to publish the test data:
./kafka-producer-perf-test.sh --topic product --num-records 90000000 --throughput 5 --producer.config ../config/producer.properties --payload-file ~/Downloads/product.json
If I strip the code down like this:
df_1 = df_raw.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation","file:///Users/picomy/Kafka-Output/checkpoint") \
    .start() \
    .awaitTermination()
the result looks like this:
Batch: 3130
-------------------------------------------
+--------------------+
| value|
+--------------------+
| "yield_time":...|
| "product_name...|
| "yield_num": ...|
| "product_fact...|
| "yield_num": ...|
| "yield_num": ...|
| "product_fact...|
| "product_fact...|
| "product_name...|
| "product_fact...|
| "product_name...|
| }|
| "yield_time":...|
| "product_name...|
| }|
| "product_fact...|
| "yield_num": ...|
| "product_fact...|
| "yield_time":...|
| "product_name...|
+--------------------+
I don't know what the root cause of the problem is.
There are a few reasons why your code isn't working as expected:
- Wrong schema (the yield_num field is an integer/long, not a string)
- Using writeStream instead of plain write (if you want streaming)
- The streaming query has to be started and awaited for termination
- The data in your JSON file should be stored on a single line, because kafka-producer-perf-test.sh sends each line of the payload file as a separate record, which is why the console output above shows the JSON split into fragments (see the single-line example right after this list)
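For illustration, the payload file (~/Downloads/product.json in your producer command) would then contain the test record from the question reformatted onto one line:

{"product_name": "X Laptop", "product_factory": "B-3231", "yield_num": 899, "yield_time": "20210201 22:00:01"}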
You can replace the relevant part of your code with the following snippet:
from pyspark.sql.types import StringType, StructType, LongType

product_schema = StructType() \
    .add("product_name", StringType()) \
    .add("product_factory", StringType()) \
    .add("yield_num", LongType()) \
    .add("yield_time", StringType())

df_1 = df_raw.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", product_schema).alias("data")) \
    .select("data.*") \
    .writeStream \
    .format("console") \
    .start() \
    .awaitTermination()
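Putting the pieces together, a minimal end-to-end sketch could look like the following. It assumes the source is created with spark.readStream (rather than spark.read) so that writeStream is allowed, and it reuses the broker address, topic, checkpoint location, and schema from the question; adjust those to your environment.

# -*- coding: utf-8 -*-
# Minimal streaming sketch: read JSON records from Kafka, parse them, print to console.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StringType, StructType, LongType

if __name__ == '__main__':

    spark = SparkSession \
        .builder \
        .appName("pyspark_structured_streaming_kafka") \
        .getOrCreate()

    # Streaming source: readStream instead of read (assumption: a streaming job is intended)
    df_raw = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "52.81.249.81:9092") \
        .option("subscribe", "product") \
        .load()

    # yield_num is numeric in the test data, so use LongType
    product_schema = StructType() \
        .add("product_name", StringType()) \
        .add("product_factory", StringType()) \
        .add("yield_num", LongType()) \
        .add("yield_time", StringType())

    # Parse the Kafka value column as JSON and flatten it into columns
    query = df_raw.selectExpr("CAST(value AS STRING)") \
        .select(from_json("value", product_schema).alias("data")) \
        .select("data.*") \
        .writeStream \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "file:///Users/picomy/Kafka-Output/checkpoint") \
        .start()

    query.awaitTermination()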