pyspark:如何使用 KafkaUtils 执行结构化流式传输

pyspark: how to perform structured streaming using KafkaUtils

我正在使用 SparkSession.readStream 进行结构化流式传输并将其写入配置单元 table,但它似乎不允许我进行基于时间的微批处理,即我需要一批 5 个秒。所有消息都应该形成一个 5 秒的批次,并且批次数据应该写入配置单元 table.
现在它在消息发布到 Kafka 主题时读取消息,每条消息都是 table.
的一条记录 工作代码

def hive_write_batch_data(data, batchId):
    data.write.format("parquet").mode("append").saveAsTable("test.my_table")

kafka_config = {
        "checkpointLocation":"/user/aiman/temp/checkpoint",
        "kafka.bootstrap.servers":"kafka.bootstrap.server.com:9093",
        "subscribe":"TEST_TOPIC",
        "startingOffsets": offsetValue,
        "kafka.security.protocol":"SSL",
        "kafka.ssl.keystore.location": "kafka.keystore.uat.jks",
        "kafka.ssl.keystore.password": "abcd123",
        "kafka.ssl.key.password":"abcd123",
        "kafka.ssl.truststore.type":"JKS",
        "kafka.ssl.truststore.location": "kafka.truststore.uat.jks",
        "kafka.ssl.truststore.password":"abdc123",
        "kafka.ssl.enabled.protocols":"TLSv1",
        "kafka.ssl.endpoint.identification.algorithm":""
    }

df = spark.readStream \
              .format("kafka") \
              .options(**kafka_config) \
              .load()
data = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","offset","timestamp","partition")

data_new = data.select(col("offset"),col("partition"),col("key"),json_tuple(col("value"),"product_code","rec_time")) \
               .toDF("offset","partition","key","product_code","rec_time")

data_new.writeStream. \
        .foreachBatch(hive_write_batch_data) \
        .start() \
        .awaitTermination()

问题陈述
由于每条消息都被视为配置单元 table 中的一个记录条目,因此正在为每个记录创建一个 parquet 文件,这可能会触发配置单元的小文件问题。

我需要创建一个基于时间的批处理,以便将多条记录一次性插入配置单元 table。为此,我只发现 KafkaUtils 支持基于时间的使用 ssc = StreamingContext(sc, 5) 但它不会创建 Dataframes.
我应该如何使用 KafkaUtils 创建读入数据帧的批次?

添加触发器有效。在流写入器中添加了一个触发器:

data_new.writeStream \
        .trigger(processingTime="5 seconds") \ #Trigger
        .foreachBatch(hive_write_batch_data) \
        .start() \
        .awaitTermination()

找到文章here