ActiveMQ OnMessage() 方法阻塞其他线程
ActiveMQ OnMessage() method blocks other threads
我正在使用 ActiveMQ 编写应用程序,其中我使用异步 onMessage() 方法从 ActiveMQ 获取消息。
假设我从 activemq 收到 1000 条消息,因此所有消息都将存储在 OnMessage() 方法中的 ConcurrentLinkedQueue 中,并且我使用线程从 ConcurrentLinkedQueue 中检索。
但是我面临的问题是,我无法向 ConcurrentLinkedQueue 中添加消息,也无法从中取出任何一条消息。来自 onMessage() 的 textMessage 会被传给一个接受 TextMessage 参数的 setter 方法,但我无法通过对应的 getter 方法取回它。为什么会这样?如何避免呢?
代码片段如下
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a producer
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//Setting the text message to a setter which takes TextMessage as arg
settext.setTextmessage((TextMessage) message);
}
}
}
//Problem here unable to produce
class Producer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
Settext settext;
Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext){
this.queue = queue2;
this.settext=settext;
}
public void run() {
System.out.println("Producer Started");
try {
if(this.settext.getTextmessage()!=null)
{
//Add to ConcurrentLinkedQueue
queue.add(this.settext.getTextmessage());
}
Thread.currentThread().sleep(200);
//}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while ((str = queue.poll()) != null) {
System.out.println("Removed: " + str);
}
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
//}
}
我不清楚你为什么要这样做,但你的设计存在问题,请参阅下面的注释 1-5。请注意,QueueMessageListener
是异步执行的,它可能在另一个消费者取出 TextMessage 并将其添加到队列之前,就再次调用 settext.setTextmessage((TextMessage) message) 覆盖掉上一条消息。
正因为如此,V2 可能更好,不过使用 org.springframework.jms.listener.DefaultMessageListenerContainer 也许才是最好的解决方案:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a producer
// 1- settext.getTextmessage() == null i suppose at this level, see 2- point
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
// 3- you start consumers go to 4, note that you will only consume count messages !!
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//Setting the text message to a setter which takes TextMessage as arg
settext.setTextmessage((TextMessage) message);
// at this point message is considered as delivered if sessionAcknowledgeModeName is AUTO_ACKNOWLEDGE and maybe lost if asynchronous treatment fails
}
}
}
//Problem here unable to produce
/**
 * Waits until the shared {@code Settext} holder contains a message, then moves that
 * single message onto the queue and finishes.
 */
