网络断开后 ActiveMQ 5 重新传送不起作用
ActiveMQ 5 redelivery not working after network disconnect
框架:
- Java 1.7.0_191 和 1.8.0_181
- Spring 4.3.18.RELEASE
- ActiveMQ 5.14.5
场景:
两个客户端和一个服务器。客户端 1 失去连接(由于网络断开)。客户端 2 发送并使用一条消息。客户端 1 重新获得网络连接。现在我期待将消息重新传递给客户端 1。虽然客户端 1 现在工作正常(获取所有新消息),但是客户端 1 没有收到更新消息,所以客户端不再可信。
这是设计使然还是我配置有误?
ServerBroker:
final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientBroker", host, port);
final BrokerService brokerService = BrokerFactory.createBroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
loggingBrokerPlugin.setLogConsumerEvents(true);
loggingBrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new BrokerPlugin[] { loggingBrokerPlugin });
brokerService.start();
ServerProducer:
final JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setTimeToLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("tcp://%s:%s", host, port));
jmsTemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmsTemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE, new NotificationQueueEntry());
客户端消费者:
@JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
process(msg);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConnectionFactory());
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConcurrency("2");
return factory;
}
private ConnectionFactory jmsConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d", host, port, 600_000));
final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(10_000L);
redeliveryPolicy.setRedeliveryDelay(1_000L);
redeliveryPolicy.setMaximumRedeliveries(600);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setTransportListener(new TransportListener() {
@Override
public void onCommand(final Object command) {
}
@Override
public void onException(final IOException error) {
connected = false;
}
@Override
public void transportInterupted() {
connected = false;
}
@Override
public void transportResumed() {
if (!connected) {
reconnect();
}
}
});
return new PooledConnectionFactory(activeMQConnectionFactory);
}
connected和reconnect()只是为了在客户端可见的显示断开状态,而不是主动重新连接ActiveMQ连接。
日志:
2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 failed: java.net.SocketException: Connection reset
2020-09-09 11:39:52,416 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-1-ip>:54049@1079
2020-09-09 11:39:52,417 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.b.TransportConnection:1233 Cleaning up connection resources: tcp://<client-1-ip>:54049
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
2020-09-09 11:39:56,343 [server-pool-1-thread-16] INFO m.p.s.ServerProducerImplementation:268 Send message to client-2:1079. Data: NotificationQueueEntry
2020-09-09 11:39:56,343 [ActiveMQ Transport: tcp:///<client-2-ip>:63125@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:257 Removing Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:5:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2020-09-09 11:39:56,345 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-2-ip>:63125@1079
2020-09-09 11:39:56,345 [server-pool-1-thread-16] DEBUG o.a.a.util.ThreadPoolUtils:155 Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@f78a38[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
2020-09-09 11:39:56,346 [server-pool-1-thread-16] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp://client-2/<client-2-ip>:1079@63125
2020-09-09 11:39:56,348 [server-pool-1-thread-16] DEBUG o.a.a.t.WireFormatNegotiator:82 Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp://client-2/<client-2-ip>:1079@52652 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-2-ip>:52652@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp://client-2/<client-2-ip>:1079@52652 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-2-ip>:52652@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,354 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:192 Adding Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2020-09-09 11:39:56,357 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:285 Sending message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@11eb3c5, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2020-09-09 11:39:56,358 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:428 preProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,359 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:436 postProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 1599644396359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,419 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:157 Acknowledging message for client ID: ID:client-2-52500-1599644321472-0:1, ID:client-2-61550-1599640739463-4:6:1:1:1
2020-09-09 11:39:56,421 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] DEBUG o.a.a.t.LocalTransaction:48 commit: TX:ID:client-2-52500-1599644321472-1:1:1 syncCount: 1
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-1-ip>:50446@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:41:04,216 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-1-ip>:50446@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
这是 non-durable 主题订阅者的预期行为。当这样的订阅者未连接时,它将不会收到发送给代理的任何消息。
还值得注意的是,这并不是通常所说的“重新投递”。消息重新传递是指,例如,消息在事务中被使用并且该事务被回滚,然后消息被重新传递给客户端以重试。
框架:
- Java 1.7.0_191 和 1.8.0_181
- Spring 4.3.18.RELEASE
- ActiveMQ 5.14.5
场景:
两个客户端和一个服务器。客户端 1 失去连接(由于网络断开)。客户端 2 发送并使用一条消息。客户端 1 重新获得网络连接。现在我期待将消息重新传递给客户端 1。虽然客户端 1 现在工作正常(获取所有新消息),但是客户端 1 没有收到更新消息,所以客户端不再可信。
这是设计使然还是我配置有误?
ServerBroker:
final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientBroker", host, port);
final BrokerService brokerService = BrokerFactory.createBroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
loggingBrokerPlugin.setLogConsumerEvents(true);
loggingBrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new BrokerPlugin[] { loggingBrokerPlugin });
brokerService.start();
ServerProducer:
final JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setTimeToLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("tcp://%s:%s", host, port));
jmsTemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmsTemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE, new NotificationQueueEntry());
客户端消费者:
@JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
process(msg);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConnectionFactory());
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConcurrency("2");
return factory;
}
private ConnectionFactory jmsConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d", host, port, 600_000));
final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(10_000L);
redeliveryPolicy.setRedeliveryDelay(1_000L);
redeliveryPolicy.setMaximumRedeliveries(600);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setTransportListener(new TransportListener() {
@Override
public void onCommand(final Object command) {
}
@Override
public void onException(final IOException error) {
connected = false;
}
@Override
public void transportInterupted() {
connected = false;
}
@Override
public void transportResumed() {
if (!connected) {
reconnect();
}
}
});
return new PooledConnectionFactory(activeMQConnectionFactory);
}
connected和reconnect()只是为了在客户端可见的显示断开状态,而不是主动重新连接ActiveMQ连接。
日志:
2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 failed: java.net.SocketException: Connection reset
2020-09-09 11:39:52,416 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-1-ip>:54049@1079
2020-09-09 11:39:52,417 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.b.TransportConnection:1233 Cleaning up connection resources: tcp://<client-1-ip>:54049
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
2020-09-09 11:39:56,343 [server-pool-1-thread-16] INFO m.p.s.ServerProducerImplementation:268 Send message to client-2:1079. Data: NotificationQueueEntry
2020-09-09 11:39:56,343 [ActiveMQ Transport: tcp:///<client-2-ip>:63125@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:257 Removing Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:5:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2020-09-09 11:39:56,345 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-2-ip>:63125@1079
2020-09-09 11:39:56,345 [server-pool-1-thread-16] DEBUG o.a.a.util.ThreadPoolUtils:155 Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@f78a38[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
2020-09-09 11:39:56,346 [server-pool-1-thread-16] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp://client-2/<client-2-ip>:1079@63125
2020-09-09 11:39:56,348 [server-pool-1-thread-16] DEBUG o.a.a.t.WireFormatNegotiator:82 Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp://client-2/<client-2-ip>:1079@52652 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-2-ip>:52652@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp://client-2/<client-2-ip>:1079@52652 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-2-ip>:52652@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,354 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:192 Adding Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2020-09-09 11:39:56,357 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:285 Sending message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@11eb3c5, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2020-09-09 11:39:56,358 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:428 preProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,359 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:436 postProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 1599644396359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,419 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:157 Acknowledging message for client ID: ID:client-2-52500-1599644321472-0:1, ID:client-2-61550-1599640739463-4:6:1:1:1
2020-09-09 11:39:56,421 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] DEBUG o.a.a.t.LocalTransaction:48 commit: TX:ID:client-2-52500-1599644321472-1:1:1 syncCount: 1
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-1-ip>:50446@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:41:04,216 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-1-ip>:50446@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
这是 non-durable 主题订阅者的预期行为。当这样的订阅者未连接时,它将不会收到发送给代理的任何消息。
还值得注意的是,这并不是通常所说的“重新投递”。消息重新传递是指,例如,消息在事务中被使用并且该事务被回滚,然后消息被重新传递给客户端以重试。