如何在 Scala 中使用 Future 进行投票?

How to poll with a Future in Scala?

我想轮询一个 API 端点,直到它达到某种条件。我预计它会在几秒到一分钟内达到这种状态。我有一种方法可以将 returns 的端点称为 Future。有什么方法可以将 Future 链接在一起,每隔 n 毫秒轮询一次此端点,并在 t 次尝试后放弃?

假设我有一个具有以下签名的函数:

def isComplete(): Future[Boolean] = ???

在我看来,最简单的方法是让所有内容都阻塞:

def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}

但是这样可能会占用所有的线程,不是异步的。我也考虑过递归地做:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}

但是,我担心调用堆栈会最大化,因为这不是尾递归。

有更好的方法吗?

编辑:我正在使用 akka

其实根本就不是递归的,所以栈就没事了。

我能想到的对你的方法的一个改进是使用某种调度程序而不是 Thread.sleep 这样你就不会阻塞线程。

此示例使用标准 java 的 TimerTask,但如果您使用某种框架,如 akka、play 或其他,它可能有自己的调度程序,那就是一个更好的选择。

object Scheduler {
   val timer = new Timer(true)
   def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
     val promise = Promise[T]()
     timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
     promise.future
   }
}


def untilComplete(attempts: Int = 10) = isComplete().flatMap { 
   case true => Future.successful(())
   case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
   case _ => throw new Exception("Attempts exhausted.") 
}

我给了自己一个图书馆来做这件事。我有

trait Poller extends AutoCloseable {
  def addTask[T]( task : Poller.Task[T] ) : Future[T]
  def close() : Unit
}

其中 Poller.Task 看起来像

class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )

Poller 每隔 period 轮询一次,直到 pollFor 方法成功(产生 Some[T])或超过 timeout

为方便起见,当我开始轮询时,我将其包装成 Poller.Task.withDeadline:

final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
  def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}

将任务的(不可变、可重用)timeout 持续时间转换为每次轮询尝试的超时截止日期。

为了有效地进行轮询,我使用 Java 的 ScheduledExecutorService:

def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
  val promise = Promise[T]()
  scheduleTask( Poller.Task.withDeadline( task ), promise )
  promise.future
}

private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
  if ( isClosed ) { 
    promise.failure( new Poller.ClosedException( this ) )
  } else {
    val task     = twd.task
    val deadline = twd.deadline

    val runnable = new Runnable {

      def run() : Unit = {
        try {
          if ( ! twd.timedOut ) {
            task.pollFor() match {
              case Some( value ) => promise.success( value )
              case None          => Abstract.this.scheduleTask( twd, promise )
            }
          } else {
            promise.failure( new Poller.TimeoutException( task.label, deadline ) )
          }
        }
        catch {
          case NonFatal( unexpected ) => promise.failure( unexpected )
        }
      }
    }

    val millis = task.period.toMillis
    ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
  }
}

它似乎工作得很好,不需要睡觉或阻挡个人 Threads

(看看这个库,可以做很多事情来让它更清晰、更容易阅读,Poller.Task.withDeadline 的作用将通过为 class 制作原始构造函数来阐明private。截止日期应始终根据任务 timeout 计算得出,不应是任意的自由变量。)

此代码来自here (framework and trait) and here (implementation). (If you want to use it outright maven coordinates are here。)

您可以使用 Akka Streams。例如,每 500 毫秒调用一次 isComplete,直到 Future 的结果为真,最多调用五次:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}