使用 python 中的 spark 结构化流从来自 kafka 的 json 创建数据框

create a data frame from json coming from kafka using spark structured streaming in python

我是 spark 的结构化流媒体的新手,正在研究需要在结构化流媒体上实现的 poc。

输入源:kafka 输入格式:json 语言:python3 库:火花 3.2

我正在尝试在预定义结构的 spark 数据帧中格式化传入的 json。

到目前为止,我能够获取 json 事件并能够在控制台中获得结果(不是预期的格式)。如果您能在正确的方向上轻推我或提出解决方案,那将非常有帮助。

下面是我目前的代码。

json 来自卡夫卡

{"property1" : "hello","property2" : "world"}

structured_kafka.py


"""
 Run the script
    `$ bin/spark-submit structured_kafka.py \
    host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    
    schema = StructType([ 
        StructField("property1", StringType(), True),
        StructField("property2" , StringType(), True),
        ])


    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))


    df = lines.select('*')
  
    # Start running the query that prints the running counts to the console
    query = df\
        .writeStream\
        .outputMode('Append')\
        .format('console')\
        .start()

    query.awaitTermination()

输出

Batch: 1
-------------------------------------------
+--------------------+
|        parsed_value|
+--------------------+
|{hello, world}      |
+--------------------+

预计

+--------------------+--------------------+
| property1          | property2          |
+--------------------+--------------------+
|hello               |world               |
+--------------------+---------------------

如果我能得到这种格式的 df,我就能应用我的用例。

请多多指教。

注意:我查看了所有现有的解决方案,大多数解决方案要么在scala中,要么不用于结构化流,要么不用于kafka作为源。

行后:

.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

添加:

.select(col("parsed_value.property1"), col("parsed_value.property2"))

或:

.select(col("parsed_value.*"))