如何使用 stompit 删除持久订阅
How to remove durable subscription using stompit
我们将 ActiveMQ 5.16.1 与 stompit 客户端结合使用,使用以下代码片段在我们的 NodeJS 应用程序中创建持久订阅:
var connectOptions = {
"host": "",
"port": amqPort,
rejectUnauthorized: false,
checkServerIdentity: () => { },
"connectHeaders": {
"heart-beat": "15000,15000",// hear-beat of 15 seconds
'login': 'admin',
'passcode': 'admin',
'client-id' : "agent-manager"
}
};
var server1 = connectOptions;
server1.host = amqPrimaryHost;
var server2 = connectOptions;
server2.host = amqSecondaryHost;
var amqSubscription;
var subscribeHeaders = {
"destination": "/topic/delivery-reports",
"activemq.subscriptionName": "channel_manager_delivery_reports",
"ack": "client-individual"
};
var connectionManager = new stompit.ConnectFailover([server1,server2], reconnectOptions);
connectionManager.connect(function (error, client, reconnect){
if (error) {
logger.error("Terminal error, gave up reconnecting ", error);
return;
}
client.on("error", function (error) {
if(!client)
reconnect();
});
amqSubscription=client.subscribe(subscribeHeaders, function (error, message,subscription) {
logger.info("going to subscribe")
if (error) {
logger.error("Subscription failed. Going to disconnect", error);
subscription.unsubscribe();
// reconnect();
}
logger.info("subscribed")
});
});
function unsubscribe () {
logger.info("Going to unsub")
amqSubscription.unsubscribe({"activemq.subscriptionName":"channel_manager_delivery_reports"})
};
但是,当我调用 unsubscribe
时,它只会将 Subscriber
活动状态更改为 false,但不会将其从活动订阅者列表中删除,如屏幕截图所示。
在 stomp.logs 中得到以下异常。
2021-05-12 05:20:14,826 [0.1:50251@61613] WARN ProtocolConverter - Exception occurred for client agent-manager (tcp://127.0.0.1:50251) processing: UNSUBSCRIBE -> javax.jms.JMSException: Durable consumer is in use
2021-05-12 05:20:14,826 [0.1:50251@61613] DEBUG ProtocolConverter - Exception detail
javax.jms.JMSException: Durable consumer is in use
at org.apache.activemq.broker.region.TopicRegion.removeSubscription(TopicRegion.java:220)
at org.apache.activemq.broker.region.RegionBroker.removeSubscription(RegionBroker.java:457)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.advisory.AdvisoryBroker.removeSubscription(AdvisoryBroker.java:396)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.TransportConnection.processRemoveSubscription(TransportConnection.java:419)
at org.apache.activemq.command.RemoveSubscriptionInfo.visit(RemoveSubscriptionInfo.java:81)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:200)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:179)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompUnsubscribe(ProtocolConverter.java:714)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-05-12 05:20:14,827 [0.1:50251@61613] TRACE ProtocolConverter - Command that caused the error: UNSUBSCRIBE
activemq.subscriptionName:channel_manager_delivery_reports
receipt:1
id:1
关于如何通过 stompit 正确删除持久订阅的任何建议。
您首先需要断开持久订阅者正在使用的连接。这会停用订阅,并会阻止您看到的 JMSException: Durable consumer is in use
。
然后您需要重新连接,使用您在 CONNECT
框架上用于连接的相同 client-id
header 值订阅。
然后需要像 SUBSCRIBE
帧一样在 UNSUBSCRIBE
帧中传递 activemq.subscriptionName
header,例如:
amqSubscription.unsubscribe({"activemq.subscriptionName": "channel_manager_delivery_reports"})
需要说明的是,这个问题已经通过 AMQ-1890. You can see the corresponding source code which checks for the header and removes the subscription. You can also see the unit test 解决了,它订阅、断开连接、重新连接和取消订阅持久订阅。
如果您仍然遇到问题,那么可能值得使用 these instructions 在代理中打开 STOMP 跟踪日志记录,以确保 UNSUBSCRIBE
帧以预期的 activemq.subscriptionName
header.
我们将 ActiveMQ 5.16.1 与 stompit 客户端结合使用,使用以下代码片段在我们的 NodeJS 应用程序中创建持久订阅:
var connectOptions = {
"host": "",
"port": amqPort,
rejectUnauthorized: false,
checkServerIdentity: () => { },
"connectHeaders": {
"heart-beat": "15000,15000",// hear-beat of 15 seconds
'login': 'admin',
'passcode': 'admin',
'client-id' : "agent-manager"
}
};
var server1 = connectOptions;
server1.host = amqPrimaryHost;
var server2 = connectOptions;
server2.host = amqSecondaryHost;
var amqSubscription;
var subscribeHeaders = {
"destination": "/topic/delivery-reports",
"activemq.subscriptionName": "channel_manager_delivery_reports",
"ack": "client-individual"
};
var connectionManager = new stompit.ConnectFailover([server1,server2], reconnectOptions);
connectionManager.connect(function (error, client, reconnect){
if (error) {
logger.error("Terminal error, gave up reconnecting ", error);
return;
}
client.on("error", function (error) {
if(!client)
reconnect();
});
amqSubscription=client.subscribe(subscribeHeaders, function (error, message,subscription) {
logger.info("going to subscribe")
if (error) {
logger.error("Subscription failed. Going to disconnect", error);
subscription.unsubscribe();
// reconnect();
}
logger.info("subscribed")
});
});
function unsubscribe () {
logger.info("Going to unsub")
amqSubscription.unsubscribe({"activemq.subscriptionName":"channel_manager_delivery_reports"})
};
但是,当我调用 unsubscribe
时,它只会将 Subscriber
活动状态更改为 false,但不会将其从活动订阅者列表中删除,如屏幕截图所示。
在 stomp.logs 中得到以下异常。
2021-05-12 05:20:14,826 [0.1:50251@61613] WARN ProtocolConverter - Exception occurred for client agent-manager (tcp://127.0.0.1:50251) processing: UNSUBSCRIBE -> javax.jms.JMSException: Durable consumer is in use
2021-05-12 05:20:14,826 [0.1:50251@61613] DEBUG ProtocolConverter - Exception detail
javax.jms.JMSException: Durable consumer is in use
at org.apache.activemq.broker.region.TopicRegion.removeSubscription(TopicRegion.java:220)
at org.apache.activemq.broker.region.RegionBroker.removeSubscription(RegionBroker.java:457)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.advisory.AdvisoryBroker.removeSubscription(AdvisoryBroker.java:396)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.BrokerFilter.removeSubscription(BrokerFilter.java:119)
at org.apache.activemq.broker.TransportConnection.processRemoveSubscription(TransportConnection.java:419)
at org.apache.activemq.command.RemoveSubscriptionInfo.visit(RemoveSubscriptionInfo.java:81)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:200)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:179)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompUnsubscribe(ProtocolConverter.java:714)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:251)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
2021-05-12 05:20:14,827 [0.1:50251@61613] TRACE ProtocolConverter - Command that caused the error: UNSUBSCRIBE
activemq.subscriptionName:channel_manager_delivery_reports
receipt:1
id:1
关于如何通过 stompit 正确删除持久订阅的任何建议。
您首先需要断开持久订阅者正在使用的连接。这会停用订阅,并会阻止您看到的 JMSException: Durable consumer is in use
。
然后您需要重新连接,使用您在 CONNECT
框架上用于连接的相同 client-id
header 值订阅。
然后需要像 SUBSCRIBE
帧一样在 UNSUBSCRIBE
帧中传递 activemq.subscriptionName
header,例如:
amqSubscription.unsubscribe({"activemq.subscriptionName": "channel_manager_delivery_reports"})
需要说明的是,这个问题已经通过 AMQ-1890. You can see the corresponding source code which checks for the header and removes the subscription. You can also see the unit test 解决了,它订阅、断开连接、重新连接和取消订阅持久订阅。
如果您仍然遇到问题,那么可能值得使用 these instructions 在代理中打开 STOMP 跟踪日志记录,以确保 UNSUBSCRIBE
帧以预期的 activemq.subscriptionName
header.