Databricks调用一次Trigger处理Kinesis Stream

Calling Trigger once in Databricks to process Kinesis Stream

我正在寻找一种方法来触发我的 Databricks notebook 一次以处理 Kinesis Stream 并使用以下模式

 import org.apache.spark.sql.streaming.Trigger

// Load your Streaming DataFrame
   val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
   sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")

看起来 AWS Kinesis 是不可能的,这也是 Databricks 文档所建议的。我的问题是我们还能做些什么来实现这一目标?

正如您在问题中提到的 trigger once isn't supported for Kinesis

但是您可以通过在图片中添加 Databricks 上可用的 Kinesis Data Firehose that will write data from Kinesis into S3 bucket (you can select format that you need, like, Parquet, ORC, or just leave in JSON), and then you can point the streaming job to given bucket, and use Trigger.Once for it, as it's a normal streaming source (For efficiency it's better to use Auto Loader 来实现您的需要。此外,为了控制成本,您可以为 S3 目的地设置保留策略,以便在一段时间后删除或归档文件,例如 1 周或一个月。

一种解决方法是在 X 运行 秒后停止,不触发。它将保证每个 运行 的固定行数。 唯一的问题是,如果您有数百万行在队列中等待,您将无法保证处理所有行

在scala中你可以添加一个事件监听器,在python中计算批次的数量。

from time import sleep
s = sdf.writeStream.format("delta").start("/out/path")

#by defaut keep spark.sql.streaming.numRecentProgressUpdates=100 in the list. Stop after 10 microbatch
#maxRecordsPerFetch is 10 000 by default, so we will consume a max value of 10x10 000= 100 000 messages per run
while len(s.recentProgress) < 10:
  print("Batchs #:"+str(len(s.recentProgress)))
  sleep(10)
s.stop()

您可以使用更高级的逻辑来计算每批处理的消息数并在队列为空时停止(吞吐量一旦全部消耗完就会降低,因为您只会获得“实时”流,不是历史)