like-stream 与 like-cluster

akka-streams with akka-cluster

我的 akka-streams 学习马拉松继续进行。我想将我的 akka-streams 应用程序与 akka-cluster and DistributedPubSubMediator.

集成

添加对发布的支持相当简单,但我在订阅部分遇到了麻烦。

供参考,订阅者在Typesafe sample中给出如下:

class ChatClient(name: String) extends Actor {
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("some topic", self)

  def receive = {
    case ChatClient.Message(from, text) =>
      ...process message...
  }
}

我的问题是,我应该如何将此 actor 与我的流程集成,以及我应该如何确保在没有流背压的情况下获得发布消息?

我正在尝试实现一个 pubsub 模型,其中一个流可以发布一条消息,而另一个流将使用它(如果已订阅)。

您可能想让您的 Actor 扩展 ActorPublisher。然后您可以从中创建一个源并将其集成到您的流中。

在此处查看 ActorPublisher 上的文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html

关于这个主题有一个非常好的 YouTube presentation。与集群的集成即将结束,但整个演讲内容非常丰富。

其他答案已过时:他们建议使用 ActorPublisher,自版本 2.5.0 以来已弃用。

对于那些对当前方法感兴趣的人,Colin Breck 在他的博客中写了一个关于集成 Akka Streams 和 Akka actors 的优秀系列。在整个系列的过程中,Breck 充实了一个系统,该系统从 Akka Streams 和普通演员开始,然后合并了 Akka Cluster 和 Akka Persistence。该系列中的第一个 post 是 here (the distributed stream processing piece is in part 3)。