从 Spark Structured Streaming 中的 Kafka 消息中读取换行分隔 json
Read newline delimited json from Kafka message in Spark Structured Streaming
我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。
然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:
events_df = select(from_json(col("value").cast("string"), schema).alias("value"))
但此代码仅适用于单个 json 文档。
如果该值包含多条记录作为换行符分隔 json,Spark 无法正确解码。
我不想为每个事件发送一条 kafka 消息。我怎样才能做到这一点?
我设法以这种方式完成了我正在寻找的事情,用换行符拆分全文字符串,然后按行分解数组以使用模式进行解析:
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "quickstart-events") \
.option("startingOffsets", "earliest")\
.load()\
.selectExpr("CAST(value AS STRING) as data")
events = events.select(explode(split(events.data, '\n')))
events = events.select(from_json(col("col"), event_schema).alias('value'))
events = events.selectExpr('value.*')```
我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。 然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:
events_df = select(from_json(col("value").cast("string"), schema).alias("value"))
但此代码仅适用于单个 json 文档。 如果该值包含多条记录作为换行符分隔 json,Spark 无法正确解码。
我不想为每个事件发送一条 kafka 消息。我怎样才能做到这一点?
我设法以这种方式完成了我正在寻找的事情,用换行符拆分全文字符串,然后按行分解数组以使用模式进行解析:
events = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "quickstart-events") \
.option("startingOffsets", "earliest")\
.load()\
.selectExpr("CAST(value AS STRING) as data")
events = events.select(explode(split(events.data, '\n')))
events = events.select(from_json(col("col"), event_schema).alias('value'))
events = events.selectExpr('value.*')```