在不轮询的情况下在 Service Fabric 中使用可靠队列?
Use ReliableQueue in ServiceFabric without polling?
我一直在研究 Service Fabric 中的有状态服务。我一直在研究示例,特别是 WordCount。他们有一个 RunAsync 方法,在 WordCountService 内部看起来像这样:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
IReliableQueue<string> inputQueue = await this.StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
using (ITransaction tx = this.StateManager.CreateTransaction())
{
ConditionalValue<string> dequeuReply = await inputQueue.TryDequeueAsync(tx);
if (dequeuReply.HasValue)
{
//... {more example code here }
}
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
}
catch (TimeoutException)
{
//Service Fabric uses timeouts on collection operations to prevent deadlocks.
//If this exception is thrown, it means that this transaction was waiting the default
//amount of time (4 seconds) but was unable to acquire the lock. In this case we simply
//retry after a random backoff interval. You can also control the timeout via a parameter
//on the collection operation.
Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(100, 300)));
continue;
}
catch (Exception exception)
{
//For sample code only: simply trace the exception.
ServiceEventSource.Current.MessageEvent(exception.ToString());
}
}
}
本质上,在此示例中,服务每 100 毫秒轮询一次 ReliableQueue 以获取消息。有没有办法在没有民意调查的情况下做到这一点?当消息成功添加到 ReliableQueue 时,我们可以订阅事件或触发的事件吗?
不,目前没有可用于 ReliableQueue 的事件。您必须轮询新项目。
我建议使用 ReliableDispatcher in your service, or just use a Dispatcher Service。
使用 Dispatcher 服务 允许您编写一个方法,每当项目在底层可靠队列中排队时调用该方法。
例如:
public override async Task OnItemDispatchedAsync(
ITransaction transaction,
int value,
CancellationToken cancellationToken)
{
// Do something with the value that has been dequeued
}
Reliable Dispatcher 和 Dispatcher Service 都可以通过 NuGet package 使用,并且在 GitHub 让你开始:
我一直在研究 Service Fabric 中的有状态服务。我一直在研究示例,特别是 WordCount。他们有一个 RunAsync 方法,在 WordCountService 内部看起来像这样:
protected override async Task RunAsync(CancellationToken cancellationToken)
{
IReliableQueue<string> inputQueue = await this.StateManager.GetOrAddAsync<IReliableQueue<string>>("inputQueue");
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
using (ITransaction tx = this.StateManager.CreateTransaction())
{
ConditionalValue<string> dequeuReply = await inputQueue.TryDequeueAsync(tx);
if (dequeuReply.HasValue)
{
//... {more example code here }
}
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
}
catch (TimeoutException)
{
//Service Fabric uses timeouts on collection operations to prevent deadlocks.
//If this exception is thrown, it means that this transaction was waiting the default
//amount of time (4 seconds) but was unable to acquire the lock. In this case we simply
//retry after a random backoff interval. You can also control the timeout via a parameter
//on the collection operation.
Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(100, 300)));
continue;
}
catch (Exception exception)
{
//For sample code only: simply trace the exception.
ServiceEventSource.Current.MessageEvent(exception.ToString());
}
}
}
本质上,在此示例中,服务每 100 毫秒轮询一次 ReliableQueue 以获取消息。有没有办法在没有民意调查的情况下做到这一点?当消息成功添加到 ReliableQueue 时,我们可以订阅事件或触发的事件吗?
不,目前没有可用于 ReliableQueue 的事件。您必须轮询新项目。
我建议使用 ReliableDispatcher in your service, or just use a Dispatcher Service。
使用 Dispatcher 服务 允许您编写一个方法,每当项目在底层可靠队列中排队时调用该方法。
例如:
public override async Task OnItemDispatchedAsync(
ITransaction transaction,
int value,
CancellationToken cancellationToken)
{
// Do something with the value that has been dequeued
}
Reliable Dispatcher 和 Dispatcher Service 都可以通过 NuGet package 使用,并且在 GitHub 让你开始: