如何检查是否在有限的持续时间或记录数内停止从 kafka 主题流式传输?
how to check if stop streaming from kafka topic by a limited time duration or record count?
我的最终目标是查看kafka主题是否运行ning以及其中的数据是否良好,否则失败/抛出错误
如果我只能拉取 100 条消息,或者拉取 60 秒,我想我可以完成我想要的。但是我在网上找到的所有流媒体示例/问题都没有关闭流媒体连接的意图。
这是我迄今为止最好的工作代码,它提取数据并显示它,但它一直在尝试提取更多数据,如果我尝试在下一行访问它,它就没有机会拉数据呢。我想我需要某种回电。有没有人做过类似的事情?这是解决此问题的最佳方法吗?
我正在使用数据块笔记本运行我的代码
import org.apache.spark.sql.functions.{explode, split}
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<kafka server>:9092")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.load()
val df = kafka.select(explode(split($"value".cast("string"), "\s+")).as("word"))
display(df.select($"word"))
诀窍是您根本不需要流式传输。 Kafka源支持批量查询,如果将readStream
替换为read
并调整startingOffsets
和endingOffsets
.
val df = spark
.read
.format("kafka")
... // Remaining options
.load()
您可以在 Kafka streaming documentation 中找到示例。
对于流式查询,您可以使用 once
trigger
,尽管在这种情况下它可能不是最佳选择:
df.writeStream
.trigger(Trigger.Once)
... // Handle the output, for example with foreach sink (?)
您也可以使用标准 Kafka 客户端在不启动的情况下获取一些数据 SparkSession
。
我的最终目标是查看kafka主题是否运行ning以及其中的数据是否良好,否则失败/抛出错误
如果我只能拉取 100 条消息,或者拉取 60 秒,我想我可以完成我想要的。但是我在网上找到的所有流媒体示例/问题都没有关闭流媒体连接的意图。
这是我迄今为止最好的工作代码,它提取数据并显示它,但它一直在尝试提取更多数据,如果我尝试在下一行访问它,它就没有机会拉数据呢。我想我需要某种回电。有没有人做过类似的事情?这是解决此问题的最佳方法吗?
我正在使用数据块笔记本运行我的代码
import org.apache.spark.sql.functions.{explode, split}
val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<kafka server>:9092")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.load()
val df = kafka.select(explode(split($"value".cast("string"), "\s+")).as("word"))
display(df.select($"word"))
诀窍是您根本不需要流式传输。 Kafka源支持批量查询,如果将readStream
替换为read
并调整startingOffsets
和endingOffsets
.
val df = spark
.read
.format("kafka")
... // Remaining options
.load()
您可以在 Kafka streaming documentation 中找到示例。
对于流式查询,您可以使用 once
trigger
,尽管在这种情况下它可能不是最佳选择:
df.writeStream
.trigger(Trigger.Once)
... // Handle the output, for example with foreach sink (?)
您也可以使用标准 Kafka 客户端在不启动的情况下获取一些数据 SparkSession
。