Orleans: two-way communication between grain client and grain
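I want to implement a callback from a grain method to the grain client. For example, while Grain.Method1 is being evaluated, it needs to call a client method to get some data.
I have tried to do this with streams, but when I subscribe to the stream on the client, the handler never fires.
Grain: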
    var config = ClusterConfiguration.LocalhostPrimarySilo();
    config.AddMemoryStorageProvider();
    // The stream provider keeps its pub/sub state in the "PubSubStore" storage provider.
    config.Globals.RegisterStorageProvider<MemoryStorage>("PubSubStore");
    config.Globals.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
    ...
    public override async Task OnActivateAsync() {
        var streamProvider = GetStreamProvider("MySMSProvider");
        var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");
        // Publish a new message to the stream once per second.
        RegisterTimer(s => {
            return stream.OnNextAsync(new MyTypeMessage());
        }, null, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(1000));
    ...
Client:
    var clientConfiguration = ClientConfiguration.LocalhostSilo();
    clientConfiguration.RegisterStreamProvider<SimpleMessageStreamProvider>("MySMSProvider");
    GrainClient.Initialize(clientConfiguration);
    ...
    var streamProvider = GrainClient.GetStreamProvider("MySMSProvider");
    var stream = streamProvider.GetStream<MyTypeMessage>(myGuid, "MyStream");
    await stream.SubscribeAsync(
        async (message, token) => {
            // process message (this handler never fires)
        });
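It looks like you are not keeping a reference to the StreamSubscriptionHandle<MyTypeMessage> that SubscribeAsync returns.
Try something like this:
    var subscriptionHandle = await _factCalculationRequestStream
        .SubscribeAsync(async (message, token) => {
            // process message
        });
and keep subscriptionHandle from being garbage collected, for example by storing it in a field of an object that lives as long as the subscription should.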
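A minimal sketch of that suggestion, assuming the Orleans 1.x static GrainClient API used in the question and a client that has already been initialized. The names StreamListener, StartAsync, StopAsync and OnMessageAsync are hypothetical and only serve the illustration; MyTypeMessage is a stand-in for the question's message type.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

// Stand-in for the question's message type (assumption: it carries no data).
[Serializable]
public class MyTypeMessage { }

// Hypothetical client-side helper; it is not part of Orleans.
public class StreamListener
{
    // Storing the handle in a field keeps it reachable for as long as
    // this listener instance itself is referenced.
    private StreamSubscriptionHandle<MyTypeMessage> _handle;

    public async Task StartAsync(Guid streamId)
    {
        // Provider name and stream namespace must match what the grain uses.
        IStreamProvider provider = GrainClient.GetStreamProvider("MySMSProvider");
        IAsyncStream<MyTypeMessage> stream =
            provider.GetStream<MyTypeMessage>(streamId, "MyStream");

        _handle = await stream.SubscribeAsync(OnMessageAsync);
    }

    private Task OnMessageAsync(MyTypeMessage message, StreamSequenceToken token)
    {
        Console.WriteLine("Received a message from the stream");
        return Task.FromResult(0);
    }

    public async Task StopAsync()
    {
        if (_handle != null)
        {
            // Explicitly end the subscription when it is no longer needed.
            await _handle.UnsubscribeAsync();
            _handle = null;
        }
    }
}
```

The caller would create one StreamListener, keep it alive (for instance in a static field or a long-lived service), and pass StartAsync the same Guid that the grain uses when it calls GetStream.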
In my case, I ended up implementing IGrainObserver on my class (a WebSocketBehavior in a WebAPI project), and this subscription works fine:
    public class MySocketService : WebSocketBehavior, IGrainObserver, IDisposable
    {
        // Field that holds the subscription handle for the lifetime of the service.
        private StreamSubscriptionHandle<MyStreamEvent> _streamSubscriberHandle;
        ...
        // inside some method invoked externally
        _streamSubscriberHandle = await requestedStream
            .SubscribeAsync(onStreamMessage, onStreamError, onStreamComplete);
    }