具有故障转移设置的 ActiveMQ 上未确认的消息不会重新传送给订阅者
Unacknowledged message on ActiveMQ with fail over setup is not redelivered to subscriber
我正在用 C# 实现 Active MQ 发布者和订阅者。我正在使用 Apache.NMS.ActiveMQ .net 客户端库与代理通信。
<package id="Apache.NMS" version="1.7.1" targetFramework="net461" />
<package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" />
ActiveMQ 在 4 个服务器上配置了故障转移设置(.225、.226、.346、.347 - IP 的最后部分供参考)。经纪人 url 看起来像
failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616
这是我发布的方式
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
var connectionFactory = new ConnectionFactory(brokerUrl);
using (var connection = connectionFactory.CreateConnection("conn1", "conn"))
{
connection.ClientId = "TESTPUBLISHER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.Start();
var session = connection.CreateSession();
var topic = new ActiveMQTopic("ACCOUNT.UPDATE");
var producer = session.CreateProducer(topic);
var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST";
var textMessage = producer.CreateTextMessage(msg);
textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE");
textMessage.Properties.SetString("Action", "UPDATE");
textMessage.Properties.SetString("DataContractType", "Account");
producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0));
}
这是我订阅该主题的方式。此代码的设置使得多个共享订阅者可以侦听传入消息。我被告知我必须使用虚拟主题来完成它。因此,我将订阅者配置为使用虚拟主题,并将其托管在 Windows 服务项目中。我使用 Acknowledgment Mode 作为 ClientAcknowledge,这样除非消息被确认,否则它应该继续返回。下面的代码片段仅代表 windows 服务的重要订阅者部分。
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl));
IConnection connection = factory.CreateConnection("conn1", "conn"))
connection.ClientId = "TESTSUBSCRIBER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.ConnectionInterruptedListener += OnConnectionInturrupted;
connection.ExceptionListener += OnConnectionExceptionListener;
connection.ConnectionResumedListener += OnConnectionResumedListener;
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE");
ActiveMQTopic topic = new ActiveMQTopic();
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += OnMessage;
private void OnMessage(IMessage message)
{
var payload = ((ITextMessage)message).Text;
Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]");
if(payload != "43342_test")
{
message.Acknowledge();
Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]");
}
}
private void OnConnectionResumedListener()
{
Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER");
}
private void OnConnectionExceptionListener(Exception exception)
{
Log.Error(exception);
}
private void OnConnectionInturrupted()
{
Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER");
}
我可以发布和订阅消息。我 运行 遇到了一个特定案例的问题。假设订户从故障转移服务器池建立到 (.225 代理服务器) 的连接。发布者发布了一条消息。订户收到它,它正在处理中。但由于某些服务器补丁维护,windows 服务不得不关闭。结果,订阅者与代理的连接断开了。当 windows 服务恢复时,这次订阅者建立了与故障转移池中不同代理服务器(.346 代理服务器)的连接。发生这种情况时,未确认的消息再也不会被重新发送。但是如果我重新启动 windows 服务并且幸运的话如果连接建立到 .225 代理(订阅者最初连接到的同一服务器),现在订阅者收到未确认的消息。
我的假设是,当在故障转移设置中配置 ActiveMQ 时,无论订阅服务器能够与故障转移池中的哪个代理服务器建立连接,它都应该始终收到未确认的消息。
在某些情况下,故障转移设置似乎可以正常工作。假设订阅者从故障转移池连接到 .346 代理服务器。发布者从同一个池连接到不同的代理服务器(.225 代理)并发布消息,订阅者正在接收消息。这证明故障转移设置正在运行。
但是一旦订阅者从代理服务器接收到消息,并且如果订阅者在确认消息之前断开连接,则它必须重新建立与同一代理服务器的连接以接收未确认的消息。这听起来不对我。
Active MQ 服务器设置是否需要任何其他配置才能使此用例正常工作?
这个问题的解决方案不是在客户端,而是通过 Active MQ 服务器配置。
对于生产者流量控制目标策略,添加 ConditionalNetworkBridgeFilter 并启用 replayWhenNoConsumers。
我正在用 C# 实现 Active MQ 发布者和订阅者。我正在使用 Apache.NMS.ActiveMQ .net 客户端库与代理通信。
<package id="Apache.NMS" version="1.7.1" targetFramework="net461" />
<package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" />
ActiveMQ 在 4 个服务器上配置了故障转移设置(.225、.226、.346、.347 - IP 的最后部分供参考)。经纪人 url 看起来像
failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616
这是我发布的方式
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
var connectionFactory = new ConnectionFactory(brokerUrl);
using (var connection = connectionFactory.CreateConnection("conn1", "conn"))
{
connection.ClientId = "TESTPUBLISHER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.Start();
var session = connection.CreateSession();
var topic = new ActiveMQTopic("ACCOUNT.UPDATE");
var producer = session.CreateProducer(topic);
var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST";
var textMessage = producer.CreateTextMessage(msg);
textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE");
textMessage.Properties.SetString("Action", "UPDATE");
textMessage.Properties.SetString("DataContractType", "Account");
producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0));
}
这是我订阅该主题的方式。此代码的设置使得多个共享订阅者可以侦听传入消息。我被告知我必须使用虚拟主题来完成它。因此,我将订阅者配置为使用虚拟主题,并将其托管在 Windows 服务项目中。我使用 Acknowledgment Mode 作为 ClientAcknowledge,这样除非消息被确认,否则它应该继续返回。下面的代码片段仅代表 windows 服务的重要订阅者部分。
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl));
IConnection connection = factory.CreateConnection("conn1", "conn"))
connection.ClientId = "TESTSUBSCRIBER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.ConnectionInterruptedListener += OnConnectionInturrupted;
connection.ExceptionListener += OnConnectionExceptionListener;
connection.ConnectionResumedListener += OnConnectionResumedListener;
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE");
ActiveMQTopic topic = new ActiveMQTopic();
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += OnMessage;
private void OnMessage(IMessage message)
{
var payload = ((ITextMessage)message).Text;
Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]");
if(payload != "43342_test")
{
message.Acknowledge();
Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]");
}
}
private void OnConnectionResumedListener()
{
Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER");
}
private void OnConnectionExceptionListener(Exception exception)
{
Log.Error(exception);
}
private void OnConnectionInturrupted()
{
Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER");
}
我可以发布和订阅消息。我 运行 遇到了一个特定案例的问题。假设订户从故障转移服务器池建立到 (.225 代理服务器) 的连接。发布者发布了一条消息。订户收到它,它正在处理中。但由于某些服务器补丁维护,windows 服务不得不关闭。结果,订阅者与代理的连接断开了。当 windows 服务恢复时,这次订阅者建立了与故障转移池中不同代理服务器(.346 代理服务器)的连接。发生这种情况时,未确认的消息再也不会被重新发送。但是如果我重新启动 windows 服务并且幸运的话如果连接建立到 .225 代理(订阅者最初连接到的同一服务器),现在订阅者收到未确认的消息。
我的假设是,当在故障转移设置中配置 ActiveMQ 时,无论订阅服务器能够与故障转移池中的哪个代理服务器建立连接,它都应该始终收到未确认的消息。
在某些情况下,故障转移设置似乎可以正常工作。假设订阅者从故障转移池连接到 .346 代理服务器。发布者从同一个池连接到不同的代理服务器(.225 代理)并发布消息,订阅者正在接收消息。这证明故障转移设置正在运行。
但是一旦订阅者从代理服务器接收到消息,并且如果订阅者在确认消息之前断开连接,则它必须重新建立与同一代理服务器的连接以接收未确认的消息。这听起来不对我。
Active MQ 服务器设置是否需要任何其他配置才能使此用例正常工作?
这个问题的解决方案不是在客户端,而是通过 Active MQ 服务器配置。
对于生产者流量控制目标策略,添加 ConditionalNetworkBridgeFilter 并启用 replayWhenNoConsumers。