Spring Kafka 和交易
Spring Kafka and Transactions
我想将 Spring Kafka 与事务一起使用,但我真的不明白它应该如何配置以及如何工作。
这是我的配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
此配置用于 DefaultKafkaProducerFactory,事务 ID 前缀为:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
问题 1 :
我应该如何选择这个交易 ID 前缀?
如果我理解正确的话,spring 使用此前缀为每个创建的生产者生成交易 ID。
为什么我们不能只使用“UUID.randomUUID() ?
问题 2 :
如果生产者被销毁,它会生成一个新的交易ID。
因此,如果应用程序崩溃,在重新启动时它将重用旧的事务 ID。
正常吗???
问题 3 :
我正在使用部署在云上的可以自动扩展的应用程序 up/down。
这意味着我的前缀无法修复,因为每个实例上的所有生产者都会有冲突的事务 ID。
我应该在其中添加一个随机部分吗?
当实例缩放 down/up 或崩溃并重新启动时,我是否需要恢复相同的前缀?
问题 4 :
最后但同样重要的是,我们正在为我们的 Kafka 使用凭据。
这似乎不起作用:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
知道我的交易 ID 已生成,我应该如何设置我的 ACL?
编辑 1
进一步阅读,如果我理解正确。
如果您有一个从 P0(分区)读取的 C0(消费者)。如果代理开始消费者再平衡。
P0 可以分配给另一个消费者 C1。
此消费者 C1 应使用与之前的 C0 相同的交易 ID 以防止重复(僵尸围栏)?
如何在 spring-kafka 中实现这一点?交易 ID 似乎与消费者无关,因此与读取的分区无关。
谢谢
由于僵尸防护,您不能使用随机 TID - 如果服务器崩溃,您可能会在主题中有一个部分事务,该事务将永远不会完成,并且不会从任何其他事务中消耗任何内容为该事务写入分区。
这是设计使然 - 出于上述原因。
同样,你不能随机化;出于上述原因。
例如,Cloud Foundry 有一个指示实例索引的环境变量。如果您使用的云平台不包含类似的东西,您将不得不以某种方式模拟它。然后,在交易id中使用:
spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
ACL - 我无法回答这个问题;我对kafka权限不熟悉;可能最好单独问一个问题。
我认为我们需要向 Spring 添加一些逻辑,以确保相同的交易 ID 始终用于特定的 topic/partition。
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
编辑
自从这个答案 (KIP-447) 以来,情况发生了变化;如果您的经纪人是 2.5.0 或更高版本 - 请参阅。 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-once and https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-once
我想将 Spring Kafka 与事务一起使用,但我真的不明白它应该如何配置以及如何工作。
这是我的配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
此配置用于 DefaultKafkaProducerFactory,事务 ID 前缀为:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
问题 1 :
我应该如何选择这个交易 ID 前缀? 如果我理解正确的话,spring 使用此前缀为每个创建的生产者生成交易 ID。
为什么我们不能只使用“UUID.randomUUID() ?
问题 2 :
如果生产者被销毁,它会生成一个新的交易ID。 因此,如果应用程序崩溃,在重新启动时它将重用旧的事务 ID。
正常吗???
问题 3 :
我正在使用部署在云上的可以自动扩展的应用程序 up/down。 这意味着我的前缀无法修复,因为每个实例上的所有生产者都会有冲突的事务 ID。
我应该在其中添加一个随机部分吗? 当实例缩放 down/up 或崩溃并重新启动时,我是否需要恢复相同的前缀?
问题 4 :
最后但同样重要的是,我们正在为我们的 Kafka 使用凭据。 这似乎不起作用:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
知道我的交易 ID 已生成,我应该如何设置我的 ACL?
编辑 1
进一步阅读,如果我理解正确。
如果您有一个从 P0(分区)读取的 C0(消费者)。如果代理开始消费者再平衡。 P0 可以分配给另一个消费者 C1。 此消费者 C1 应使用与之前的 C0 相同的交易 ID 以防止重复(僵尸围栏)?
如何在 spring-kafka 中实现这一点?交易 ID 似乎与消费者无关,因此与读取的分区无关。
谢谢
由于僵尸防护,您不能使用随机 TID - 如果服务器崩溃,您可能会在主题中有一个部分事务,该事务将永远不会完成,并且不会从任何其他事务中消耗任何内容为该事务写入分区。
这是设计使然 - 出于上述原因。
同样,你不能随机化;出于上述原因。
例如,Cloud Foundry 有一个指示实例索引的环境变量。如果您使用的云平台不包含类似的东西,您将不得不以某种方式模拟它。然后,在交易id中使用:
spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-
ACL - 我无法回答这个问题;我对kafka权限不熟悉;可能最好单独问一个问题。
我认为我们需要向 Spring 添加一些逻辑,以确保相同的交易 ID 始终用于特定的 topic/partition。
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
编辑
自从这个答案 (KIP-447) 以来,情况发生了变化;如果您的经纪人是 2.5.0 或更高版本 - 请参阅。 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#exactly-once and https://docs.spring.io/spring-kafka/docs/2.6.0-SNAPSHOT/reference/html/#exactly-once