Akka/Scala:映射 Future 与 pipeTo

Akka/Scala: mapping Future vs pipeTo

Akka actor 中,通过以下方式向另一个 actor 发送 Future 结果在使用的线程数或线程锁定方面是否存在任何差异:

一个。将 Future 映射到 tell 将结果传递给参与者的函数。

乙。为未来定义一个 onSuccess 回调,tell 将结果发送给参与者。

摄氏度。使用 pipeTo.

Future 结果传递给 actor

其中一些选项已在上一个问题中讨论过:

Akka: Send a future message to an Actor

三种方法中哪一种是首选方法,为什么?

此外,我想知道,如果receive应该是Any => Unit类型,那么为什么代码编译当某些情况下receivereturns的偏函数一个Future,不是Unit

下面是我上面提到的三个选项的代码示例:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success

class ActorIncrement extends Actor {

  def receive = {
    case i: Int =>
      println(s"increment $i")
      sender ! i + 1
  }
}

class ActorEven extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is even")
  }
}


class ActorOdd extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is odd")
  }
}

class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case i: Int if i % 2 == 0 =>
      println(s"receive a: $i")
      actorIncrement ? i map {
        case j: Int =>
          println(s"$j from increment a")
          actorOdd ! j
      }
    case i: Int =>
      println(s"receive b: $i")
      val future: Future[Any] = actorIncrement ? i
      future onSuccess {
        case i: Int =>
          println(s"$i from increment b")
          actorEven ! i
      }

    case s: String =>
      println(s"receive c: $s")
      (actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
  }
}

object TalkToActor extends App {

  // Create the 'talk-to-actor' actor system
  val system = ActorSystem("talk-to-actor")

  val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
  val actorEven = system.actorOf(Props[ActorEven], "actorEven")
  val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")

  val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")

  myActor ! 2
  myActor ! 7
  myActor ! "11"

  Thread.sleep(1000)

  //shutdown system
  system.terminate()
}

如果您查看 pipeToakka.pattern.PipeToSupport 中的定义方式,

def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = 
  Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }
}

如您所见...pipeTo 与仅将 andThen 调用添加到您的 Future 没有什么不同,后者发送未来结果或 Status.Failure如果您的 Future 失败,请向管道演员发送消息。

现在的主要区别在于此 Status.Failure 故障处理。如果你不使用 pipeTo,你可以用任何你想要的方式处理你的失败。