Spring 生产者端的云流卡夫卡交易
Spring cloud stream kafka transactions in producer side
我们有一个使用 Kafka 的 spring 云流应用程序。要求是在生产者方面,消息列表需要放在事务的主题中。同一个应用程序中的消息没有消费者。当我使用 spring.cloud.stream.kafka.binder.transaction.transaction-id 前缀启动事务时,我遇到了调度程序没有订阅者的错误,并且从主题获得的分区总数小于配置的事务。应用程序无法在事务模式下获取主题的分区。你能告诉我是否遗漏了什么吗?明天我会post详细的日志
谢谢
您需要展示您的代码和配置以及您使用的版本。
仅限生产者的交易是 discussed in the documentation。
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.
Then you would just normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅限生产者的事务与来自其他事务管理器的事务同步,请使用 ChainedTransactionManager。
我们有一个使用 Kafka 的 spring 云流应用程序。要求是在生产者方面,消息列表需要放在事务的主题中。同一个应用程序中的消息没有消费者。当我使用 spring.cloud.stream.kafka.binder.transaction.transaction-id 前缀启动事务时,我遇到了调度程序没有订阅者的错误,并且从主题获得的分区总数小于配置的事务。应用程序无法在事务模式下获取主题的分区。你能告诉我是否遗漏了什么吗?明天我会post详细的日志
谢谢
您需要展示您的代码和配置以及您使用的版本。
仅限生产者的交易是 discussed in the documentation。
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.
Then you would just normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅限生产者的事务与来自其他事务管理器的事务同步,请使用 ChainedTransactionManager。