在 akka-stream 中如何从期货集合中创建无序源
In akka-stream how to create a unordered Source from a futures collection
我需要从 Future[T]
的集合中创建一个 akka.stream.scaladsl.Source[T, Unit]
。
例如,有一组返回整数的期货,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
如何创建
val source: Source[Int, Unit] = ???
来自它。
我不能使用 Future.sequence
组合器,从那时起我会等待每个 future 完成,然后再从源中获取任何东西。我想在任何未来完成后立即获得 any 订单的结果。
我知道 Source
是一个纯粹的函数 API,在以某种方式实现它之前它不应该 运行 任何东西。所以,我的想法是使用 Iterator
(这是懒惰的)来创建一个源:
Source { () =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
但这将是期货的来源,而不是实际价值的来源。我也可以使用 Await.result(future)
在 next
上阻塞,但我不确定哪个线程池的线程会被阻塞。这也将顺序调用期货,而我需要并行执行。
更新 2:事实证明有一种更简单的方法(感谢 Viktor Klang):
Source(futures).mapAsync(1)(identity)
更新:这是我根据@sschaef 的回答得到的:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
def run(actor: ActorRef): Unit = {
futures.foreach { future =>
future.onComplete {
case Success(value) =>
actor ! value
case Failure(NonFatal(t)) =>
actor ! Status.Failure(t) // to signal error
}
}
Future.sequence(futures).onSuccess { case _ =>
actor ! Status.Success(()) // to signal stream's end
}
}
Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
"futuresToSource" should "convert futures collection to akka-stream source" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future(3)
whenReady {
futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
} { results =>
results should contain theSameElementsAs Seq(1, 2, 3)
}
}
it should "fail on future failure" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future.failed(new RuntimeException("future failed"))
whenReady {
futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
} { t =>
t shouldBe a [RuntimeException]
t should have message "future failed"
}
}
提供 Source 的最简单方法之一是通过 Actor:
import scala.concurrent.Future
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
implicit val system = ActorSystem("MySystem")
def run(actor: ActorRef): Unit = {
import system.dispatcher
Future { Thread.sleep(100); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(300); actor ! 3 }
}
val source = Source
.actorRef[Int](0, OverflowStrategy.fail)
.mapMaterializedValue(ref ⇒ run(ref))
implicit val m = ActorMaterializer()
source runForeach { int ⇒
println(s"received: $int")
}
Actor 是通过 Source.actorRef
方法创建的,并通过 mapMaterializedValue
方法提供。 run
简单地获取 Actor 并将所有完成的值发送给它,然后可以通过 source
访问它。在上面的示例中,值直接在 Future 中发送,但这当然可以在任何地方完成(例如在 Future 的 onComplete
调用中)。
创建 Futures 源,然后通过 mapAsync "flatten" 创建它:
scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804
我需要从 Future[T]
的集合中创建一个 akka.stream.scaladsl.Source[T, Unit]
。
例如,有一组返回整数的期货,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
如何创建
val source: Source[Int, Unit] = ???
来自它。
我不能使用 Future.sequence
组合器,从那时起我会等待每个 future 完成,然后再从源中获取任何东西。我想在任何未来完成后立即获得 any 订单的结果。
我知道 Source
是一个纯粹的函数 API,在以某种方式实现它之前它不应该 运行 任何东西。所以,我的想法是使用 Iterator
(这是懒惰的)来创建一个源:
Source { () =>
new Iterator[Future[Int]] {
override def hasNext: Boolean = ???
override def next(): Future[Int] = ???
}
}
但这将是期货的来源,而不是实际价值的来源。我也可以使用 Await.result(future)
在 next
上阻塞,但我不确定哪个线程池的线程会被阻塞。这也将顺序调用期货,而我需要并行执行。
更新 2:事实证明有一种更简单的方法(感谢 Viktor Klang):
Source(futures).mapAsync(1)(identity)
更新:这是我根据@sschaef 的回答得到的:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
def run(actor: ActorRef): Unit = {
futures.foreach { future =>
future.onComplete {
case Success(value) =>
actor ! value
case Failure(NonFatal(t)) =>
actor ! Status.Failure(t) // to signal error
}
}
Future.sequence(futures).onSuccess { case _ =>
actor ! Status.Success(()) // to signal stream's end
}
}
Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
"futuresToSource" should "convert futures collection to akka-stream source" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future(3)
whenReady {
futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
} { results =>
results should contain theSameElementsAs Seq(1, 2, 3)
}
}
it should "fail on future failure" in {
val f1 = Future(1)
val f2 = Future(2)
val f3 = Future.failed(new RuntimeException("future failed"))
whenReady {
futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
} { t =>
t shouldBe a [RuntimeException]
t should have message "future failed"
}
}
提供 Source 的最简单方法之一是通过 Actor:
import scala.concurrent.Future
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
implicit val system = ActorSystem("MySystem")
def run(actor: ActorRef): Unit = {
import system.dispatcher
Future { Thread.sleep(100); actor ! 1 }
Future { Thread.sleep(200); actor ! 2 }
Future { Thread.sleep(300); actor ! 3 }
}
val source = Source
.actorRef[Int](0, OverflowStrategy.fail)
.mapMaterializedValue(ref ⇒ run(ref))
implicit val m = ActorMaterializer()
source runForeach { int ⇒
println(s"received: $int")
}
Actor 是通过 Source.actorRef
方法创建的,并通过 mapMaterializedValue
方法提供。 run
简单地获取 Actor 并将所有完成的值发送给它,然后可以通过 source
访问它。在上面的示例中,值直接在 Future 中发送,但这当然可以在任何地方完成(例如在 Future 的 onComplete
调用中)。
创建 Futures 源,然后通过 mapAsync "flatten" 创建它:
scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804