如何管理 BroadcastHub 目的地
How to manage BroadcastHub destinations
假设我创建一个runnable,如下:
var (sink, source) = MergeHub
.Source<string>(perProducerBufferSize: 16)
.ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 256), Keep.Both)
.Run(sys.Materializer());
然后我想动态添加和删除 BroadcastHub 的 Actor 消费者。我可以做到:
var myProps = Props.Create<MyActor>();
var myActor = sys.System.ActorOf(myProps);
source.RunForeach(p => actor.Tell(p), myMaterializer);
(这样对吗?)
但是我想编写一个 RegisterListener() 和 UnregisterListener() 来添加和删除将接收消息的参与者。有这样做的好模式吗?用法为:
var myActor1 = ...
RegisterListener(myActor1);
var myActor2 = ...
RegisterListener(myActor2);
...
UnregisterListener(myActor2);
UnregisterListener(myActor1);
如果没有,我可以使用字典来跟踪注册,但我没有看到让演员停止收听并销毁该注册的明显方法?
您的案例听起来像是通过 akka 流构建广播 pub/sub 的示例。 The official documentation 展示了这个例子。
假设我创建一个runnable,如下:
var (sink, source) = MergeHub
.Source<string>(perProducerBufferSize: 16)
.ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 256), Keep.Both)
.Run(sys.Materializer());
然后我想动态添加和删除 BroadcastHub 的 Actor 消费者。我可以做到:
var myProps = Props.Create<MyActor>();
var myActor = sys.System.ActorOf(myProps);
source.RunForeach(p => actor.Tell(p), myMaterializer);
(这样对吗?)
但是我想编写一个 RegisterListener() 和 UnregisterListener() 来添加和删除将接收消息的参与者。有这样做的好模式吗?用法为:
var myActor1 = ...
RegisterListener(myActor1);
var myActor2 = ...
RegisterListener(myActor2);
...
UnregisterListener(myActor2);
UnregisterListener(myActor1);
如果没有,我可以使用字典来跟踪注册,但我没有看到让演员停止收听并销毁该注册的明显方法?
您的案例听起来像是通过 akka 流构建广播 pub/sub 的示例。 The official documentation 展示了这个例子。