like-stream 与 like-cluster
akka-streams with akka-cluster
我的 akka-streams 学习马拉松继续进行。我想将我的 akka-streams 应用程序与 akka-cluster and DistributedPubSubMediator.
集成
添加对发布的支持相当简单,但我在订阅部分遇到了麻烦。
供参考,订阅者在Typesafe sample中给出如下:
class ChatClient(name: String) extends Actor {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("some topic", self)
def receive = {
case ChatClient.Message(from, text) =>
...process message...
}
}
我的问题是,我应该如何将此 actor 与我的流程集成,以及我应该如何确保在没有流背压的情况下获得发布消息?
我正在尝试实现一个 pubsub 模型,其中一个流可以发布一条消息,而另一个流将使用它(如果已订阅)。
您可能想让您的 Actor 扩展 ActorPublisher。然后您可以从中创建一个源并将其集成到您的流中。
在此处查看 ActorPublisher 上的文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html
关于这个主题有一个非常好的 YouTube presentation。与集群的集成即将结束,但整个演讲内容非常丰富。
其他答案已过时:他们建议使用 ActorPublisher
,自版本 2.5.0 以来已弃用。
对于那些对当前方法感兴趣的人,Colin Breck 在他的博客中写了一个关于集成 Akka Streams 和 Akka actors 的优秀系列。在整个系列的过程中,Breck 充实了一个系统,该系统从 Akka Streams 和普通演员开始,然后合并了 Akka Cluster 和 Akka Persistence。该系列中的第一个 post 是 here (the distributed stream processing piece is in part 3)。
我的 akka-streams 学习马拉松继续进行。我想将我的 akka-streams 应用程序与 akka-cluster and DistributedPubSubMediator.
集成添加对发布的支持相当简单,但我在订阅部分遇到了麻烦。
供参考,订阅者在Typesafe sample中给出如下:
class ChatClient(name: String) extends Actor {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("some topic", self)
def receive = {
case ChatClient.Message(from, text) =>
...process message...
}
}
我的问题是,我应该如何将此 actor 与我的流程集成,以及我应该如何确保在没有流背压的情况下获得发布消息?
我正在尝试实现一个 pubsub 模型,其中一个流可以发布一条消息,而另一个流将使用它(如果已订阅)。
您可能想让您的 Actor 扩展 ActorPublisher。然后您可以从中创建一个源并将其集成到您的流中。
在此处查看 ActorPublisher 上的文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html
关于这个主题有一个非常好的 YouTube presentation。与集群的集成即将结束,但整个演讲内容非常丰富。
其他答案已过时:他们建议使用 ActorPublisher
,自版本 2.5.0 以来已弃用。
对于那些对当前方法感兴趣的人,Colin Breck 在他的博客中写了一个关于集成 Akka Streams 和 Akka actors 的优秀系列。在整个系列的过程中,Breck 充实了一个系统,该系统从 Akka Streams 和普通演员开始,然后合并了 Akka Cluster 和 Akka Persistence。该系列中的第一个 post 是 here (the distributed stream processing piece is in part 3)。