服务结构集群。状态服务。服务总线队列使用
Service Fabric cluster. Statefull service. Service Bus queue usage
完全披露:我是 Service Fabric 开发的新手。这是我的情况。我们有 Service Fabric 集群。我们在那里部署了有状态服务。服务有指定的服务总线队列,它会监听。因此集群中所有节点上的所有服务实例都在监听同一个服务总线队列。每个服务实例向服务总线队列注册 OnMessage 回调以处理消息,如下所示:
QueueClient Queue = QueueClient.CreateFromConnectionString(
GetServicebusConnectionString(),
ConfigData.SERVICE_QUEUE_NAME);
if (Queue != null)
{
var options = new OnMessageOptions();
options.AutoComplete = false;
Queue.OnMessage((receivedMessage) =>
ProcessMessage(receivedMessage), options);
}
现在,根据日志中的消息,似乎所有服务实例都提取了已同时放入队列中的消息。这不是一件好事。
问题是:
Is it possible to use Service Bus queue in a way when each message from the queue would be picked up only by one service instance?
队列客户端默认receive mode是PeekLock,你设置AutoComplete 属性为false,客户端收到消息后不会自动删除消息。锁过期后,消息将再次可用,其他服务实例可以再次接收和处理它。
Is it possible to use Service Bus queue in a way when each message from the queue would be picked up only by one service instance?
您可以将自动完成 属性 设置为 true,或者在客户端收到并处理消息后调用 Complete method。
Queue.OnMessage((receivedMessage) =>
{ ProcessMessage(receivedMessage); receivedMessage.Complete(); }, options);
完全披露:我是 Service Fabric 开发的新手。这是我的情况。我们有 Service Fabric 集群。我们在那里部署了有状态服务。服务有指定的服务总线队列,它会监听。因此集群中所有节点上的所有服务实例都在监听同一个服务总线队列。每个服务实例向服务总线队列注册 OnMessage 回调以处理消息,如下所示:
QueueClient Queue = QueueClient.CreateFromConnectionString(
GetServicebusConnectionString(),
ConfigData.SERVICE_QUEUE_NAME);
if (Queue != null)
{
var options = new OnMessageOptions();
options.AutoComplete = false;
Queue.OnMessage((receivedMessage) =>
ProcessMessage(receivedMessage), options);
}
现在,根据日志中的消息,似乎所有服务实例都提取了已同时放入队列中的消息。这不是一件好事。 问题是:
Is it possible to use Service Bus queue in a way when each message from the queue would be picked up only by one service instance?
队列客户端默认receive mode是PeekLock,你设置AutoComplete 属性为false,客户端收到消息后不会自动删除消息。锁过期后,消息将再次可用,其他服务实例可以再次接收和处理它。
Is it possible to use Service Bus queue in a way when each message from the queue would be picked up only by one service instance?
您可以将自动完成 属性 设置为 true,或者在客户端收到并处理消息后调用 Complete method。
Queue.OnMessage((receivedMessage) =>
{ ProcessMessage(receivedMessage); receivedMessage.Complete(); }, options);