class Producer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.queue = queue2;
        this.settext = settext;
    }

    @Override
    public void run() {
        System.out.println("Producer Started");
        try {
            // 2- settext.getTextmessage() == null if block is not executed and thread will sleep and finish
            // you have to add this
            // Read the holder once per iteration so the value that passed the null check is
            // the very value that gets enqueued (the listener may overwrite it concurrently).
            TextMessage message;
            while ((message = this.settext.getTextmessage()) == null) {
                Thread.sleep(500);
            }
            //Add to ConcurrentLinkedQueue
            queue.add(message);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and stop waiting instead of looping forever.
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
/**
 * Polls the queue every 500 ms until one message is available, prints it and returns.
 * Returns early without printing if the thread is interrupted while waiting.
 */
public void run() {
    System.out.println("Consumer Started");
    // 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish
    // you have to add this
    TextMessage str;
    try {
        while ((str = queue.poll()) == null) {
            Thread.sleep(500);
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag and give up; swallowing it would make the wait uncancellable.
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("Removed: " + str);
}
V2:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
/**
 * Polls the queue every 500 ms until one message is available, prints it and returns.
 * Returns early without printing if the thread is interrupted while waiting.
 */
public void run() {
    System.out.println("Consumer Started");
    TextMessage str;
    try {
        while ((str = queue.poll()) == null) {
            Thread.sleep(500);
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag and give up; swallowing it would make the wait uncancellable.
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("Removed: " + str);
}
V3:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
executor.execute(new Consumer((TextMessage) message));
}
}
}
//Problem here unable to consume
/**
 * Task that handles exactly one message: it is constructed with the message and prints
 * it when run.
 */
class Consumer implements Runnable {
    TextMessage textMessage;

    public Consumer(TextMessage textMessage) {
        this.textMessage = textMessage;
    }

    /** Prints the message this task was created with. */
    @Override
    public void run() {
        // The field is named textMessage; no variable called "str" exists in this class.
        System.out.println("Removed: " + textMessage);
    }
}
V4:
public static void main(String[] args) throws InterruptedException, JMSException {
new Consumer(queue).start();
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
//Problem here unable to consume
/**
 * Long-running worker that repeatedly takes messages off the shared queue and prints
 * them, sleeping 500 ms between polls. It runs until its thread is interrupted.
 */
class Consumer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
        this.queue = queue2;
    }

    @Override
    public void run() {
        System.out.println("Consumer Started");
        try {
            while (true) {
                Thread.sleep(500);
                TextMessage str;
                // Wait for the next message, re-checking the queue every 500 ms.
                while ((str = queue.poll()) == null) {
                    Thread.sleep(500);
                }
                System.out.println("Removed: " + str);
            }
        } catch (InterruptedException ex) {
            // Interruption is the only way to stop this worker: restore the flag and exit
            // instead of silently swallowing it and looping forever.
            Thread.currentThread().interrupt();
        }
    }
}
我正在使用 ActiveMQ 编写应用程序,其中我使用异步 onMessage() 方法从 ActiveMQ 获取消息。假设我从 ActiveMQ 收到 1000 条消息,所有消息都会在 onMessage() 方法中存入 ConcurrentLinkedQueue,然后我使用线程从 ConcurrentLinkedQueue 中取出消息。但是我面临的问题是,我无法向 ConcurrentLinkedQueue 中添加消息,也无法从中取出任何一条消息。来自 onMessage() 的 textMessage 会被传给一个接受 TextMessage 参数的 setter 方法,但我无法通过对应的 getter 方法取回它。为什么会这样?如何避免呢?
代码片段如下
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a producer
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//Setting the text message to a setter which takes TextMessage as arg
settext.setTextmessage((TextMessage) message);
}
}
}
//Problem here unable to produce
class Producer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
Settext settext;
Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext){
this.queue = queue2;
this.settext=settext;
}
public void run() {
System.out.println("Producer Started");
try {
if(this.settext.getTextmessage()!=null)
{
//Add to ConcurrentLinkedQueue
queue.add(this.settext.getTextmessage());
}
Thread.currentThread().sleep(200);
//}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
public void run() {
TextMessage str;
System.out.println("Consumer Started");
while ((str = queue.poll()) != null) {
System.out.println("Removed: " + str);
}
try {
Thread.currentThread().sleep(500);
} catch (Exception ex) {
ex.printStackTrace();
}
//}
}
我不清楚你为什么要这样做,但你的设计存在问题,请参阅下面的注释 1-5。请注意,QueueMessageListener
是异步执行的,它可能在另一个消费者取出 TextMessage 并将其添加到队列之前,就再次调用 settext.setTextmessage((TextMessage) message) 覆盖掉上一条消息。
正因为如此,V2 可能更好,不过使用 org.springframework.jms.listener.DefaultMessageListenerContainer 也许才是最好的解决方案:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a producer
// 1- settext.getTextmessage() == null i suppose at this level, see 2- point
Thread producer = new Thread(new Producer(queue,settext));
producer.start();
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
// 3- you start consumers go to 4, note that you will only consume count messages !!
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//Setting the text message to a setter which takes TextMessage as arg
settext.setTextmessage((TextMessage) message);
// at this point message is considered as delivered if sessionAcknowledgeModeName is AUTO_ACKNOWLEDGE and maybe lost if asynchronous treatment fails
}
}
}
//Problem here unable to produce
/**
 * Waits until the shared {@code Settext} holder contains a message, then moves that
 * single message onto the queue and finishes.
 */
class Producer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;
    Settext settext;

    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext) {
        this.queue = queue2;
        this.settext = settext;
    }

    @Override
    public void run() {
        System.out.println("Producer Started");
        try {
            // 2- settext.getTextmessage() == null if block is not executed and thread will sleep and finish
            // you have to add this
            // Read the holder once per iteration so the value that passed the null check is
            // the very value that gets enqueued (the listener may overwrite it concurrently).
            TextMessage message;
            while ((message = this.settext.getTextmessage()) == null) {
                Thread.sleep(500);
            }
            //Add to ConcurrentLinkedQueue
            queue.add(message);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and stop waiting instead of looping forever.
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
/**
 * Polls the queue every 500 ms until one message is available, prints it and returns.
 * Returns early without printing if the thread is interrupted while waiting.
 */
public void run() {
    System.out.println("Consumer Started");
    // 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish
    // you have to add this
    TextMessage str;
    try {
        while ((str = queue.poll()) == null) {
            Thread.sleep(500);
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag and give up; swallowing it would make the wait uncancellable.
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("Removed: " + str);
}
V2:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <count; i++) {
executor.execute(new Consumer(queue));
}
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
}
//Problem here unable to consume
class Consumer implements Runnable {
ConcurrentLinkedQueue<TextMessage> queue;
public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
this.queue = queue2;
}
/**
 * Polls the queue every 500 ms until one message is available, prints it and returns.
 * Returns early without printing if the thread is interrupted while waiting.
 */
public void run() {
    System.out.println("Consumer Started");
    TextMessage str;
    try {
        while ((str = queue.poll()) == null) {
            Thread.sleep(500);
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag and give up; swallowing it would make the wait uncancellable.
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("Removed: " + str);
}
V3:
public static void main(String[] args) throws InterruptedException, JMSException {
//Create a Consumer with coresize 4 and Max size 10
final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
executor.execute(new Consumer((TextMessage) message));
}
}
}
//Problem here unable to consume
/**
 * Task that handles exactly one message: it is constructed with the message and prints
 * it when run.
 */
class Consumer implements Runnable {
    TextMessage textMessage;

    public Consumer(TextMessage textMessage) {
        this.textMessage = textMessage;
    }

    /** Prints the message this task was created with. */
    @Override
    public void run() {
        // The field is named textMessage; no variable called "str" exists in this class.
        System.out.println("Removed: " + textMessage);
    }
}
V4:
public static void main(String[] args) throws InterruptedException, JMSException {
new Consumer(queue).start();
**//INITIALIZE ACTIVEMQ CONFIGURATION HERE**
consumer.setMessageListener(new QueueMessageListener());
executor.shutdown();
}
private static class QueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
queue.add((TextMessage) message);
}
}
//Problem here unable to consume
/**
 * Long-running worker that repeatedly takes messages off the shared queue and prints
 * them, sleeping 500 ms between polls. It runs until its thread is interrupted.
 */
class Consumer implements Runnable {
    ConcurrentLinkedQueue<TextMessage> queue;

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) {
        this.queue = queue2;
    }

    @Override
    public void run() {
        System.out.println("Consumer Started");
        try {
            while (true) {
                Thread.sleep(500);
                TextMessage str;
                // Wait for the next message, re-checking the queue every 500 ms.
                while ((str = queue.poll()) == null) {
                    Thread.sleep(500);
                }
                System.out.println("Removed: " + str);
            }
        } catch (InterruptedException ex) {
            // Interruption is the only way to stop this worker: restore the flag and exit
            // instead of silently swallowing it and looping forever.
            Thread.currentThread().interrupt();
        }
    }
}