在 akka-stream 中使用 actor 时如何获取 return 值

How get return value when using actor in akka-stream

我正在学习 scala 的 akka-stream。
我想使用这样的代码获取演员发送的值:

case class Message(value: String)

class LibraryService @Inject()(@Named("library-actor") library: ActorRef)(implicit ec: ExecutionContext, ml: Materializer) {
  val sink = Sink.actorRef[Message](library, onCompleteMessage = "stream completed", onFailureMessage = (throwable: Throwable) => Fail(throwable.getMessage))
  val source = Source.single(Message("test"))

  def add(message: String): Unit = {
    val runnable = source to sink
    val value = runnable.run() // I want get "I receive 'test'"  at here.
    print(value) // This code prints "Not Used"
  }
}

class Library extends Actor {
  override def receive: Receive = {
    case Message(message) => {
      sender ! s"I receive '$message'"
    }
  }
}

但是,在上面的代码中,只输出了“Not Used”,发送方发送的值并没有存储在'value'中。
我想获取actor的'sender'.
发送的值 如果你知道什么,请告诉我。

Sink.actorRef具体化为NotUsed,意味着它不提供有用的值。它所做的只是将消息转发给目标演员。

要获得回复,此示例的最佳方法可能是:

Source.single(Message("test"))
  .runWith(Sink.foreachAsync(1) { msg =>
    (library ? msg).map(println _)
  })

一般来说,如果您要从流中向参与者发送消息并期待响应,mapAsync 与请求应该是您的首选(mapAsyncUnordered 以获得更好的吞吐量当且仅当如果您不关心保留流顺序)。如果您正在执行 mapAsyncUnordered 并立即将结果传递给 Sink.foreach 以调用副作用,则可能值得在 Sink.foreachAsync 中将询问与副作用结合起来并删除 mapAsyncUnordered.

编辑添加:如果你确实关心顺序(因此使用 mapAsync 而不是 mapAsyncUnordered),你仍然可以使用 Sink.foreachAsync,但一定要使用单并行(即 Sink.foreachAsync(1))。另一方面,mapAsyncSink.foreach 的优点是您可以在 mapAsync 上使并行度大于一个,并且仍然保留接收器中副作用的顺序,例如:

/**
 * Transforms values from a source in parallel and applies a side 
 * effect to the transformed values. If value `a` is emitted before
 * value `b` by the source, the effects arising from `pureFutF(a)` 
 * will execute before the effects arising from `pureFutF(b)`.
 *
 * @param src
 * @param parallelism how many calls to pureFutF are in-flight at a 
 *  time
 * @param pureFutF asynchronous transformation; if parallelism is 
 *  greater than 1, order is not guaranteed, so ideally the order
 *  in which transformations are executed is not important
 * @param sideEffect
 * @return a future which successfully completes when the source has 
 *  completed and all side effects have been executed
 */
def runParallelStreamMap[A, B](src: Source[A, Any], parallelism: Int)
  ( pureFutF: A => Future[B], sideEffect: B => Unit): Future[Done] = {

  src.mapAsync(parallelism)(pureFutF).runWith(Sink.foreach(sideEffect))
}

如果 Sink 上有一个 .contramapAsync 方法就好了。