如何检查是否在有限的持续时间或记录数内停止从 kafka 主题流式传输?

how to check if stop streaming from kafka topic by a limited time duration or record count?

我的最终目标是查看kafka主题是否运行ning以及其中的数据是否良好,否则失败/抛出错误

如果我只能拉取 100 条消息,或者拉取 60 秒,我想我可以完成我想要的。但是我在网上找到的所有流媒体示例/问题都没有关闭流媒体连接的意图。

这是我迄今为止最好的工作代码,它提取数据并显示它,但它一直在尝试提取更多数据,如果我尝试在下一行访问它,它就没有机会拉数据呢。我想我需要某种回电。有没有人做过类似的事情?这是解决此问题的最佳方法吗?

我正在使用数据块笔记本运行我的代码

import org.apache.spark.sql.functions.{explode, split}
val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<kafka server>:9092")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .load()

  val df = kafka.select(explode(split($"value".cast("string"), "\s+")).as("word"))

  display(df.select($"word"))

诀窍是您根本不需要流式传输。 Kafka源支持批量查询,如果将readStream替换为read并调整startingOffsetsendingOffsets.

val df = spark
  .read
  .format("kafka")
  ... // Remaining options
  .load()

您可以在 Kafka streaming documentation 中找到示例。

对于流式查询,您可以使用 once trigger,尽管在这种情况下它可能不是最佳选择:

df.writeStream
  .trigger(Trigger.Once)
  ... // Handle the output, for example with foreach sink (?)

您也可以使用标准 Kafka 客户端在不启动的情况下获取一些数据 SparkSession