pyspark writeStream:每个数据框行在一个单独的 json 文件中
pyspark writeStream: Each Data Frame row in a separate json file
我正在使用 pyspark 从 Kafka 主题读取数据作为流数据帧,如下所示:
spark = SparkSession.builder \
.appName("Spark Structured Streaming from Kafka") \
.getOrCreate()
sdf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load() \
.select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))
sdf_ = sdf.select("parsed_value.*")
我的目标是将 sdf_
行的每一行写成 单独的 json 文件。
以下代码:
writing_sink = sdf_.writeStream \
.format("json") \
.option("path", "/Desktop/...") \
.option("checkpointLocation", "/Desktop/...") \
.start()
writing_sink.awaitTermination()
将在同一个 json 中写入多行数据帧,具体取决于微批次的大小(或者至少这是我的假设)。
我需要的是调整上面的内容,以便数据帧的每一行都写入一个单独的 json 文件中。
我也尝试过使用 partitionBy('column')
,但这仍然不能完全满足我的需要,而是创建文件夹,其中 json 文件可能仍有多行写入(如果他们有相同的 ID)。
有什么可以帮助解决这个问题的想法吗?提前致谢。
发现以下选项可以解决问题:
.option("maxRecordsPerFile", 1)
我正在使用 pyspark 从 Kafka 主题读取数据作为流数据帧,如下所示:
spark = SparkSession.builder \
.appName("Spark Structured Streaming from Kafka") \
.getOrCreate()
sdf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load() \
.select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))
sdf_ = sdf.select("parsed_value.*")
我的目标是将 sdf_
行的每一行写成 单独的 json 文件。
以下代码:
writing_sink = sdf_.writeStream \
.format("json") \
.option("path", "/Desktop/...") \
.option("checkpointLocation", "/Desktop/...") \
.start()
writing_sink.awaitTermination()
将在同一个 json 中写入多行数据帧,具体取决于微批次的大小(或者至少这是我的假设)。 我需要的是调整上面的内容,以便数据帧的每一行都写入一个单独的 json 文件中。
我也尝试过使用 partitionBy('column')
,但这仍然不能完全满足我的需要,而是创建文件夹,其中 json 文件可能仍有多行写入(如果他们有相同的 ID)。
有什么可以帮助解决这个问题的想法吗?提前致谢。
发现以下选项可以解决问题:
.option("maxRecordsPerFile", 1)