Akka/Scala 需要两个参与者才能完成的承诺

An Akka/Scala promise that requires two actors to complete

我正在使用 Scala 和 Akka 构建一个股票市场应用程序。市场匹配买家和卖家,然后发送一个

Promise[Transaction]

需要完成(在某个时候)以便处理交易的买卖双方。

问题是 promise 可能会因失败而完成,因为

  1. 买家资金不足,
  2. 卖家的份额不足。

如何创建需要两个参与者协调才能完成的 Scala 承诺?

您可以执行以下操作:

def querySellingActor(seller : ActorRef) : Future[Boolean] = ???
def queryBuyingActor(buyer : ActorRef ) : Future[Boolean] = ???

// Calling these outside the for-comprehension 
// ensures that they are really parallel
val f1 = querySellingActor(sellerActor)
val f2 = queryBuyingActor(buyerActor)

val niceResult = for{
   sellerCanSell <- f1
   buyerCanBuy <- f2
} yield { sellerCanSell && buyerCanBuy }

niceResult 现在是可以传递的 Future[Boolean],您可以在其上安装回调。

我会使用 short-lived actor 模式,只有当你真的需要时才应该使用 ask 模式:

case object TransactionTimeout
case class NewTransaction(buyer: ActorRef, seller: ActorRef)

class TransactionHandlerActor extends Actor with ActorLogging {

  var _scheduled: Cancellable = null
  var resultReceiver = Actor.noSender
  var hasBuyerAnswered = false
  var hasSellerAnswered = false

  def receive: Receive = {
    case NewTransaction(buyer, seller) =>
      resultReceiver = sender()
      buyer ! BuyRequest(...)
      seller ! SellRequest(...)
      _scheduled = context.system.scheduler.scheduleOnce(5 seconds)(self ! TransactionTimeout)

    case TransactionTimeout =>
      resultReceiver ! TransactionError(...)
      context.stop(self)

    case BuyerResponse(...) =>
      hasBuyerAnswered = true
      checkIfComplete()
    case SellerResponse(...) =>
      hasSellerAnswered = true
      checkIfComplete()
  }

  def checkIfComplete() = {
    if(hasBuyerAnswered && hasSellerAnswered) {
      /* finalize operation */
      resultReceiver ! TransactionCompleted(...)
      _scheduled.cancel()
      context.stop(self)
    }
  }
}

基本上临时参与者处理事务并在完成时自行停止,超时是为了确保他不会永远等待响应。这是您可以执行的操作的一个非常简单的示例,代码未经测试。

你不想使用 ask 是对的——应该尽可能避免。

以下是如何向买卖双方发送 Promise,然后编写相应的 Futures 并安排在 Promises 履行时处理交易。

import scala.concurrent.{future, promise}
import scala.concurrent.ExecutionContext.Implicits.global

val  fundsPromise = promise[Funds]
val sharesPromise = promise[Shares]

 buyerActor ! GetFunds(amount, fundsPromise)
sellerActor ! GetShares(numShares, stock, sharesPromise)

val futureFunds =   fundsPromise.future
val futureShares = sharesPromise.future

val purchase =
  for { funds <- futureFunds; shares <- futureShares }
    yield transact(funds, shares)

purchase onComplete {
  case Success(transactionResult) =>
     buyerActor ! PutShares(numShares)
    sellerActor ! PutFunds(amount)
    // tell somebody it worked
  case Failure(t) =>
    futureFunds  onSuccess { case _ =>  buyerActor ! PutFunds(amount) }
    futureShares onSuccess { case _ => sellerActor ! PutShares(numShares) }
    // tell somebody it failed because of t
}

请注意,最终结果可能因多种原因而失败:因为您无法从买家那里获得资金,因为您无法从卖家那里获得股份,或者因为 transact 中出现问题.因此,如果失败,我们会检查基金和股票期货,看看我们是否得到了它们,如果是,我们会归还我们得到的东西。

另请注意,我们在期货中关闭 amountnumShares,这意味着您希望它们成为 vals。如果它们是变量,当期货实际上 运行.

时,您最终可能会使用错误的值

正如 bsmk 指出的那样,这假设 buyerActorsellerActor 与上述代码位于同一 JVM 中。如果没有,您可以让本地演员在收到远程演员的消息后处理承诺;如果有更好的方法,请在评论中告诉我。