如何在 Spring Cloud Stream 的事务上下文中使用 MessageChannel?

How to use MessageChannel in transactional context with Spring Cloud Stream?

我正在开发一个应用程序,我从 IBM MQ 队列中读取数据,处理消息,然后将该消息发送到 Kafka 主题。我正在尝试处理我的 Kafka 经纪人出现故障的情况。在这种情况下,我希望我的应用程序回滚事务并尝试再次写入 Kafka 主题进行 X 次重试,之后我会将消息发送到备份队列。但是,我无法在 messageChannel.send 调用中抛出异常。我无法让它超时。应用程序在 messageChannel.send 调用上被无限阻塞。这是我的代码:

@Component
public class MainQueueListener {

    @Value("${mq.queueName}")
    String queueName;
    private ExecutionFlow executionFlow;

    public MainQueueListener(final ExecutionFlow executionFlow ) {
        this.executionFlow= executionFlow;
    }

    /**
     * Receive message from main queue.
     * The containerFactory is defined in infrastructure.jms.JmsComfig
     * @param byteMessage JMSBytesMessage 
     */
    @JmsListener(containerFactory = "jmsFactory", destination = "${mq.queueName}")
    public void receiveMessage(JMSBytesMessage byteMessage) {
        executionFlow .execute(byteMessage, queueName);
    }
}

事务管理器和侦听器容器的配置。

public class JmsConfig {

    private JmsErrorHandler errorHandler = new JmsErrorHandler();

    /**
     * Default JmsListenerContainer could be modified if needed
     * @param connectionFactory 
     */

    @Bean
    public JmsListenerContainerFactory<?> jmsFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setErrorHandler(errorHandler);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Instanciating a JmsTransactionManager: a local transaction manager. It will receive the ConnectionFactory
     */
    @Bean
    public PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
        jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        jmsTransactionManager.setRollbackOnCommitFailure(true);
        return jmsTransactionManager;
    }
}

执行流程将调用 messageChannel.send :

 public void sendMessage(MessageTarget messageTarget) {

        MessageChannel messageChannel;
        String topicName = messageTarget.getDestination();
        switch (topicName) {
            case "A":
                messageChannel = MessageStreams.outboundMessageA();
                break;
            case "B":
                messageChannel = MessageStreams.outboundMessageB();
                break;
            default:
                throw new RuntimeException("Invalid destination: " + topicName);
        }

            Message message = MessageBuilder
                    .withPayload(messageTarget.getResponse())
                    .build();
            System.out.println(messageChannel.send(message,3000)); //stuck here no timeout
    }

最后是我的应用程序配置文件:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer:
              sync: true
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
                max:
                  block:
                    ms: 5000
      bindings:
        AResponseOutputStreamChannel:
          destination: topicA
        BResponseOutputStreamChannel:
          destination: topicB

  mustache:
    check-template-location: false


ibm:
  mq:
    queue-manager: QM1
    conn-name: localhost(1414)
    channel: DEV.ADMIN.SVRCONN
    user: xxx
    password: xxx

mq.queueName: Q1
bo-queue: Q2

这是监听器收到消息时的输出,我在调用 messageChannel.send 之前杀死了我的本地 kafka docker 容器。 None 发送调用中的 timeot 或 max.block.ms 参数似乎有所不同。

2019-08-15 20:47:02,365 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:04,371 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:06,281 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:08,391 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:10,399 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:12,408 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:14,419 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:16,425 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:18,434 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:20,342 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:22,556 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:24,565 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:26,470 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:28,377 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:30,386 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:32,289 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:34,397 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:36,408 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:38,518 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available

这是一个错误;我看到这个...

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 5000milliseconds while awaiting EndTxn(COMMIT)

失败后,我们试图关闭生产者,但我们调用 close() 方法没有超时,所以它会挂起,直到代理恢复。