Akka/Scala 需要两个参与者才能完成的承诺
An Akka/Scala promise that requires two actors to complete
我正在使用 Scala 和 Akka 构建一个股票市场应用程序。市场匹配买家和卖家,然后发送一个
Promise[Transaction]
需要完成(在某个时候)以便处理交易的买卖双方。
问题是 promise 可能会因失败而完成,因为
- 买家资金不足,
- 卖家的份额不足。
如何创建需要两个参与者协调才能完成的 Scala 承诺?
您可以执行以下操作:
def querySellingActor(seller : ActorRef) : Future[Boolean] = ???
def queryBuyingActor(buyer : ActorRef ) : Future[Boolean] = ???
// Calling these outside the for-comprehension
// ensures that they are really parallel
val f1 = querySellingActor(sellerActor)
val f2 = queryBuyingActor(buyerActor)
val niceResult = for{
sellerCanSell <- f1
buyerCanBuy <- f2
} yield { sellerCanSell && buyerCanBuy }
niceResult
现在是可以传递的 Future[Boolean],您可以在其上安装回调。
我会使用 short-lived actor 模式,只有当你真的需要时才应该使用 ask
模式:
case object TransactionTimeout
case class NewTransaction(buyer: ActorRef, seller: ActorRef)
class TransactionHandlerActor extends Actor with ActorLogging {
var _scheduled: Cancellable = null
var resultReceiver = Actor.noSender
var hasBuyerAnswered = false
var hasSellerAnswered = false
def receive: Receive = {
case NewTransaction(buyer, seller) =>
resultReceiver = sender()
buyer ! BuyRequest(...)
seller ! SellRequest(...)
_scheduled = context.system.scheduler.scheduleOnce(5 seconds)(self ! TransactionTimeout)
case TransactionTimeout =>
resultReceiver ! TransactionError(...)
context.stop(self)
case BuyerResponse(...) =>
hasBuyerAnswered = true
checkIfComplete()
case SellerResponse(...) =>
hasSellerAnswered = true
checkIfComplete()
}
def checkIfComplete() = {
if(hasBuyerAnswered && hasSellerAnswered) {
/* finalize operation */
resultReceiver ! TransactionCompleted(...)
_scheduled.cancel()
context.stop(self)
}
}
}
基本上临时参与者处理事务并在完成时自行停止,超时是为了确保他不会永远等待响应。这是您可以执行的操作的一个非常简单的示例,代码未经测试。
你不想使用 ask
是对的——应该尽可能避免。
以下是如何向买卖双方发送 Promise,然后编写相应的 Futures 并安排在 Promises 履行时处理交易。
import scala.concurrent.{future, promise}
import scala.concurrent.ExecutionContext.Implicits.global
val fundsPromise = promise[Funds]
val sharesPromise = promise[Shares]
buyerActor ! GetFunds(amount, fundsPromise)
sellerActor ! GetShares(numShares, stock, sharesPromise)
val futureFunds = fundsPromise.future
val futureShares = sharesPromise.future
val purchase =
for { funds <- futureFunds; shares <- futureShares }
yield transact(funds, shares)
purchase onComplete {
case Success(transactionResult) =>
buyerActor ! PutShares(numShares)
sellerActor ! PutFunds(amount)
// tell somebody it worked
case Failure(t) =>
futureFunds onSuccess { case _ => buyerActor ! PutFunds(amount) }
futureShares onSuccess { case _ => sellerActor ! PutShares(numShares) }
// tell somebody it failed because of t
}
请注意,最终结果可能因多种原因而失败:因为您无法从买家那里获得资金,因为您无法从卖家那里获得股份,或者因为 transact
中出现问题.因此,如果失败,我们会检查基金和股票期货,看看我们是否得到了它们,如果是,我们会归还我们得到的东西。
另请注意,我们在期货中关闭 amount
和 numShares
,这意味着您希望它们成为 vals。如果它们是变量,当期货实际上 运行.
时,您最终可能会使用错误的值
正如 bsmk 指出的那样,这假设 buyerActor
和 sellerActor
与上述代码位于同一 JVM 中。如果没有,您可以让本地演员在收到远程演员的消息后处理承诺;如果有更好的方法,请在评论中告诉我。
我正在使用 Scala 和 Akka 构建一个股票市场应用程序。市场匹配买家和卖家,然后发送一个
Promise[Transaction]
需要完成(在某个时候)以便处理交易的买卖双方。
问题是 promise 可能会因失败而完成,因为
- 买家资金不足,
- 卖家的份额不足。
如何创建需要两个参与者协调才能完成的 Scala 承诺?
您可以执行以下操作:
def querySellingActor(seller : ActorRef) : Future[Boolean] = ???
def queryBuyingActor(buyer : ActorRef ) : Future[Boolean] = ???
// Calling these outside the for-comprehension
// ensures that they are really parallel
val f1 = querySellingActor(sellerActor)
val f2 = queryBuyingActor(buyerActor)
val niceResult = for{
sellerCanSell <- f1
buyerCanBuy <- f2
} yield { sellerCanSell && buyerCanBuy }
niceResult
现在是可以传递的 Future[Boolean],您可以在其上安装回调。
我会使用 short-lived actor 模式,只有当你真的需要时才应该使用 ask
模式:
case object TransactionTimeout
case class NewTransaction(buyer: ActorRef, seller: ActorRef)
class TransactionHandlerActor extends Actor with ActorLogging {
var _scheduled: Cancellable = null
var resultReceiver = Actor.noSender
var hasBuyerAnswered = false
var hasSellerAnswered = false
def receive: Receive = {
case NewTransaction(buyer, seller) =>
resultReceiver = sender()
buyer ! BuyRequest(...)
seller ! SellRequest(...)
_scheduled = context.system.scheduler.scheduleOnce(5 seconds)(self ! TransactionTimeout)
case TransactionTimeout =>
resultReceiver ! TransactionError(...)
context.stop(self)
case BuyerResponse(...) =>
hasBuyerAnswered = true
checkIfComplete()
case SellerResponse(...) =>
hasSellerAnswered = true
checkIfComplete()
}
def checkIfComplete() = {
if(hasBuyerAnswered && hasSellerAnswered) {
/* finalize operation */
resultReceiver ! TransactionCompleted(...)
_scheduled.cancel()
context.stop(self)
}
}
}
基本上临时参与者处理事务并在完成时自行停止,超时是为了确保他不会永远等待响应。这是您可以执行的操作的一个非常简单的示例,代码未经测试。
你不想使用 ask
是对的——应该尽可能避免。
以下是如何向买卖双方发送 Promise,然后编写相应的 Futures 并安排在 Promises 履行时处理交易。
import scala.concurrent.{future, promise}
import scala.concurrent.ExecutionContext.Implicits.global
val fundsPromise = promise[Funds]
val sharesPromise = promise[Shares]
buyerActor ! GetFunds(amount, fundsPromise)
sellerActor ! GetShares(numShares, stock, sharesPromise)
val futureFunds = fundsPromise.future
val futureShares = sharesPromise.future
val purchase =
for { funds <- futureFunds; shares <- futureShares }
yield transact(funds, shares)
purchase onComplete {
case Success(transactionResult) =>
buyerActor ! PutShares(numShares)
sellerActor ! PutFunds(amount)
// tell somebody it worked
case Failure(t) =>
futureFunds onSuccess { case _ => buyerActor ! PutFunds(amount) }
futureShares onSuccess { case _ => sellerActor ! PutShares(numShares) }
// tell somebody it failed because of t
}
请注意,最终结果可能因多种原因而失败:因为您无法从买家那里获得资金,因为您无法从卖家那里获得股份,或者因为 transact
中出现问题.因此,如果失败,我们会检查基金和股票期货,看看我们是否得到了它们,如果是,我们会归还我们得到的东西。
另请注意,我们在期货中关闭 amount
和 numShares
,这意味着您希望它们成为 vals。如果它们是变量,当期货实际上 运行.
正如 bsmk 指出的那样,这假设 buyerActor
和 sellerActor
与上述代码位于同一 JVM 中。如果没有,您可以让本地演员在收到远程演员的消息后处理承诺;如果有更好的方法,请在评论中告诉我。