Akka-Java is throwing DeadLetterException on ask-await
I have a driver/main class in a file like the one below (basically, I am trying to combine Storm and Akka). In the TenderEventSpout2 class, I am trying to send a message to an actor and receive a reply from it.
public class TenderEventSpout2 extends BaseRichSpout {
    ActorSystemHandle actorSystemHandle;
    ActorSystem _system;
    ActorRef eventSpoutActor;
    Future<Object> future;
    Timeout timeout;
    String result;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //String[] message = {"WATCH_DIR"};
        timeout = new Timeout(Duration.create(60, "seconds"));
        List<Object> messageList = new ArrayList<Object>();
        messageList.add("WATCH_DIR");
        messageList.add(this.inputDirName);
        actorSystemHandle = new ActorSystemHandle();
        _system = actorSystemHandle.getActorSystem();
        eventSpoutActor = _system.actorOf(Props.create(EventSpoutActor.class));
        // Send the request; the reply is awaited later in nextTuple().
        future = Patterns.ask(eventSpoutActor, messageList, timeout);
    }

    @Override
    public void nextTuple() {
        String result = null;
        try {
            // Blocks until the actor replies or the timeout expires.
            result = (String) Await.result(future, timeout.duration());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
    // (remaining spout methods not shown)
}
My actor is:
public class EventSpoutActor extends UntypedActor {
    public ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>();
    Inbox inbox;

    @Override
    public void onReceive(Object message) { // throws IOException {
        if (message instanceof List<?>) {
            System.out.println(((List<Object>) message).get(0) + "*******************");
            if (((List<Object>) message).get(0).equals("WATCH_DIR")) {
                final List<Object> msg = (List<Object>) message;
                Thread fileWatcher = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(msg.get(1) + "*******************");
                        try {
                            String result = "Hello";
                            System.out.println("Before Sending Message *******************");
                            getSender().tell(result, getSelf());
                        }
                        catch (Exception e) {
                            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
                            throw e;
                        }
                    }
                });
                fileWatcher.setDaemon(true);
                fileWatcher.start();
                System.out.println("Started file watcher");
            }
        }
        else {
            System.out.println("Unhandled !!");
            unhandled(message);
        }
    }
}
I can send the message to my EventSpoutActor, but I am having trouble receiving the reply. Why is that? The following message is printed in the console:
[EventProcessorActorSystem-akka.actor.default-dispatcher-3]
[akka://EventProcessorActorSystem/deadLetters] Message [java.lang.String]
from Actor[akka://EventProcessorActorSystem/user/$a#-1284357486] to
Actor[akka://EventProcessorActorSystem/deadLetters] was not delivered. [1]
dead letters encountered.
This logging can be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
So, I found the reason why the message was not delivered.
getSender().tell(result, getSelf());
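This line, which should send the message back to the sender, loses its context when it is called from inside the thread code. As far as I can tell, getSender() is only meaningful while the actor is processing the message in onReceive; called later from another thread, it resolves to deadLetters, which matches the log above: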
Thread fileWatcher = new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println(msg.get(1) + "*******************");
        try {
            String result = "Hello";
            System.out.println("Before Sending Message *******************");
            getSender().tell(result, getSelf());
When I moved the "tell" call outside the thread, it worked.
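If the reply really has to come from the background thread, the usual alternative is to read the sender once inside onReceive and let the thread use that saved reference. In the actor above that would be something like `final ActorRef replyTo = getSender();` before `new Thread(...)`, and then `replyTo.tell(result, getSelf())` inside run(). The sketch below is not Akka code. It is a small plain-Java model of the same effect, so it runs without any dependencies. SenderCaptureDemo, currentSender, handlerReturned and deliver are hypothetical names, and the latch forces the worker to run after the handler has returned, which in the real actor is only a likely ordering, not a guaranteed one.

```java
import java.util.concurrent.CountDownLatch;

// Plain-Java model (no Akka): the "sender" is only valid while a message is being handled.
public class SenderCaptureDemo {
    static final String DEAD_LETTERS = "deadLetters";

    // Hypothetical stand-in for the actor's per-message context; null means "no message in progress".
    private volatile String currentSender;
    // Released once the handler has returned and the context has been cleared.
    private final CountDownLatch handlerReturned = new CountDownLatch(1);
    // Records where the worker thread ended up sending its reply.
    private volatile String repliedTo;

    // Models getSender(): falls back to dead letters when no message is being handled.
    String getSender() {
        String s = currentSender;
        return s == null ? DEAD_LETTERS : s;
    }

    // Models onReceive(): starts a worker thread that replies later.
    Thread onReceive(final boolean captureFirst) {
        // Read the sender while the message is still being handled.
        final String replyTo = getSender();
        Thread worker = new Thread(() -> {
            try {
                // Force the worker to run only after the handler has returned.
                handlerReturned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Either use the saved reference or look the sender up again (too late).
            repliedTo = captureFirst ? replyTo : getSender();
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Delivers one message from `sender` and reports who received the reply.
    static String deliver(String sender, boolean captureFirst) {
        SenderCaptureDemo actor = new SenderCaptureDemo();
        actor.currentSender = sender;          // message handling begins
        Thread worker = actor.onReceive(captureFirst);
        actor.currentSender = null;            // handler returned, context cleared
        actor.handlerReturned.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return actor.repliedTo;
    }

    public static void main(String[] args) {
        System.out.println(deliver("asker", false)); // prints "deadLetters"
        System.out.println(deliver("asker", true));  // prints "asker"
    }
}
```

The first call mirrors the code in the question: the lookup happens on the worker thread after the handler is done, so the reply goes to dead letters. The second call mirrors the saved-reference variant, where the reply reaches the original sender.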