Solace 是否因为不确认先前的消息而违反了 JMS 规范?
Does Solace violate JMS spec by not acknowledging previous messages?
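JMS 1.1 规范第 4.4.11 节说:“确认一条已消费的消息,会自动确认其会话已交付的所有消息均已收到。”(原文:"Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been delivered by its session.")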
但是,这不是我观察到的 Solace 行为。我编写了以下约 100 行的程序,该程序发送 20 条消息,然后读取这些消息,并交替地对消息进行确认和不确认(即直接丢弃、不调用 acknowledge())。结果是所有偶数编号的消息都保留在队列中。
那么 Solace 是否违反了 JMS 规范,还是我遗漏了什么?
package com.example;
import java.util.function.Predicate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
/**
 * Small reproduction program: sends {@link #NUM_MESSAGES_TO_SEND} text messages
 * to the queue {@link #QUEUE_NAME} and consumes them on the same session with a
 * listener that calls {@link Message#acknowledge()} only on some of them.
 *
 * <p>The session is intentionally created with
 * {@link SupportedProperty#SOL_CLIENT_ACKNOWLEDGE} (a Solace-specific
 * acknowledge mode) rather than {@code javax.jms.Session.CLIENT_ACKNOWLEDGE},
 * because that mode is what this program is meant to exercise.
 */
public class SolaceAckTest {

    private static final String host = "localhost";
    private static final String username = "MyUser";
    private static final String password = "MyPassword";

    /** Name of the int message property that carries the 1-based message number. */
    private static final String COUNTER_PROPERTY_NAME = "MyCounter";
    private static final String QUEUE_NAME = "MATCHED_1";
    private static final int NUM_MESSAGES_TO_SEND = 20;
    private static final long SENDING_INTERVAL_IN_MILLISECONDS = 100;

    /**
     * Determines on which messages we should call
     * {@link Message#acknowledge()}: those whose counter property is odd.
     * If the property cannot be read, the failure is printed and the message
     * is treated as "do not acknowledge".
     */
    private static final Predicate<Message> SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE = m -> {
        try {
            return (m.getIntProperty(COUNTER_PROPERTY_NAME) % 2) == 1;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    };

    /**
     * Connects to the broker, registers the selective-acknowledge listener,
     * sends the numbered messages and finally closes the connection.
     *
     * @param args ignored
     * @throws Exception if any JMS operation fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setRespectTTL(true);

        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        // JMS 1.1 connections are not AutoCloseable, so release the connection
        // explicitly; previously it was never closed.
        try {
            QueueSession queueSession =
                    queueConnection.createQueueSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
            Destination requestDest = queueSession.createQueue(QUEUE_NAME);
            queueSession.createConsumer(requestDest)
                    .setMessageListener(new MessageListenerThatAcknowledgesSomeMessages());
            MessageProducer messageProducer = queueSession.createProducer(requestDest);
            queueConnection.start();

            for (int counter = 1; counter <= NUM_MESSAGES_TO_SEND; counter++) {
                TextMessage msg = queueSession.createTextMessage();
                msg.setText("Message #" + counter);
                msg.setIntProperty(COUNTER_PROPERTY_NAME, counter);
                messageProducer.send(msg);
                Thread.sleep(SENDING_INTERVAL_IN_MILLISECONDS);
            }

            // Keep the program alive briefly so the asynchronous listener can
            // still receive the last messages before the connection is closed.
            Thread.sleep(1000);
        } finally {
            // Closing the connection also closes the session, producer and
            // consumer created from it.
            queueConnection.close();
        }
    }

    /**
     * A listener that calls {@link Message#acknowledge()} only on messages that
     * meet the criteria specified by
     * {@link SolaceAckTest#SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE}, and prints
     * for every message whether it was acknowledged.
     */
    private static class MessageListenerThatAcknowledgesSomeMessages implements MessageListener {

        /**
         * Acknowledges the message if the selector accepts it; otherwise only
         * logs it. Assumes every message on the queue is a {@link TextMessage}.
         *
         * @param msg the delivered message
         */
        @Override
        public void onMessage(Message msg) {
            try {
                final String text = ((TextMessage) msg).getText();
                if (SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE.test(msg)) {
                    msg.acknowledge();
                    System.out.println("Acknowledging message: " + text);
                } else {
                    System.out.println("Not acknowledging message: " + text);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
我相信蒂姆是正确的。在您的示例中,您似乎使用的是 Solace 扩展而不是标准的 JMS 客户端确认模式。请尝试在创建会话时指定标准 JMS 客户端确认模式。例如:
// Standard JMS client-acknowledge mode (constant from javax.jms.Session): acknowledging one
// message also acknowledges all messages the session has delivered so far.
QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
您最初指定的 SOL_CLIENT_ACKNOWLEDGE 扩展允许您确认特定消息而无需隐式确认会话收到的所有其他消息。如果您有多个工作线程处理来自会话的消息,这将很有用。每个线程在完成处理后都可以确认其消息,而无需隐式确认其他线程正在处理的消息。
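JMS 1.1 规范第 4.4.11 节说:“确认一条已消费的消息,会自动确认其会话已交付的所有消息均已收到。”(原文:"Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been delivered by its session.")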
但是,这不是我观察到的 Solace 行为。我编写了以下约 100 行的程序,该程序发送 20 条消息,然后读取这些消息,并交替地对消息进行确认和不确认(即直接丢弃、不调用 acknowledge())。结果是所有偶数编号的消息都保留在队列中。
那么 Solace 是否违反了 JMS 规范,还是我遗漏了什么?
package com.example;
import java.util.function.Predicate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
/**
 * Small reproduction program: sends {@link #NUM_MESSAGES_TO_SEND} text messages
 * to the queue {@link #QUEUE_NAME} and consumes them on the same session with a
 * listener that calls {@link Message#acknowledge()} only on some of them.
 *
 * <p>The session is intentionally created with
 * {@link SupportedProperty#SOL_CLIENT_ACKNOWLEDGE} (a Solace-specific
 * acknowledge mode) rather than {@code javax.jms.Session.CLIENT_ACKNOWLEDGE},
 * because that mode is what this program is meant to exercise.
 */
public class SolaceAckTest {

    private static final String host = "localhost";
    private static final String username = "MyUser";
    private static final String password = "MyPassword";

    /** Name of the int message property that carries the 1-based message number. */
    private static final String COUNTER_PROPERTY_NAME = "MyCounter";
    private static final String QUEUE_NAME = "MATCHED_1";
    private static final int NUM_MESSAGES_TO_SEND = 20;
    private static final long SENDING_INTERVAL_IN_MILLISECONDS = 100;

    /**
     * Determines on which messages we should call
     * {@link Message#acknowledge()}: those whose counter property is odd.
     * If the property cannot be read, the failure is printed and the message
     * is treated as "do not acknowledge".
     */
    private static final Predicate<Message> SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE = m -> {
        try {
            return (m.getIntProperty(COUNTER_PROPERTY_NAME) % 2) == 1;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    };

    /**
     * Connects to the broker, registers the selective-acknowledge listener,
     * sends the numbered messages and finally closes the connection.
     *
     * @param args ignored
     * @throws Exception if any JMS operation fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setRespectTTL(true);

        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        // JMS 1.1 connections are not AutoCloseable, so release the connection
        // explicitly; previously it was never closed.
        try {
            QueueSession queueSession =
                    queueConnection.createQueueSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
            Destination requestDest = queueSession.createQueue(QUEUE_NAME);
            queueSession.createConsumer(requestDest)
                    .setMessageListener(new MessageListenerThatAcknowledgesSomeMessages());
            MessageProducer messageProducer = queueSession.createProducer(requestDest);
            queueConnection.start();

            for (int counter = 1; counter <= NUM_MESSAGES_TO_SEND; counter++) {
                TextMessage msg = queueSession.createTextMessage();
                msg.setText("Message #" + counter);
                msg.setIntProperty(COUNTER_PROPERTY_NAME, counter);
                messageProducer.send(msg);
                Thread.sleep(SENDING_INTERVAL_IN_MILLISECONDS);
            }

            // Keep the program alive briefly so the asynchronous listener can
            // still receive the last messages before the connection is closed.
            Thread.sleep(1000);
        } finally {
            // Closing the connection also closes the session, producer and
            // consumer created from it.
            queueConnection.close();
        }
    }

    /**
     * A listener that calls {@link Message#acknowledge()} only on messages that
     * meet the criteria specified by
     * {@link SolaceAckTest#SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE}, and prints
     * for every message whether it was acknowledged.
     */
    private static class MessageListenerThatAcknowledgesSomeMessages implements MessageListener {

        /**
         * Acknowledges the message if the selector accepts it; otherwise only
         * logs it. Assumes every message on the queue is a {@link TextMessage}.
         *
         * @param msg the delivered message
         */
        @Override
        public void onMessage(Message msg) {
            try {
                final String text = ((TextMessage) msg).getText();
                if (SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE.test(msg)) {
                    msg.acknowledge();
                    System.out.println("Acknowledging message: " + text);
                } else {
                    System.out.println("Not acknowledging message: " + text);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
我相信蒂姆是正确的。在您的示例中,您似乎使用的是 Solace 扩展而不是标准的 JMS 客户端确认模式。请尝试在创建会话时指定标准 JMS 客户端确认模式。例如:
// Standard JMS client-acknowledge mode (constant from javax.jms.Session): acknowledging one
// message also acknowledges all messages the session has delivered so far.
QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
您最初指定的 SOL_CLIENT_ACKNOWLEDGE 扩展允许您确认特定消息而无需隐式确认会话收到的所有其他消息。如果您有多个工作线程处理来自会话的消息,这将很有用。每个线程在完成处理后都可以确认其消息,而无需隐式确认其他线程正在处理的消息。