如何在 Spring Cloud Stream 的事务上下文中使用 MessageChannel?
How to use MessageChannel in transactional context with Spring Cloud Stream?
我正在开发一个应用程序,我从 IBM MQ 队列中读取数据,处理消息,然后将该消息发送到 Kafka 主题。我正在尝试处理我的 Kafka 经纪人出现故障的情况。在这种情况下,我希望我的应用程序回滚事务并尝试再次写入 Kafka 主题进行 X 次重试,之后我会将消息发送到备份队列。但是,我无法在 messageChannel.send 调用中抛出异常。我无法让它超时。应用程序在 messageChannel.send 调用上被无限阻塞。这是我的代码:
@Component
public class MainQueueListener {
@Value("${mq.queueName}")
String queueName;
private ExecutionFlow executionFlow;
public MainQueueListener(final ExecutionFlow executionFlow ) {
this.executionFlow= executionFlow;
}
/**
* Receive message from main queue.
* The containerFactory is defined in infrastructure.jms.JmsComfig
* @param byteMessage JMSBytesMessage
*/
@JmsListener(containerFactory = "jmsFactory", destination = "${mq.queueName}")
public void receiveMessage(JMSBytesMessage byteMessage) {
executionFlow .execute(byteMessage, queueName);
}
}
事务管理器和侦听器容器的配置。
public class JmsConfig {
private JmsErrorHandler errorHandler = new JmsErrorHandler();
/**
* Default JmsListenerContainer could be modified if needed
* @param connectionFactory
*/
@Bean
public JmsListenerContainerFactory<?> jmsFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(errorHandler);
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* Instanciating a JmsTransactionManager: a local transaction manager. It will receive the ConnectionFactory
*/
@Bean
public PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
jmsTransactionManager.setRollbackOnCommitFailure(true);
return jmsTransactionManager;
}
}
执行流程将调用 messageChannel.send :
public void sendMessage(MessageTarget messageTarget) {
MessageChannel messageChannel;
String topicName = messageTarget.getDestination();
switch (topicName) {
case "A":
messageChannel = MessageStreams.outboundMessageA();
break;
case "B":
messageChannel = MessageStreams.outboundMessageB();
break;
default:
throw new RuntimeException("Invalid destination: " + topicName);
}
Message message = MessageBuilder
.withPayload(messageTarget.getResponse())
.build();
System.out.println(messageChannel.send(message,3000)); //stuck here no timeout
}
最后是我的应用程序配置文件:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
transaction:
transaction-id-prefix: txn.
producer:
sync: true
configuration:
acks: all
enable:
idempotence: true
retries: 10
max:
block:
ms: 5000
bindings:
AResponseOutputStreamChannel:
destination: topicA
BResponseOutputStreamChannel:
destination: topicB
mustache:
check-template-location: false
ibm:
mq:
queue-manager: QM1
conn-name: localhost(1414)
channel: DEV.ADMIN.SVRCONN
user: xxx
password: xxx
mq.queueName: Q1
bo-queue: Q2
这是监听器收到消息时的输出,我在调用 messageChannel.send 之前杀死了我的本地 kafka docker 容器。 None 发送调用中的 timeot 或 max.block.ms 参数似乎有所不同。
2019-08-15 20:47:02,365 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:04,371 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:06,281 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:08,391 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:10,399 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:12,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:14,419 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:16,425 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:18,434 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:20,342 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:22,556 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:24,565 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:26,470 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:28,377 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:30,386 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:32,289 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:34,397 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:36,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:38,518 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available
这是一个错误;我看到这个...
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 5000milliseconds while awaiting EndTxn(COMMIT)
失败后,我们试图关闭生产者,但我们调用 close()
方法没有超时,所以它会挂起,直到代理恢复。
我正在开发一个应用程序,我从 IBM MQ 队列中读取数据,处理消息,然后将该消息发送到 Kafka 主题。我正在尝试处理我的 Kafka 经纪人出现故障的情况。在这种情况下,我希望我的应用程序回滚事务并尝试再次写入 Kafka 主题进行 X 次重试,之后我会将消息发送到备份队列。但是,我无法在 messageChannel.send 调用中抛出异常。我无法让它超时。应用程序在 messageChannel.send 调用上被无限阻塞。这是我的代码:
@Component
public class MainQueueListener {
@Value("${mq.queueName}")
String queueName;
private ExecutionFlow executionFlow;
public MainQueueListener(final ExecutionFlow executionFlow ) {
this.executionFlow= executionFlow;
}
/**
* Receive message from main queue.
* The containerFactory is defined in infrastructure.jms.JmsComfig
* @param byteMessage JMSBytesMessage
*/
@JmsListener(containerFactory = "jmsFactory", destination = "${mq.queueName}")
public void receiveMessage(JMSBytesMessage byteMessage) {
executionFlow .execute(byteMessage, queueName);
}
}
事务管理器和侦听器容器的配置。
public class JmsConfig {
private JmsErrorHandler errorHandler = new JmsErrorHandler();
/**
* Default JmsListenerContainer could be modified if needed
* @param connectionFactory
*/
@Bean
public JmsListenerContainerFactory<?> jmsFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(errorHandler);
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* Instanciating a JmsTransactionManager: a local transaction manager. It will receive the ConnectionFactory
*/
@Bean
public PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
jmsTransactionManager.setRollbackOnCommitFailure(true);
return jmsTransactionManager;
}
}
执行流程将调用 messageChannel.send :
public void sendMessage(MessageTarget messageTarget) {
MessageChannel messageChannel;
String topicName = messageTarget.getDestination();
switch (topicName) {
case "A":
messageChannel = MessageStreams.outboundMessageA();
break;
case "B":
messageChannel = MessageStreams.outboundMessageB();
break;
default:
throw new RuntimeException("Invalid destination: " + topicName);
}
Message message = MessageBuilder
.withPayload(messageTarget.getResponse())
.build();
System.out.println(messageChannel.send(message,3000)); //stuck here no timeout
}
最后是我的应用程序配置文件:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
transaction:
transaction-id-prefix: txn.
producer:
sync: true
configuration:
acks: all
enable:
idempotence: true
retries: 10
max:
block:
ms: 5000
bindings:
AResponseOutputStreamChannel:
destination: topicA
BResponseOutputStreamChannel:
destination: topicB
mustache:
check-template-location: false
ibm:
mq:
queue-manager: QM1
conn-name: localhost(1414)
channel: DEV.ADMIN.SVRCONN
user: xxx
password: xxx
mq.queueName: Q1
bo-queue: Q2
这是监听器收到消息时的输出,我在调用 messageChannel.send 之前杀死了我的本地 kafka docker 容器。 None 发送调用中的 timeot 或 max.block.ms 参数似乎有所不同。
2019-08-15 20:47:02,365 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:04,371 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:06,281 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:08,391 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:10,399 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:12,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:14,419 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:16,425 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:18,434 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:20,342 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:22,556 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:24,565 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:26,470 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:28,377 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:30,386 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:32,289 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:34,397 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:36,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:38,518 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available
这是一个错误;我看到这个...
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 5000milliseconds while awaiting EndTxn(COMMIT)
失败后,我们试图关闭生产者,但我们调用 close()
方法没有超时,所以它会挂起,直到代理恢复。