Akka event message does not dispatch or trigger another event: only the first event is ever triggered, the other events are not dispatched
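I am new to Akka. I am trying to send event messages to an Akka actor. I have 3 event messages and I send them one after another, but only the first event is ever triggered. Why?
Maybe it is because of: receive(receiveEvent);
This method is called in my EventProcessActor
constructor.
But the other events are sent after that as well, so I am missing something here: why are they not dispatched to the other matching handlers?
I always get the following output in the console: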
[INFO] [03/18/2017 13:35:53.446]... We received the Events need to process it
My expected output is:
[INFO] [03/18/2017 13:35:53.446] ... We received the Events need to process it
[INFO] [03/18/2017 13:35:53.447]... We are processing Events
[INFO] [03/18/2017 13:35:53.446]... Completed Events processing
In the console output above I have replaced [default-akka.actor.default-dispatcher-4] [akka://default/user/EventProcessing]
with ...
I am triggering the events as follows:
procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(Events.READING_LINE, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR, Paths.get("/")), procsssEvents);
Below are my actor class, my message class and my pom.xml file.
Akka actor:
package com.ebc.biz.akka.event.trigger;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import akka.actor.AbstractLoggingActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ActorSystemImpl;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import static com.ebc.biz.akka.event.trigger.EventMessage.Events;
public class EventProcessActor extends AbstractLoggingActor {
public static class EventActivity {
final EventMessage startOfEventMessage;
public EventMessage getStartOfEventMessage() {
return startOfEventMessage;
}
public EventActivity(Events events, Path eventPath) {
startOfEventMessage = new EventMessage(events, eventPath);
}
}
public static class EventReadingActivity {
final EventMessage startOfReadingMessage;
public EventMessage getStartOfReadingMessage() {
return startOfReadingMessage;
}
public EventReadingActivity(Events events, Path eventPath) {
startOfReadingMessage = new EventMessage(events, eventPath);
}
}
public static class EndOfEventActivity {
final EventMessage endOfEventMessage;
public EventMessage getEndOfEventMessage() {
return endOfEventMessage;
}
public EndOfEventActivity(Events events, Path eventPath) {
endOfEventMessage = new EventMessage(Events.END_OR, eventPath);
}
}
private final PartialFunction<Object, BoxedUnit> receiveEvent;
private final PartialFunction<Object, BoxedUnit> startEventsProcessing;
private final PartialFunction<Object, BoxedUnit> completeEventProcessing;
public EventProcessActor() {
receiveEvent = ReceiveBuilder
.match(EventActivity.class, this::onStartEventReceive)
.match(EventReadingActivity.class, this::readEventLine).build();
startEventsProcessing = ReceiveBuilder
.match(EventReadingActivity.class, this::readEventLine)
.match(EndOfEventActivity.class, this::onEndOfEventProcessing)
.build();
completeEventProcessing = ReceiveBuilder.match(
EndOfEventActivity.class, this::onEndOfEventProcessing).build();
receive(receiveEvent);
}
public static Props props() {
return Props.create(EventProcessActor.class);
}
public void onStartEventReceive(EventActivity fileActivity) {
log().info("We received the Events need to process it");
getContext().become(startEventsProcessing);
}
public void readEventLine(EventReadingActivity fileActivity) {
log().info("We are processing Events");
getContext().become(completeEventProcessing);
}
public void onEndOfEventProcessing(EndOfEventActivity fileActivity) {
log().info("Completed Events processing");
}
public static void main(String args[]) throws IOException {
ActorSystem syste = ActorSystemImpl.create();
final ActorRef procsssEvents = syste.actorOf(EventProcessActor.props(),
"Event" + "Processing");
procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT,
Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(
Events.READING_LINE, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR,
Paths.get("/")), procsssEvents);
System.out.println("Enter to terminate");
System.in.read();
}
}
EventMessage
package com.ebc.biz.akka.event.trigger;
import java.nio.file.Path;
public class EventMessage {
public static enum Events {
STSRT, READING_LINE, END_OR;
}
private final Events readEvents;
private final Path pathOfEvents;
public Path getPathOfEvents() {
return pathOfEvents;
}
public Events getReadEvents() {
return readEvents;
}
public EventMessage(Events readEvents, Path pathOfFile) {
this.readEvents = readEvents;
this.pathOfEvents = pathOfFile;
}
}
Pom.xml
<groupId>com.ebc.biz</groupId>
<artifactId>akka.event.trigger</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<akka.version>2.4.9</akka.version>
<maven-dependency-plugin.version>3.0.0</maven-dependency-plugin.version>
<maven.compiler.plugin>3.6.1</maven.compiler.plugin>
<java.compiler.target>1.8</java.compiler.target>
<java.compiler.source>1.8</java.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-core_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-experimental_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-http-jackson-experimental_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven-dependency-plugin.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- This will download source so easy to see API and java doc. -->
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Java 8 compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin}</version>
<configuration>
<source>${java.compiler.source}</source>
<target>${java.compiler.target}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Why are my messages not dispatched to the other handlers? I think I am missing something.
Thanks in advance for any kind of information and help.
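Edit: The problem is that you are sending EventActivity
for all three messages, while the actor is designed to expect a different message type in each state, so you should update your main
: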
procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EventReadingActivity(Events.READING_LINE, Paths.get("/")), procsssEvents);
procsssEvents.tell(new EventProcessActor.EndOfEventActivity(Events.END_OR, Paths.get("/")), procsssEvents);
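To see why the message type matters, here is a minimal plain-Java sketch of the same state switching. It does not use Akka at all: BecomeSketch, Start, Reading, End and the run helper are hypothetical names made up for this illustration, and each Map stands in for one ReceiveBuilder result from the question. Once the first handler switches behavior, a second Start message no longer matches anything. The sketch records such messages as "unhandled" so they are visible, whereas in the question only the first log line appeared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java simulation of become(); not the Akka API.
public class BecomeSketch {
    // Three distinct message types, standing in for EventActivity,
    // EventReadingActivity and EndOfEventActivity.
    static final class Start {}
    static final class Reading {}
    static final class End {}

    // The active behavior: message class -> handler.
    private Map<Class<?>, Consumer<Object>> current;
    final List<String> log = new ArrayList<>();

    BecomeSketch() {
        current = receiveEvent();
    }

    // Initial behavior: matches Start and Reading, like receiveEvent.
    private Map<Class<?>, Consumer<Object>> receiveEvent() {
        Map<Class<?>, Consumer<Object>> b = new HashMap<>();
        b.put(Start.class, m -> onStart());
        b.put(Reading.class, m -> onReading());
        return b;
    }

    // Second behavior: matches Reading and End, but no longer Start.
    private Map<Class<?>, Consumer<Object>> startEventsProcessing() {
        Map<Class<?>, Consumer<Object>> b = new HashMap<>();
        b.put(Reading.class, m -> onReading());
        b.put(End.class, m -> onEnd());
        return b;
    }

    // Final behavior: matches End only.
    private Map<Class<?>, Consumer<Object>> completeEventProcessing() {
        Map<Class<?>, Consumer<Object>> b = new HashMap<>();
        b.put(End.class, m -> onEnd());
        return b;
    }

    private void onStart() {
        log.add("We received the Events need to process it");
        current = startEventsProcessing(); // plays the role of become(...)
    }

    private void onReading() {
        log.add("We are processing Events");
        current = completeEventProcessing();
    }

    private void onEnd() {
        log.add("Completed Events processing");
    }

    // Dispatch by exact message class against the active behavior.
    void tell(Object message) {
        Consumer<Object> handler = current.get(message.getClass());
        if (handler == null) {
            log.add("unhandled " + message.getClass().getSimpleName());
            return;
        }
        handler.accept(message);
    }

    // Helper: send the messages in order to a fresh instance, return its log.
    static List<String> run(Object... messages) {
        BecomeSketch actor = new BecomeSketch();
        for (Object m : messages) {
            actor.tell(m);
        }
        return actor.log;
    }

    public static void main(String[] args) {
        // Same type three times, as in the original main.
        System.out.println(run(new Start(), new Start(), new Start()));
        // One type per state, as in the corrected main.
        System.out.println(run(new Start(), new Reading(), new End()));
    }
}
```

The first call corresponds to the original main and produces one handled message followed by two unmatched ones. The second call corresponds to the corrected main and reaches all three handlers in order.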