RabbitTransactionManager cannot commit a transaction in ChainedTransactionManager

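I am trying to use a single transaction manager for both Rabbit and Kafka. First I receive a message in a Rabbit listener callback, then I send it to a Kafka topic. But I always get an exception indicating that Rabbit cannot complete the transaction properly: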

2019-11-14 16:02:46.572 ERROR 15640 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Could not configure the channel to receive publisher confirms
java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
    at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:602)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:582)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access0(CachingConnectionFactory.java:99)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1053)
    at com.sun.proxy.$Proxy124.txCommit(Unknown Source)
    at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164)
    at org.springframework.amqp.rabbit.transaction.RabbitTransactionManager.doCommit(RabbitTransactionManager.java:187)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:746)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
    at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
    at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:532)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:304)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.listener.ExecutionCallbackListener$$EnhancerBySpringCGLIB$b575a95.receiveCallback(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1445)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1368)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1355)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1334)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access0(SimpleMessageListenerContainer.java:77)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode, class-id=85, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 37 common frames omitted

Here is the Rabbit listener method where the problem occurs:

@RabbitListener(queues = ["${queue}"])
@Transactional("chainedTransactionManager")
fun receiveCallback(message: Message<List<CallbackMessage>>) {
    traceMessage(message)
    val callbacks = message.payload
    callbacks.forEach { callback ->
        kafkaService.sendAfterCallBack(Object())
    }
}

The method in KafkaService:

@Transactional("chainedTransactionManager")
// `object` is a reserved keyword in Kotlin, so the parameter is named `payload` here
fun sendAfterCallBack(payload: Any) {
    convertAndSend(kafkaServiceProperties.topics.name, payload)
}

Here is the TransactionManager configuration:

@Configuration
class TransactionManagerConfiguration {

    @Bean
    fun chainedTransactionManager(
            rabbitTransactionManager: RabbitTransactionManager,
            kafkaTransactionManager: KafkaTransactionManager<*, *>
    ): ChainedTransactionManager {
        return ChainedTransactionManager(kafkaTransactionManager, rabbitTransactionManager)
    }
}

Rabbit configuration:

@Configuration
@EnableRabbit
@Import(RabbitAutoCreationConfiguration::class)
class RabbitConfiguration(
        private val integrationProperties: IntegrationProperties,
        private var clientProperties: RabbitClientProperties,
        private val jacksonObjectMapper: ObjectMapper
) : RabbitListenerConfigurer {

    @Bean
    fun rabbitListenerContainerFactory(connectionFactory: ConnectionFactory): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        factory.setErrorHandler { t -> throw AmqpRejectAndDontRequeueException(t) }

        return factory
    }

    @Bean
    fun messageConverter(): MessageConverter {
        val messageConverter = MappingJackson2MessageConverter()
        messageConverter.objectMapper = jacksonObjectMapper

        return messageConverter
    }

    @Bean
    fun messageHandlerFactory(): MessageHandlerMethodFactory {
        val factory = DefaultMessageHandlerMethodFactory()
        factory.setMessageConverter(messageConverter())
        return factory
    }

    @Bean
    @ConditionalOnBean(CachingConnectionFactory::class)
    fun rabbitConnectionFactoryCustomizer(factory: CachingConnectionFactory): SmartInitializingSingleton {
        return SmartInitializingSingleton {
            factory.rabbitConnectionFactory.clientProperties.apply {
                clientProperties.copyright?.let { put("copyright", it) }

                put("os", System.getProperty("os.name"))
                put("host", InetAddress.getLocalHost().hostName)

                clientProperties.platform?.let { put("platform", it) }
                clientProperties.product?.let { put("product", it) }
                clientProperties.service?.let { put("service", it) }
            }
        }
    }

    override fun configureRabbitListeners(registrar: RabbitListenerEndpointRegistrar?) {
        registrar!!.messageHandlerMethodFactory = messageHandlerFactory()
    }

    @Bean
    fun rabbitTemplate(
            connectionFactory: ConnectionFactory,
            jsonObjectMapper: ObjectMapper
    ): RabbitTemplate {
        val rabbitTemplate = RabbitTemplate(connectionFactory)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(SimpleRetryPolicy(integrationProperties.callbackRetry))
        rabbitTemplate.setRetryTemplate(retryTemplate)
        rabbitTemplate.isChannelTransacted = true

        return rabbitTemplate
    }

    @Bean
    fun rabbitTransactionManager(connectionFactory: ConnectionFactory): RabbitTransactionManager {
        val rtm = RabbitTransactionManager(connectionFactory)
        rtm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
        return rtm
    }
}

Kafka configuration:

@Configuration
@EnableKafka
class KafkaConfiguration(
        @Qualifier("kafkaExchangeMessageConverter")
        private val messageConverter: MessagingMessageConverter
) {

    @Bean
    fun kafkaListenerContainerFactory(
            configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
            consumerFactory: ConsumerFactory<Any, Any>
    ): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.setMessageConverter(messageConverter)
        configurer.configure(factory, consumerFactory)

        return factory
    }

    @Bean
    fun adminClient(kafkaAdmin: KafkaAdmin): AdminClient = AdminClient.create(kafkaAdmin.config)

    @Bean
    fun kafkaTransactionManager(
            producerFactory: ProducerFactory<*, *>
    ): KafkaTransactionManager<*, *> {
        val ktm = KafkaTransactionManager(producerFactory)

        ktm.transactionSynchronization = AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION

        return ktm
    }
}

Am I missing something in RabbitConfiguration, or is the problem somewhere else?

reply-text=PRECONDITION_FAILED - cannot switch from tx to confirm mode,

You cannot use publisher confirms and transactions on the same channel. Turn off publisher confirms.
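
To make that concrete: the stack trace shows the connection factory calling `confirmSelect` on a channel, and the broker rejecting it because the channel is in tx mode. If the factory comes from Spring Boot auto-configuration, the `spring.rabbitmq.publisher-confirms` property (superseded by `spring.rabbitmq.publisher-confirm-type` in Boot 2.2) is a likely place where confirms were enabled. The sketch below instead declares the factory explicitly with confirms off. It assumes Spring AMQP 2.2 or later (on older versions `setPublisherConfirms(false)` should be the equivalent call) and the kotlin-spring compiler plugin, as the question's own configuration classes seem to; the class name and the `localhost` host are placeholders, not taken from the question.

```kotlin
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical configuration class, shown only to illustrate the setting.
@Configuration
class RabbitConnectionConfiguration {

    @Bean
    fun connectionFactory(): CachingConnectionFactory {
        // "localhost" is a placeholder host; use the real broker address.
        val factory = CachingConnectionFactory("localhost")
        // No publisher confirms, so channels are never switched to confirm mode
        // and can be used transactionally (txSelect / txCommit).
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.NONE)
        return factory
    }
}
```

With confirms disabled, `rabbitTemplate.isChannelTransacted = true` and the `RabbitTransactionManager` from the question no longer compete with confirm mode on the same channel.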

Also, it is better to inject the chained transaction manager into the listener container instead of using @Transactional.
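
A minimal sketch of that second suggestion, assuming the `chainedTransactionManager` bean from the question is available for injection and the kotlin-spring compiler plugin is applied. The class name is hypothetical, and the error handler from the original factory is left out for brevity.

```kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.transaction.ChainedTransactionManager

// Hypothetical class name; in the question this bean lives in RabbitConfiguration.
@Configuration
class TransactionalListenerConfiguration {

    @Bean
    fun rabbitListenerContainerFactory(
            connectionFactory: ConnectionFactory,
            chainedTransactionManager: ChainedTransactionManager
    ): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        // Consume on a transactional channel...
        factory.setChannelTransacted(true)
        // ...and let the container start and commit the chained transaction
        // around each listener invocation.
        factory.setTransactionManager(chainedTransactionManager)
        return factory
    }
}
```

With the container driving the transaction, the `@Transactional("chainedTransactionManager")` annotation on `receiveCallback` should no longer be needed, since the listener method already runs inside the transaction the container started.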