Akka 流 - 它可以像普通演员或其他方式一样扩展吗?
Akka streams - can it be scale like regular actors or other way?
我有一个使用 Akka 流执行管道的代码。
我的问题是扩展它的最佳方式是什么?也可以使用 Akka 流来完成吗?
还是需要转换成actors/other方式?
代码片段是:
val future = SqsSource(sqsEndpoint)(awsSqsClient)
.takeWhile(_=>true)
.map { m: Message =>
(m, Ack())
}.runWith(SqsAckSink(sqsEndpoint)(awsSqsClient))
如果您稍微修改一下代码,那么您的流将具体化为多个 Actor
值。这些具体化的 Actors 将为您提供您正在寻找的并发性:
val future =
SqsSource(sqsEnpoint)(awsSqsClient) //Actor 1
.via(Flow[Message] map (m => (m, Ack()))) //Actor 2
.to(SqsAckSink(sqsEndpoint)(awsSqsClient)) //Actor 3
.run()
注意 via
和 to
的用法。这些很重要,因为它们表明流的那些阶段应该具体化为单独的 Actor。在您的示例代码中,您在 Source
上使用 map
和 runWith
,由于 operator fusion.
,这将导致仅创建 1 个 Actor
询问外部参与者的流程
如果您希望扩展到更多的 Actor,那么您可以使用 Flow#mapAsync
查询外部 Actor 来做更多的工作,类似于 。
我有一个使用 Akka 流执行管道的代码。
我的问题是扩展它的最佳方式是什么?也可以使用 Akka 流来完成吗?
还是需要转换成actors/other方式?
代码片段是:
val future = SqsSource(sqsEndpoint)(awsSqsClient)
.takeWhile(_=>true)
.map { m: Message =>
(m, Ack())
}.runWith(SqsAckSink(sqsEndpoint)(awsSqsClient))
如果您稍微修改一下代码,那么您的流将具体化为多个 Actor
值。这些具体化的 Actors 将为您提供您正在寻找的并发性:
val future =
SqsSource(sqsEnpoint)(awsSqsClient) //Actor 1
.via(Flow[Message] map (m => (m, Ack()))) //Actor 2
.to(SqsAckSink(sqsEndpoint)(awsSqsClient)) //Actor 3
.run()
注意 via
和 to
的用法。这些很重要,因为它们表明流的那些阶段应该具体化为单独的 Actor。在您的示例代码中,您在 Source
上使用 map
和 runWith
,由于 operator fusion.
询问外部参与者的流程
如果您希望扩展到更多的 Actor,那么您可以使用 Flow#mapAsync
查询外部 Actor 来做更多的工作,类似于