跟踪 Spark 结构化流中消耗的消息
Track of consumed messages in Spark structured streaming
我想设置配置,让我的应用程序跟踪来自 kafka 的消费消息。因此,无论何时失败,它都可以从上次提交或消耗的偏移量开始选择。
readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();
我在网上看到 checkpointlocation
属性 可以设置,spark 可以使用它来跟踪偏移量。
想知道我可以在哪里设置这个 属性 吗?我可以在上面的代码中设置 option
吗?我可以知道如何正确设置它吗?
其次,我无法理解trigger(Trigger.Continuous("1 second"))
属性。文档说 continuous processing engine will record the progress of the query every second
,它在读取来自 kafka 的消息时记录了什么样的进度?
您可以将检查点位置设置为 writeStream
中的一个选项:
[...]
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.option("checkpointLocation", "/path/to/dir")
.trigger(Trigger.Continuous("1 second"))
.start();
从 Kafka 读取时跟踪进度意味着跟踪 TopicPartition 中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为 JSON 对象存储在给定路径中,例如
{
"topic1":{
"0":11,
"1":101
}
}
这意味着应用程序已经使用了主题 topic1
的分区 0
中的偏移量 10 和分区 1
中的偏移量 100。检查点是“提前”写入的(使用预写日志),因此应用程序将继续从 Kafka 读取消息,它在有意或无意(失败)重启之前停止的地方。
Trigger.Continuous
从 Spark 版本 2.3 开始可用。并且截至目前标记为 实验性 。与 micro-batch 方法相比,它会在 Kafka 中的每条消息到达主题时立即获取它,而无需尝试将其与其他消息进行批处理。这可以改善延迟,但很可能会降低整体吞吐量。
参数(例如1 seconds
)决定检查点的频率。
使用此触发模式时,重要的是至少要有与主题有分区一样多的可用内核。否则,申请将不会取得任何进展。您可以阅读更多相关信息 here:
"For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress."
我想设置配置,让我的应用程序跟踪来自 kafka 的消费消息。因此,无论何时失败,它都可以从上次提交或消耗的偏移量开始选择。
readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();
我在网上看到 checkpointlocation
属性 可以设置,spark 可以使用它来跟踪偏移量。
想知道我可以在哪里设置这个 属性 吗?我可以在上面的代码中设置 option
吗?我可以知道如何正确设置它吗?
其次,我无法理解trigger(Trigger.Continuous("1 second"))
属性。文档说 continuous processing engine will record the progress of the query every second
,它在读取来自 kafka 的消息时记录了什么样的进度?
您可以将检查点位置设置为 writeStream
中的一个选项:
[...]
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.option("checkpointLocation", "/path/to/dir")
.trigger(Trigger.Continuous("1 second"))
.start();
从 Kafka 读取时跟踪进度意味着跟踪 TopicPartition 中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为 JSON 对象存储在给定路径中,例如
{
"topic1":{
"0":11,
"1":101
}
}
这意味着应用程序已经使用了主题 topic1
的分区 0
中的偏移量 10 和分区 1
中的偏移量 100。检查点是“提前”写入的(使用预写日志),因此应用程序将继续从 Kafka 读取消息,它在有意或无意(失败)重启之前停止的地方。
Trigger.Continuous
从 Spark 版本 2.3 开始可用。并且截至目前标记为 实验性 。与 micro-batch 方法相比,它会在 Kafka 中的每条消息到达主题时立即获取它,而无需尝试将其与其他消息进行批处理。这可以改善延迟,但很可能会降低整体吞吐量。
参数(例如1 seconds
)决定检查点的频率。
使用此触发模式时,重要的是至少要有与主题有分区一样多的可用内核。否则,申请将不会取得任何进展。您可以阅读更多相关信息 here:
"For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress."