如何让客户端确认在 NMS 中工作?
how to get ClientAcknowledgement to work in NMS?
我的理解是,当在生产者中指定 AcknowledgementMode.ClientAcknowledge 时,我应该能够停止并重新启动消费者,并且代理将重新发送关于该主题的所有未确认消息。但是我无法让它工作。 (我也试过在消费者中使用 onMessage 侦听器,但我得到了相同的行为。)任何帮助表示赞赏。这是我的制作人:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "producer";
using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge)) {
IDestination destination = SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic);
IMessageProducer producer = session.CreateProducer(destination);
while (!Console.KeyAvailable) {
string message = "[" + DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss.fff") + "]";
ITextMessage xtext_message = session.CreateTextMessage(message);
producer.Send(xtext_message, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(1, 0, 0));
Console.WriteLine("Sent message:" + message);
Thread.Sleep(1000);
}
}
}
}
这是我的消费者:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "consumer";
connection.Start();
using (ISession session = connection.CreateSession()) {
IMessageConsumer consumer = session.CreateConsumer(SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic));
while (!Console.KeyAvailable) {
ITextMessage receivedMsg = consumer.Receive() as ITextMessage;
if (receivedMsg != null) {
Console.WriteLine("--> received: " + receivedMsg.Text);
}
}
}
}
}
Client Acknowledgement 对 Message producer 没有影响,他们向 broker 发送消息并以相同的方式等待 broker 无论如何(只有事务会话模式对生产者有任何影响)。确认模式会影响消息的使用者,在客户端确认的情况下,需要专门确认消息才能从队列中使用它们。
如果使用客户端确认模式的消费者未能确认并关闭,则传递给它的消息将被分派给另一个消费者(如果存在)或将简单地保留在队列中直到另一个消费者到达。
我的理解是,当在生产者中指定 AcknowledgementMode.ClientAcknowledge 时,我应该能够停止并重新启动消费者,并且代理将重新发送关于该主题的所有未确认消息。但是我无法让它工作。 (我也试过在消费者中使用 onMessage 侦听器,但我得到了相同的行为。)任何帮助表示赞赏。这是我的制作人:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "producer";
using (ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge)) {
IDestination destination = SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic);
IMessageProducer producer = session.CreateProducer(destination);
while (!Console.KeyAvailable) {
string message = "[" + DateTime.UtcNow.ToString("yyyy/MM/dd HH:mm:ss.fff") + "]";
ITextMessage xtext_message = session.CreateTextMessage(message);
producer.Send(xtext_message, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(1, 0, 0));
Console.WriteLine("Sent message:" + message);
Thread.Sleep(1000);
}
}
}
}
这是我的消费者:
static void Main(string[] args) {
IConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection()) {
connection.ClientId = "consumer";
connection.Start();
using (ISession session = connection.CreateSession()) {
IMessageConsumer consumer = session.CreateConsumer(SessionUtil.GetDestination(session, "MY_TOPIC", DestinationType.Topic));
while (!Console.KeyAvailable) {
ITextMessage receivedMsg = consumer.Receive() as ITextMessage;
if (receivedMsg != null) {
Console.WriteLine("--> received: " + receivedMsg.Text);
}
}
}
}
}
Client Acknowledgement 对 Message producer 没有影响,他们向 broker 发送消息并以相同的方式等待 broker 无论如何(只有事务会话模式对生产者有任何影响)。确认模式会影响消息的使用者,在客户端确认的情况下,需要专门确认消息才能从队列中使用它们。
如果使用客户端确认模式的消费者未能确认并关闭,则传递给它的消息将被分派给另一个消费者(如果存在)或将简单地保留在队列中直到另一个消费者到达。