Java Akka 聚合器
Java Aggregator for Akka
我正在尝试实施 Java Aggregator for Akka, since it doesn't look like Java API 支持它们(为什么不!?)
这是我迄今为止的最佳尝试:
// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
ActorRef recipient
Set<T> aggregation
// TODO: Timer timer (?)
abstract boolean isAggregated()
@Override
void onReceive(Object message) {
aggregation << message as T
if(isAggregated()) {
recipient.tell(new Aggregation(aggregation)) // again, pseudo-code
aggregation.clear()
// TODO: timer.reset()
}
}
}
缺少的是某种 Timer
结构,如果它还没有聚合,它将在 Aggregator
之后超时,比如说 60 秒。超时时,它应该抛出某种异常。在聚合时,计时器应该被重置。任何想法如何做到这一点?
您要找的是ReceiveTimeout
。 Akka 提供了一个功能,当特定的 actor 在预定义的时间内没有收到任何东西时会超时。
在 Java 你会在你的 actor 中做这样的事情:
getContext().setReceiveTimeout(Duration.create("1 second"));
当此触发器发送类型为 ReceiveTimeout
的消息给参与者时,然后您可以决定要执行的操作(异常、日志记录、重置...)。
您可以在此处 'Receive timeout' 部分找到更多信息:http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html
另一方面,github 中有开源库可以做这些事情。查看 https://github.com/sksamuel/akka-patterns 以获取更多示例。
我正在尝试实施 Java Aggregator for Akka, since it doesn't look like Java API 支持它们(为什么不!?)
这是我迄今为止的最佳尝试:
// Groovy pseudo-code
abstract class Aggregator<T> extends UntypedActor {
ActorRef recipient
Set<T> aggregation
// TODO: Timer timer (?)
abstract boolean isAggregated()
@Override
void onReceive(Object message) {
aggregation << message as T
if(isAggregated()) {
recipient.tell(new Aggregation(aggregation)) // again, pseudo-code
aggregation.clear()
// TODO: timer.reset()
}
}
}
缺少的是某种 Timer
结构,如果它还没有聚合,它将在 Aggregator
之后超时,比如说 60 秒。超时时,它应该抛出某种异常。在聚合时,计时器应该被重置。任何想法如何做到这一点?
您要找的是ReceiveTimeout
。 Akka 提供了一个功能,当特定的 actor 在预定义的时间内没有收到任何东西时会超时。
在 Java 你会在你的 actor 中做这样的事情:
getContext().setReceiveTimeout(Duration.create("1 second"));
当此触发器发送类型为 ReceiveTimeout
的消息给参与者时,然后您可以决定要执行的操作(异常、日志记录、重置...)。
您可以在此处 'Receive timeout' 部分找到更多信息:http://doc.akka.io/docs/akka/snapshot/java/untyped-actors.html
另一方面,github 中有开源库可以做这些事情。查看 https://github.com/sksamuel/akka-patterns 以获取更多示例。