安慰队列中的消息是否需要按照它们在队列中的顺序进行确认?
Do messages on a solace queue need to be acked in the order they are on the queue?
我有工作线程从不同的提供程序 classes 获取消息。每个提供程序 class adds/takes 个内部队列的消息。每个提供者仅迎合一个安慰队列,安慰消费者将消息添加到队列的提供者。
多个 worker 可以获取提供者的消息,处理它们,然后发送消息的 ack(下面的 message.commit() 方法执行 ack)。
场景
- Worker1 从 provider1 获取 message1 进行处理
- Worker2 从 provider1 中获取 message2 进行处理
- Worker2 在 Worker1 之前完成,因此发回消息 2 的确认
问题
- message2 是否仍位于安慰队列中等待 message1 被确认,或者尽管 message1 尚未被确认,message2 是否会从队列中弹出?
- 收到ack后,安慰硬件会发生什么? message2是否被完全移除,队列顺序如何保持?
供应商class
public abstract class BaseProvider implements IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
if (internalQueue.size() > 0) {
Logger.debug("Queue has entries");
CoreMessage msg = null;
try {
msg = internalQueue.take();
} catch (InterruptedException e) {
Logger.warn("Interruption");
e.printStackTrace();
}
if (msg != null) {
arrMessages.add(msg);
}
}
return arrMessages;
}
protected synchronized void addToQueue(CoreMessage message) {
try {
internalQueue.put(message);
} catch (InterruptedException e) {
Logger.error("Exception adding message to queue " + message);
}
}
}
// 有一组工作线程读取这些队列
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
这似乎是您对 Solace API 的自定义包装。
这使得很难回答您的问题,因为我们根本不知道这个包装器在做什么。
以下答案基于以下假设。
包装器正在使用非事务 JCSMPSession
正在使用客户端确认
message.commit()
实际上是调用 Solace 的 XMLMessage.ackMessage()
- 您正在使用独占队列
- Would message2 still sit on the solace queue and wait for message1 to be acked or would message2 be popped off the queue despite message1
not acked yet?
消息 2 将被删除。
- What happens on the solace hardware when the ack is received? Is the message2 removed completely, how is the queue order then
maintained?
消息 2 将被确认并从队列中删除。
队列顺序没有影响。
消息顺序是指传入消息传送到消费应用程序的顺序。
在这种情况下,message1、message2 和 message3 按顺序传送到消费应用程序。
我有工作线程从不同的提供程序 classes 获取消息。每个提供程序 class adds/takes 个内部队列的消息。每个提供者仅迎合一个安慰队列,安慰消费者将消息添加到队列的提供者。
多个 worker 可以获取提供者的消息,处理它们,然后发送消息的 ack(下面的 message.commit() 方法执行 ack)。
场景
- Worker1 从 provider1 获取 message1 进行处理
- Worker2 从 provider1 中获取 message2 进行处理
- Worker2 在 Worker1 之前完成,因此发回消息 2 的确认
问题
- message2 是否仍位于安慰队列中等待 message1 被确认,或者尽管 message1 尚未被确认,message2 是否会从队列中弹出?
- 收到ack后,安慰硬件会发生什么? message2是否被完全移除,队列顺序如何保持?
供应商class
public abstract class BaseProvider implements IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
@Override
public synchronized List<CoreMessage> getNextQueuedItem() {
List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
if (internalQueue.size() > 0) {
Logger.debug("Queue has entries");
CoreMessage msg = null;
try {
msg = internalQueue.take();
} catch (InterruptedException e) {
Logger.warn("Interruption");
e.printStackTrace();
}
if (msg != null) {
arrMessages.add(msg);
}
}
return arrMessages;
}
protected synchronized void addToQueue(CoreMessage message) {
try {
internalQueue.put(message);
} catch (InterruptedException e) {
Logger.error("Exception adding message to queue " + message);
}
}
}
// 有一组工作线程读取这些队列
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
这似乎是您对 Solace API 的自定义包装。 这使得很难回答您的问题,因为我们根本不知道这个包装器在做什么。
以下答案基于以下假设。
包装器正在使用非事务 JCSMPSession
正在使用客户端确认
message.commit()
实际上是调用 Solace 的XMLMessage.ackMessage()
- 您正在使用独占队列
- Would message2 still sit on the solace queue and wait for message1 to be acked or would message2 be popped off the queue despite message1 not acked yet?
消息 2 将被删除。
- What happens on the solace hardware when the ack is received? Is the message2 removed completely, how is the queue order then maintained?
消息 2 将被确认并从队列中删除。
队列顺序没有影响。 消息顺序是指传入消息传送到消费应用程序的顺序。 在这种情况下,message1、message2 和 message3 按顺序传送到消费应用程序。