How to force disconnect from ActiveMQ connection with Stomp.py

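When listening to a message queue over a durable connection, an error occurs in the listener. I simulate this by hitting CTRL-Z to exit the program. When I try to reconnect, I get this error: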

on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
        at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:197)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory.doConsume(StompSslTransportFactory.java:70)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:745)
"

I tried to unsubscribe and disconnect with the following, but it does not disconnect.

import stomp


class MyListener(stomp.ConnectionListener):
    """Listens for new messages using the STOMP protocol."""

    def __init__(self, conn, client_id):
        self.conn = conn
        # client_id is passed in explicitly; it is reused as the subscription id
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except Exception:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except Exception:
            print('disconnect failed')

How can I force the AMQ server to forget my previous connection?

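You can't ask the broker to disconnect another client's resources from a different connection. Instead, configure the connection on both the client and the broker with an idle timeout value, so that each side can deal with a remote drop that the socket itself does not detect as a close.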
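On the client side, one way to get this kind of idle detection with stomp.py is to request heart-beats when creating the connection. The following is only a minimal sketch: the function name, host, port, and interval values are placeholders I made up, and credentials and SSL setup (which the broker in the question appears to use) are left out.

```python
def connect_with_heartbeats(host, port, client_id,
                            send_ms=10000, receive_ms=10000):
    """Open a STOMP connection that asks for heart-beats in both directions.

    send_ms:    how often this client offers to send a heart-beat
    receive_ms: how often this client would like one from the broker
    (both values are illustrative, not recommendations)
    """
    # Imported here so the sketch can be loaded without stomp.py installed.
    import stomp

    conn = stomp.Connection(
        host_and_ports=[(host, port)],
        heartbeats=(send_ms, receive_ms),
    )
    # The client-id header is what the broker reports as "already connected"
    # while it still believes the old connection is alive.
    conn.connect(wait=True, headers={"client-id": client_id})
    return conn
```

With heart-beats agreed, a broker that stops hearing from the client can treat the connection as dead and release the client id, rather than waiting for the TCP socket to report a close.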

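You can also configure the broker's transport connector with a heart-beat grace period value for STOMP clients that don't advertise heart-beats; see the ActiveMQ broker STOMP documentation.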
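For the broker side, these settings go on the transport connector URI as query options. The sketch below only builds such a URI string. The option names `transport.defaultHeartBeat` and `transport.hbGracePeriodMultiplier` are as I recall them from the ActiveMQ STOMP documentation, so check them against your broker version. The helper name, host, port, and values are illustrative assumptions.

```python
from urllib.parse import urlencode


def stomp_connector_uri(host="0.0.0.0", port=61613,
                        default_heartbeat="5000,0",
                        grace_multiplier=1.5):
    """Build a STOMP transportConnector URI with heart-beat options.

    default_heartbeat: heart-beat assumed for clients that send none;
                       "5000,0" should mean the broker expects a client
                       heart-beat every 5 s and sends none itself.
    grace_multiplier:  how much slack the broker allows before it treats
                       a silent client as gone.
    """
    options = {
        "transport.defaultHeartBeat": default_heartbeat,
        "transport.hbGracePeriodMultiplier": grace_multiplier,
    }
    # Keep the comma literal so the value reads as "5000,0" in the URI.
    query = urlencode(options, safe=",")
    return f"stomp://{host}:{port}?{query}"


print(stomp_connector_uri())
```

The resulting string is what would go in the `uri` attribute of a `transportConnector` element in `activemq.xml`; inside XML the `&` separator has to be written as `&amp;`.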

The STOMP specification outlines how heart-beating works:
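As a rough illustration of the negotiation rule in STOMP 1.1 and 1.2: the client sends `heart-beat:cx,cy` in its CONNECT frame, the server answers with `heart-beat:sx,sy` in CONNECTED, and each direction is either disabled (if either side says 0) or uses the larger of the two values, in milliseconds. The function names below are my own, not part of any library.

```python
def parse_heartbeat_header(value):
    """Parse a heart-beat header value such as "10000,5000" into two ints."""
    first, second = value.split(",")
    return int(first), int(second)


def negotiate_heartbeats(client, server):
    """Return (client_to_server_ms, server_to_client_ms); 0 means disabled.

    client = (cx, cy): cx is what the client can send, cy what it wants.
    server = (sx, sy): sx is what the server can send, sy what it wants.
    """
    cx, cy = client
    sx, sy = server
    # Client -> server heart-beats need the client to offer and the server to want them.
    client_to_server = 0 if cx == 0 or sy == 0 else max(cx, sy)
    # Server -> client heart-beats need the server to offer and the client to want them.
    server_to_client = 0 if sx == 0 or cy == 0 else max(sx, cy)
    return client_to_server, server_to_client


client = parse_heartbeat_header("10000,10000")
server = parse_heartbeat_header("5000,20000")
print(negotiate_heartbeats(client, server))  # (20000, 10000)
```

A client that sends `0,0`, or no heart-beat header at all, ends up with both directions disabled, which is the case the broker-side default and grace period settings are meant to cover.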