如何使用 Stomp.py 强制断开与 ActiveMQ 的连接
How to force disconnect from ActiveMQ connection with Stomp.py
使用持久连接侦听消息队列时,侦听器中出现错误。我通过按 CTRL-Z 退出程序来模拟这种情况。尝试重新连接时,出现以下错误:
on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:197)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
at org.apache.activemq.transport.stomp.StompSslTransportFactory.doConsume(StompSslTransportFactory.java:70)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:745)
"
我尝试使用以下方法取消订阅并断开连接,但它不会断开连接。
class MyListener(stomp.ConnectionListener):
    """Listener for new messages using the STOMP protocol.

    On an error or a disconnect notification it makes a best-effort attempt
    to unsubscribe from ``/topic/bmrsTopic`` and then disconnect ``conn``.

    Args:
        conn: Connection object; this class calls its ``unsubscribe`` and
            ``disconnect`` methods.
        client_id: Value passed as ``id`` when unsubscribing. When omitted,
            a module-level ``client_id`` is used if one exists (which is
            what the original code relied on); otherwise ``None``.
    """

    def __init__(self, conn, client_id=None):
        self.conn = conn
        if client_id is None:
            # Backward compatibility: the previous version read a
            # module-level ``client_id`` instead of taking a parameter.
            client_id = globals().get("client_id")
        self.client_id = client_id

    def on_error(self, headers, message):
        """Error callback: unsubscribe and disconnect."""
        self.disconnect()

    def on_message(self, headers, message):
        """Message callback (body elided in the original snippet)."""
        ...

    def on_disconnected(self):
        """Disconnect callback: run the same best-effort cleanup."""
        # NOTE(review): the connection may already be closed at this point;
        # both cleanup steps below tolerate failure and only print a message.
        self.disconnect()

    def disconnect(self):
        """Unsubscribe, then disconnect; a failure in either step is only printed."""
        # ``except Exception`` (not a bare ``except``) so KeyboardInterrupt
        # and SystemExit still propagate and the process can be stopped.
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except Exception:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except Exception:
            print('disconnect failed')
如何强制 AMQ 服务器忘记我之前的连接?
您无法通过一个连接让代理断开属于另一个连接的客户端资源。相反,您应该在客户端和代理两端都为连接配置空闲超时(心跳)值,这样即使套接字本身检测不到连接已关闭,双方也都能处理对端掉线的情况。
对于不声明心跳的 STOMP 客户端,您还可以在代理的传输连接器上配置心跳宽限期值,详情请参阅 ActiveMQ 代理的 STOMP 文档。
STOMP 规范(STOMP specification)概述了心跳的工作原理。
使用持久连接侦听消息队列时,侦听器中出现错误。我通过按 CTRL-Z 退出程序来模拟这种情况。尝试重新连接时,出现以下错误:
on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:197)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
at org.apache.activemq.transport.stomp.StompSslTransportFactory.doConsume(StompSslTransportFactory.java:70)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:745)
"
我尝试使用以下方法取消订阅并断开连接,但它不会断开连接。
class MyListener(stomp.ConnectionListener):
    """Listener for new messages using the STOMP protocol.

    On an error or a disconnect notification it makes a best-effort attempt
    to unsubscribe from ``/topic/bmrsTopic`` and then disconnect ``conn``.

    Args:
        conn: Connection object; this class calls its ``unsubscribe`` and
            ``disconnect`` methods.
        client_id: Value passed as ``id`` when unsubscribing. When omitted,
            a module-level ``client_id`` is used if one exists (which is
            what the original code relied on); otherwise ``None``.
    """

    def __init__(self, conn, client_id=None):
        self.conn = conn
        if client_id is None:
            # Backward compatibility: the previous version read a
            # module-level ``client_id`` instead of taking a parameter.
            client_id = globals().get("client_id")
        self.client_id = client_id

    def on_error(self, headers, message):
        """Error callback: unsubscribe and disconnect."""
        self.disconnect()

    def on_message(self, headers, message):
        """Message callback (body elided in the original snippet)."""
        ...

    def on_disconnected(self):
        """Disconnect callback: run the same best-effort cleanup."""
        # NOTE(review): the connection may already be closed at this point;
        # both cleanup steps below tolerate failure and only print a message.
        self.disconnect()

    def disconnect(self):
        """Unsubscribe, then disconnect; a failure in either step is only printed."""
        # ``except Exception`` (not a bare ``except``) so KeyboardInterrupt
        # and SystemExit still propagate and the process can be stopped.
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except Exception:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except Exception:
            print('disconnect failed')
如何强制 AMQ 服务器忘记我之前的连接?
您无法通过一个连接让代理断开属于另一个连接的客户端资源。相反,您应该在客户端和代理两端都为连接配置空闲超时(心跳)值,这样即使套接字本身检测不到连接已关闭,双方也都能处理对端掉线的情况。
对于不声明心跳的 STOMP 客户端,您还可以在代理的传输连接器上配置心跳宽限期值,详情请参阅 ActiveMQ 代理的 STOMP 文档。
STOMP specification 概述了心跳的工作原理: