在 akka-stream 中使用 actor 时如何获取 return 值
How get return value when using actor in akka-stream
我正在学习 scala 的 akka-stream。
我想使用这样的代码获取演员发送的值:
case class Message(value: String)
class LibraryService @Inject()(@Named("library-actor") library: ActorRef)(implicit ec: ExecutionContext, ml: Materializer) {
val sink = Sink.actorRef[Message](library, onCompleteMessage = "stream completed", onFailureMessage = (throwable: Throwable) => Fail(throwable.getMessage))
val source = Source.single(Message("test"))
def add(message: String): Unit = {
val runnable = source to sink
val value = runnable.run() // I want get "I receive 'test'" at here.
print(value) // This code prints "Not Used"
}
}
class Library extends Actor {
override def receive: Receive = {
case Message(message) => {
sender ! s"I receive '$message'"
}
}
}
但是,在上面的代码中,只输出了“Not Used”,发送方发送的值并没有存储在'value'中。
我想获取actor的'sender'.
发送的值
如果你知道什么,请告诉我。
Sink.actorRef
具体化为NotUsed
,意味着它不提供有用的值。它所做的只是将消息转发给目标演员。
要获得回复,此示例的最佳方法可能是:
Source.single(Message("test"))
.runWith(Sink.foreachAsync(1) { msg =>
(library ? msg).map(println _)
})
一般来说,如果您要从流中向参与者发送消息并期待响应,mapAsync
与请求应该是您的首选(mapAsyncUnordered
以获得更好的吞吐量当且仅当如果您不关心保留流顺序)。如果您正在执行 mapAsyncUnordered
并立即将结果传递给 Sink.foreach
以调用副作用,则可能值得在 Sink.foreachAsync
中将询问与副作用结合起来并删除 mapAsyncUnordered
.
编辑添加:如果你确实关心顺序(因此使用 mapAsync
而不是 mapAsyncUnordered
),你仍然可以使用 Sink.foreachAsync
,但一定要使用单并行(即 Sink.foreachAsync(1)
)。另一方面,mapAsync
到 Sink.foreach
的优点是您可以在 mapAsync
上使并行度大于一个,并且仍然保留接收器中副作用的顺序,例如:
/**
* Transforms values from a source in parallel and applies a side
* effect to the transformed values. If value `a` is emitted before
* value `b` by the source, the effects arising from `pureFutF(a)`
* will execute before the effects arising from `pureFutF(b)`.
*
* @param src
* @param parallelism how many calls to pureFutF are in-flight at a
* time
* @param pureFutF asynchronous transformation; if parallelism is
* greater than 1, order is not guaranteed, so ideally the order
* in which transformations are executed is not important
* @param sideEffect
* @return a future which successfully completes when the source has
* completed and all side effects have been executed
*/
def runParallelStreamMap[A, B](src: Source[A, Any], parallelism: Int)
( pureFutF: A => Future[B], sideEffect: B => Unit): Future[Done] = {
src.mapAsync(parallelism)(pureFutF).runWith(Sink.foreach(sideEffect))
}
如果 Sink
上有一个 .contramapAsync
方法就好了。
我正在学习 scala 的 akka-stream。
我想使用这样的代码获取演员发送的值:
case class Message(value: String)
class LibraryService @Inject()(@Named("library-actor") library: ActorRef)(implicit ec: ExecutionContext, ml: Materializer) {
val sink = Sink.actorRef[Message](library, onCompleteMessage = "stream completed", onFailureMessage = (throwable: Throwable) => Fail(throwable.getMessage))
val source = Source.single(Message("test"))
def add(message: String): Unit = {
val runnable = source to sink
val value = runnable.run() // I want get "I receive 'test'" at here.
print(value) // This code prints "Not Used"
}
}
class Library extends Actor {
override def receive: Receive = {
case Message(message) => {
sender ! s"I receive '$message'"
}
}
}
但是,在上面的代码中,只输出了“Not Used”,发送方发送的值并没有存储在'value'中。
我想获取actor的'sender'.
发送的值
如果你知道什么,请告诉我。
Sink.actorRef
具体化为NotUsed
,意味着它不提供有用的值。它所做的只是将消息转发给目标演员。
要获得回复,此示例的最佳方法可能是:
Source.single(Message("test"))
.runWith(Sink.foreachAsync(1) { msg =>
(library ? msg).map(println _)
})
一般来说,如果您要从流中向参与者发送消息并期待响应,mapAsync
与请求应该是您的首选(mapAsyncUnordered
以获得更好的吞吐量当且仅当如果您不关心保留流顺序)。如果您正在执行 mapAsyncUnordered
并立即将结果传递给 Sink.foreach
以调用副作用,则可能值得在 Sink.foreachAsync
中将询问与副作用结合起来并删除 mapAsyncUnordered
.
编辑添加:如果你确实关心顺序(因此使用 mapAsync
而不是 mapAsyncUnordered
),你仍然可以使用 Sink.foreachAsync
,但一定要使用单并行(即 Sink.foreachAsync(1)
)。另一方面,mapAsync
到 Sink.foreach
的优点是您可以在 mapAsync
上使并行度大于一个,并且仍然保留接收器中副作用的顺序,例如:
/**
* Transforms values from a source in parallel and applies a side
* effect to the transformed values. If value `a` is emitted before
* value `b` by the source, the effects arising from `pureFutF(a)`
* will execute before the effects arising from `pureFutF(b)`.
*
* @param src
* @param parallelism how many calls to pureFutF are in-flight at a
* time
* @param pureFutF asynchronous transformation; if parallelism is
* greater than 1, order is not guaranteed, so ideally the order
* in which transformations are executed is not important
* @param sideEffect
* @return a future which successfully completes when the source has
* completed and all side effects have been executed
*/
def runParallelStreamMap[A, B](src: Source[A, Any], parallelism: Int)
( pureFutF: A => Future[B], sideEffect: B => Unit): Future[Done] = {
src.mapAsync(parallelism)(pureFutF).runWith(Sink.foreach(sideEffect))
}
如果 Sink
上有一个 .contramapAsync
方法就好了。