使用 python 中的 spark 结构化流从来自 kafka 的 json 创建数据框
create a data frame from json coming from kafka using spark structured streaming in python
我是 spark 的结构化流媒体的新手,正在研究需要在结构化流媒体上实现的 poc。
输入源:kafka
输入格式:json
语言:python3
库:火花 3.2
我正在尝试在预定义结构的 spark 数据帧中格式化传入的 json。
到目前为止,我能够获取 json 事件并能够在控制台中获得结果(不是预期的格式)。如果您能在正确的方向上轻推我或提出解决方案,那将非常有帮助。
下面是我目前的代码。
json 来自卡夫卡
{"property1" : "hello","property2" : "world"}
structured_kafka.py
"""
Run the script
`$ bin/spark-submit structured_kafka.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
schema = StructType([
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
df = lines.select('*')
# Start running the query that prints the running counts to the console
query = df\
.writeStream\
.outputMode('Append')\
.format('console')\
.start()
query.awaitTermination()
输出
Batch: 1
-------------------------------------------
+--------------------+
| parsed_value|
+--------------------+
|{hello, world} |
+--------------------+
预计
+--------------------+--------------------+
| property1 | property2 |
+--------------------+--------------------+
|hello |world |
+--------------------+---------------------
如果我能得到这种格式的 df,我就能应用我的用例。
请多多指教。
注意:我查看了所有现有的解决方案,大多数解决方案要么在scala中,要么不用于结构化流,要么不用于kafka作为源。
行后:
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
添加:
.select(col("parsed_value.property1"), col("parsed_value.property2"))
或:
.select(col("parsed_value.*"))
我是 spark 的结构化流媒体的新手,正在研究需要在结构化流媒体上实现的 poc。
输入源:kafka 输入格式:json 语言:python3 库:火花 3.2
我正在尝试在预定义结构的 spark 数据帧中格式化传入的 json。
到目前为止,我能够获取 json 事件并能够在控制台中获得结果(不是预期的格式)。如果您能在正确的方向上轻推我或提出解决方案,那将非常有帮助。
下面是我目前的代码。
json 来自卡夫卡
{"property1" : "hello","property2" : "world"}
structured_kafka.py
"""
Run the script
`$ bin/spark-submit structured_kafka.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
schema = StructType([
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
df = lines.select('*')
# Start running the query that prints the running counts to the console
query = df\
.writeStream\
.outputMode('Append')\
.format('console')\
.start()
query.awaitTermination()
输出
Batch: 1
-------------------------------------------
+--------------------+
| parsed_value|
+--------------------+
|{hello, world} |
+--------------------+
预计
+--------------------+--------------------+
| property1 | property2 |
+--------------------+--------------------+
|hello |world |
+--------------------+---------------------
如果我能得到这种格式的 df,我就能应用我的用例。
请多多指教。
注意:我查看了所有现有的解决方案,大多数解决方案要么在scala中,要么不用于结构化流,要么不用于kafka作为源。
行后:
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
添加:
.select(col("parsed_value.property1"), col("parsed_value.property2"))
或:
.select(col("parsed_value.*"))