我如何测试在我的 kafka 流应用程序中工作的 Exactly Once 语义
how do i test Exactly Once Semantics working in my kafka streams application
我有一个 Kafka Streams DSL 应用程序,我们要求只处理一次,为此我添加了配置
streamConfig.put(processing.gurantee, "exactly_once");
我使用的是kafka 2.7
我有 2 个查询
- exactly_once和exactly_once_beta
有什么区别
- 我如何测试此功能以确保我的消息只被处理一次
谢谢!
exactly_once_beta
是对 exactly_once
的改进。 exactly_once
为每个流任务(子拓扑和输入分区的组合)使用事务生产者,exactly_once_beta
为 Kafka Streams 客户端的每个流线程使用事务生产者。
每个生产者都带有单独的内存缓冲区、单独的线程、单独的网络连接,这可能会限制扩展输入分区的数量(即任务数量)。大量的生产者也可能会给经纪人带来更多的负担。因此,exactly_once_beta
具有更好的缩放特性。您可以在 KIP-447.
中找到更多详细信息
请注意,exactly_once
将被弃用,exactly_once_beta
将在 Apache Kafka 3.0 中重命名为 exactly_once_v2
。有关详细信息,请参阅 KIP-732。
对于测试,您可以从 Apache Kafka 存储库中的测试中获得灵感:
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_eos_test.py
基本上,您需要创建故障转移方案并验证不会多次向输出主题生成消息。请注意,消息可能会被处理多次,但输出主题中的结果必须显示为好像只被处理过一次。您可以在此处找到有关 exactly-once 语义的非常好的讨论,它还解释了故障转移场景:https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka/
我有一个 Kafka Streams DSL 应用程序,我们要求只处理一次,为此我添加了配置
streamConfig.put(processing.gurantee, "exactly_once");
我使用的是kafka 2.7 我有 2 个查询
- exactly_once和exactly_once_beta 有什么区别
- 我如何测试此功能以确保我的消息只被处理一次
谢谢!
exactly_once_beta
是对 exactly_once
的改进。 exactly_once
为每个流任务(子拓扑和输入分区的组合)使用事务生产者,exactly_once_beta
为 Kafka Streams 客户端的每个流线程使用事务生产者。
每个生产者都带有单独的内存缓冲区、单独的线程、单独的网络连接,这可能会限制扩展输入分区的数量(即任务数量)。大量的生产者也可能会给经纪人带来更多的负担。因此,exactly_once_beta
具有更好的缩放特性。您可以在 KIP-447.
请注意,exactly_once
将被弃用,exactly_once_beta
将在 Apache Kafka 3.0 中重命名为 exactly_once_v2
。有关详细信息,请参阅 KIP-732。
对于测试,您可以从 Apache Kafka 存储库中的测试中获得灵感:
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
- https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_eos_test.py
基本上,您需要创建故障转移方案并验证不会多次向输出主题生成消息。请注意,消息可能会被处理多次,但输出主题中的结果必须显示为好像只被处理过一次。您可以在此处找到有关 exactly-once 语义的非常好的讨论,它还解释了故障转移场景:https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka/