带有 akka 和多线程的 JMS
JMS with akka and multi threading
public class QueueListener implements MessageListener {
public static final ExecutorService executor = Executors.newWorkStealingPool();
public static boolean isActorinit=false;
public static ActorSystem system=null;
private ActorRef myActor=null;
private String _queueName=null;
public QueueListener(String qName){
this._queueName = qName;
if(!isActorinit){
system=ActorSystem.create("Controller");
try {
myActor=system.actorOf(Props.create(MessageExecutor.class.getConstructor(String.class).newInstance(_queueName).getClass()),"mysysActor");
} catch (Exception e) {
// TODO Auto-generated catch block
}
isActorinit=true;
}
}
/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message msg) {
// processRequest(msg);
executeRequest(msg);
}
/** This method will process the message fetch by the listener.
*
* @param msg - javax.jms.Messages parameter get queue message
*/
private void processRequest(Message msg){
String requestData=null;
try {
if(msg instanceof TextMessage){
TextMessage textMessage= (TextMessage) msg;
requestData = textMessage.getText().toString();
}else if(msg instanceof ObjectMessage){
ObjectMessage objMsg = (ObjectMessage) msg;
requestData = objMsg.getObject().toString();
}
MessageProcessor msgProcessor = new MessageProcessor(_queueName, requestData);
executor.submit(msgProcessor);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
private void executeRequest(Message msg){
String requestData=null;
try {
if(msg instanceof TextMessage){
TextMessage textMessage= (TextMessage) msg;
requestData = textMessage.getText().toString();
}else if(msg instanceof ObjectMessage){
ObjectMessage objMsg = (ObjectMessage) msg;
requestData = objMsg.getObject().toString();
}
// MessageExecutor objMessageExecutor=new MessageExecutor(_queueName);
myActor.tell(requestData, ActorRef.noSender());
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
当使用 ExecutorService 执行 ProcessRequst 方法时,这段代码工作正常。然而,akka actor 系统实现面临以下问题。
Exception in thread "Thread-4" java.lang.NullPointerException
at com.syn.jms.listener.QueueListener.executeRequest(QueueListener.java:102)
at com.syn.jms.listener.QueueListener.onMessage(QueueListener.java:59)
at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
at java.lang.Thread.run(Thread.java:745)
请注意,我将 Apache qpid APi 用于带有 activeMQ 的 AMQP 协议。
我无法理解这个问题。
创建 actor 花费的时间很少,在创建 actor 的过程中,如果你有消息要处理,你会得到空指针异常。
您可能需要在向 'myActor' 发送消息之前检查它是否为空。或者您可以考虑使用 actor 生命周期事件来开始处理消息。
我找到了解决方案,这是由于在处理每个进程队列的具有唯一 actorRef 的多个输入时 Actor ref 的 NPE 而导致的,并且它没有初始化对象。我找到了这个解决方案。
public QueueListener(String actorId,String qName){
this._queueName = qName;
if(!isActorinit){
system=ActorSystem.create(actorId);
isActorinit=true;
}
myActor=system.actorOf( Props.create(MessageExecutor.class, qName),qName);
}
不过,我很感激您的意见,让我找到了解决方案。特纳克斯
public class QueueListener implements MessageListener {
public static final ExecutorService executor = Executors.newWorkStealingPool();
public static boolean isActorinit=false;
public static ActorSystem system=null;
private ActorRef myActor=null;
private String _queueName=null;
public QueueListener(String qName){
this._queueName = qName;
if(!isActorinit){
system=ActorSystem.create("Controller");
try {
myActor=system.actorOf(Props.create(MessageExecutor.class.getConstructor(String.class).newInstance(_queueName).getClass()),"mysysActor");
} catch (Exception e) {
// TODO Auto-generated catch block
}
isActorinit=true;
}
}
/*
* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message msg) {
// processRequest(msg);
executeRequest(msg);
}
/** This method will process the message fetch by the listener.
*
* @param msg - javax.jms.Messages parameter get queue message
*/
private void processRequest(Message msg){
String requestData=null;
try {
if(msg instanceof TextMessage){
TextMessage textMessage= (TextMessage) msg;
requestData = textMessage.getText().toString();
}else if(msg instanceof ObjectMessage){
ObjectMessage objMsg = (ObjectMessage) msg;
requestData = objMsg.getObject().toString();
}
MessageProcessor msgProcessor = new MessageProcessor(_queueName, requestData);
executor.submit(msgProcessor);
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
private void executeRequest(Message msg){
String requestData=null;
try {
if(msg instanceof TextMessage){
TextMessage textMessage= (TextMessage) msg;
requestData = textMessage.getText().toString();
}else if(msg instanceof ObjectMessage){
ObjectMessage objMsg = (ObjectMessage) msg;
requestData = objMsg.getObject().toString();
}
// MessageExecutor objMessageExecutor=new MessageExecutor(_queueName);
myActor.tell(requestData, ActorRef.noSender());
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
当使用 ExecutorService 执行 ProcessRequst 方法时,这段代码工作正常。然而,akka actor 系统实现面临以下问题。
Exception in thread "Thread-4" java.lang.NullPointerException
at com.syn.jms.listener.QueueListener.executeRequest(QueueListener.java:102)
at com.syn.jms.listener.QueueListener.onMessage(QueueListener.java:59)
at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
at java.lang.Thread.run(Thread.java:745)
请注意,我将 Apache qpid APi 用于带有 activeMQ 的 AMQP 协议。
我无法理解这个问题。
创建 actor 花费的时间很少,在创建 actor 的过程中,如果你有消息要处理,你会得到空指针异常。
您可能需要在向 'myActor' 发送消息之前检查它是否为空。或者您可以考虑使用 actor 生命周期事件来开始处理消息。
我找到了解决方案,这是由于在处理每个进程队列的具有唯一 actorRef 的多个输入时 Actor ref 的 NPE 而导致的,并且它没有初始化对象。我找到了这个解决方案。
public QueueListener(String actorId,String qName){
this._queueName = qName;
if(!isActorinit){
system=ActorSystem.create(actorId);
isActorinit=true;
}
myActor=system.actorOf( Props.create(MessageExecutor.class, qName),qName);
}
不过,我很感激您的意见,让我找到了解决方案。特纳克斯