Akka/Scala:映射 Future 与 pipeTo
Akka/Scala: mapping Future vs pipeTo
在 Akka
actor 中,通过以下方式向另一个 actor 发送 Future
结果在使用的线程数或线程锁定方面是否存在任何差异:
一个。将 Future
映射到 tell
将结果传递给参与者的函数。
乙。为未来定义一个 onSuccess
回调,tell
将结果发送给参与者。
摄氏度。使用 pipeTo
.
将 Future
结果传递给 actor
其中一些选项已在上一个问题中讨论过:
Akka: Send a future message to an Actor
三种方法中哪一种是首选方法,为什么?
此外,我想知道,如果receive
应该是Any => Unit
类型,那么为什么代码编译当某些情况下receive
returns的偏函数一个Future
,不是Unit
?
下面是我上面提到的三个选项的代码示例:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success
class ActorIncrement extends Actor {
def receive = {
case i: Int =>
println(s"increment $i")
sender ! i + 1
}
}
class ActorEven extends Actor {
def receive = {
case i: Int =>
println(s"$i is even")
}
}
class ActorOdd extends Actor {
def receive = {
case i: Int =>
println(s"$i is odd")
}
}
class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout = Timeout(5 seconds)
def receive = {
case i: Int if i % 2 == 0 =>
println(s"receive a: $i")
actorIncrement ? i map {
case j: Int =>
println(s"$j from increment a")
actorOdd ! j
}
case i: Int =>
println(s"receive b: $i")
val future: Future[Any] = actorIncrement ? i
future onSuccess {
case i: Int =>
println(s"$i from increment b")
actorEven ! i
}
case s: String =>
println(s"receive c: $s")
(actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
}
}
object TalkToActor extends App {
// Create the 'talk-to-actor' actor system
val system = ActorSystem("talk-to-actor")
val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
val actorEven = system.actorOf(Props[ActorEven], "actorEven")
val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")
val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")
myActor ! 2
myActor ! 7
myActor ! "11"
Thread.sleep(1000)
//shutdown system
system.terminate()
}
如果您查看 pipeTo
在 akka.pattern.PipeToSupport
中的定义方式,
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef =
Actor.noSender): Future[T] = {
future andThen {
case Success(r) ⇒ recipient ! r
case Failure(f) ⇒ recipient ! Status.Failure(f)
}
}
}
如您所见...pipeTo
与仅将 andThen
调用添加到您的 Future
没有什么不同,后者发送未来结果或 Status.Failure
如果您的 Future
失败,请向管道演员发送消息。
现在的主要区别在于此 Status.Failure
故障处理。如果你不使用 pipeTo
,你可以用任何你想要的方式处理你的失败。
在 Akka
actor 中,通过以下方式向另一个 actor 发送 Future
结果在使用的线程数或线程锁定方面是否存在任何差异:
一个。将 Future
映射到 tell
将结果传递给参与者的函数。
乙。为未来定义一个 onSuccess
回调,tell
将结果发送给参与者。
摄氏度。使用 pipeTo
.
Future
结果传递给 actor
其中一些选项已在上一个问题中讨论过:
Akka: Send a future message to an Actor
三种方法中哪一种是首选方法,为什么?
此外,我想知道,如果receive
应该是Any => Unit
类型,那么为什么代码编译当某些情况下receive
returns的偏函数一个Future
,不是Unit
?
下面是我上面提到的三个选项的代码示例:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success
class ActorIncrement extends Actor {
def receive = {
case i: Int =>
println(s"increment $i")
sender ! i + 1
}
}
class ActorEven extends Actor {
def receive = {
case i: Int =>
println(s"$i is even")
}
}
class ActorOdd extends Actor {
def receive = {
case i: Int =>
println(s"$i is odd")
}
}
class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout = Timeout(5 seconds)
def receive = {
case i: Int if i % 2 == 0 =>
println(s"receive a: $i")
actorIncrement ? i map {
case j: Int =>
println(s"$j from increment a")
actorOdd ! j
}
case i: Int =>
println(s"receive b: $i")
val future: Future[Any] = actorIncrement ? i
future onSuccess {
case i: Int =>
println(s"$i from increment b")
actorEven ! i
}
case s: String =>
println(s"receive c: $s")
(actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
}
}
object TalkToActor extends App {
// Create the 'talk-to-actor' actor system
val system = ActorSystem("talk-to-actor")
val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
val actorEven = system.actorOf(Props[ActorEven], "actorEven")
val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")
val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")
myActor ! 2
myActor ! 7
myActor ! "11"
Thread.sleep(1000)
//shutdown system
system.terminate()
}
如果您查看 pipeTo
在 akka.pattern.PipeToSupport
中的定义方式,
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef =
Actor.noSender): Future[T] = {
future andThen {
case Success(r) ⇒ recipient ! r
case Failure(f) ⇒ recipient ! Status.Failure(f)
}
}
}
如您所见...pipeTo
与仅将 andThen
调用添加到您的 Future
没有什么不同,后者发送未来结果或 Status.Failure
如果您的 Future
失败,请向管道演员发送消息。
现在的主要区别在于此 Status.Failure
故障处理。如果你不使用 pipeTo
,你可以用任何你想要的方式处理你的失败。