pyspark writeStream:每个数据框行在一个单独的 json 文件中

pyspark writeStream: Each Data Frame row in a separate json file

我正在使用 pyspark 从 Kafka 主题读取数据作为流数据帧,如下所示:

spark = SparkSession.builder \
  .appName("Spark Structured Streaming from Kafka") \
  .getOrCreate()

sdf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load() \
  .select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))

sdf_ = sdf.select("parsed_value.*")

我的目标是将 sdf_ 行的每一行写成 单独的 json 文件。 以下代码:

writing_sink = sdf_.writeStream \
    .format("json") \
    .option("path", "/Desktop/...") \
    .option("checkpointLocation", "/Desktop/...") \
    .start()

writing_sink.awaitTermination()

将在同一个 json 中写入多行数据帧,具体取决于微批次的大小(或者至少这是我的假设)。 我需要的是调整上面的内容,以便数据帧的每一行都写入一个单独的 json 文件中。

我也尝试过使用 partitionBy('column'),但这仍然不能完全满足我的需要,而是创建文件夹,其中 json 文件可能仍有多行写入(如果他们有相同的 ID)。

有什么可以帮助解决这个问题的想法吗?提前致谢。

发现以下选项可以解决问题:

   .option("maxRecordsPerFile", 1)