Service Bus Session Block实现建议
Suggestions for Implementing Session Block for Service Bus
目前,我们有一个带有后台服务的 dotnet 核心应用程序,它从启用会话的服务总线接收消息,其中 sessionId
是 userId
,消息包含对用户信息的更新。现在我们想实现一个功能,通过阻止特定 userId/sessionId
来暂时暂停对特定用户的更新,但在解除阻止时仍按顺序处理消息。解决此问题的最佳方法是什么?
我已尝试查看服务总线文档和示例。主要是,message deferral, message session state and session state sample
而且我找到了一些关于SessionState
和消息延迟的信息,我想知道它们是否可以用来实现这个特性并且仍然保证处理顺序(无论消息是否被延迟的先进先出)。我正在考虑尝试将序列号存储在会话状态中,并继续通过该号码接收延迟消息并递增它以接收下一条消息,直到我 运行 out messages?
目前,我们的代码如下所示:
this.queue.RegisterSessionHandler(
this.SessionHandler,
new SessionHandlerOptions(this.ExceptionHandler)
{
AutoComplete = false,
MessageWaitTimeout = TimeSpan.FromMinutes(1),
});
其中 this.SessionHandler
是处理消息然后通过调用 session.CompleteAsync
和 session.CloseAsync
完成并关闭会话的函数。但是,我无法概念化如何将延迟逻辑添加到我们的代码中。因为目前,RegisterSessionHandler
已经处理会话锁并使用 sessionId
对消息进行负载平衡(我假设),这很棒。但是 RegisterSessionHandler
也不允许您指定要处理的特定 sessionId
。
假设我有一些消息被推迟到 userId/sessionId: A
。当我想取消阻止该用户的处理时,我不能简单地将延迟的消息插入回队列。由于发件人仍会不断将用户A的消息发送到队列中,这会弄乱顺序。
我上面提到的 会话状态示例 有一个很好的例子,说明如何使用会话状态和处理延迟消息。但是,它只使用了一个 sessionId
而没有使用 RegisterSessionHandler
。我的问题是:如果我们想要实现延迟消息处理逻辑(保留顺序),我们是否必须实现我们自己的 RegisterSessionHandler
并处理 sessionId
负载平衡?
提前致谢!
您应该在 QueueClient 中使用 SessionClient
而不是使用 RegisterSessionHandler
以更好地处理延迟情况并保持顺序。您可以在邮件正文中保留一些 step/sequence 号码。当您实际处理消息时,还要添加 LastProcessedStep/Seqence。 Session state allows keeping track of the processing state a handler has related to a session, so that clients can be agile between processing nodes (including failover) during session processing. The sample 通过维护该 (Step) 来处理延迟的消息。它结合了延迟和会话功能,因此会话状态工具用于跟踪工作流的处理状态,其中各个步骤的输入未按预期顺序到达。还请注意发送方代码,它演示了通过以不可预测的顺序发送消息,但凭借会话状态,接收方检测到顺序。
//
// Copyright © Microsoft Corporation, All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
// ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
// PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
//
// See the Apache License, Version 2.0 for the specific language
// governing permissions and limitations under the License.
namespace SessionState
{
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
public class Program : MessagingSamples.Sample
{
public async Task Run(string connectionString)
{
Console.WriteLine("Press any key to exit the scenario");
var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
var receiveTask = this.ReceiveMessagesAsync(connectionString, SessionQueueName);
await Task.WhenAll(sendTask, sendTask2, receiveTask);
}
async Task SendMessagesAsync(string session, string connectionString, string queueName)
{
var sender = new MessageSender(connectionString, queueName);
Console.WriteLine("Sending messages to Queue...");
ProcessingState[] data = new[]
{
new ProcessingState {Step = 1, Title = "Buy"},
new ProcessingState {Step = 2, Title = "Unpack"},
new ProcessingState {Step = 3, Title = "Prepare"},
new ProcessingState {Step = 4, Title = "Cook"},
new ProcessingState {Step = 5, Title = "Eat"},
};
var rnd = new Random();
var tasks = new List<Task>();
for (int i = 0; i < data.Length; i++)
{
var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
{
SessionId = session,
ContentType = "application/json",
Label = "RecipeStep",
MessageId = i.ToString(),
TimeToLive = TimeSpan.FromMinutes(2)
};
tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
async (t) =>
{
await sender.SendAsync(message);
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine("Message sent: Id = {0}", message.MessageId);
Console.ResetColor();
}
}));
}
await Task.WhenAll(tasks);
}
async Task ReceiveMessagesAsync(string connectionString, string queueName)
{
var client = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);
while (true)
{
var session = await client.AcceptMessageSessionAsync();
await Task.Run(
async () =>
{
ProcessingState processingState;
var stateData = await session.GetStateAsync();
if (stateData != null)
{
processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
}
else
{
processingState = new ProcessingState
{
LastProcessedRecipeStep = 0,
DeferredSteps = new Dictionary<int, long>()
};
}
while (true)
{
try
{
//receive messages from Queue
var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
if (message != null)
{
if (message.Label != null &&
message.ContentType != null &&
message.Label.Equals("RecipeStep", StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
message.MessageId,
message.SystemProperties.SequenceNumber,
message.SystemProperties.EnqueuedTimeUtc,
message.ContentType,
message.Size,
message.ExpiresAtUtc,
recipeStep.Step,
recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(message.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = recipeStep.Step;
await
session.SetStateAsync(
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
else
{
// in your case, if customer update is blocked, you can defer
processingState.DeferredSteps.Add((int)recipeStep.Step, (long)message.SystemProperties.SequenceNumber);
await session.DeferAsync(message.SystemProperties.LockToken);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
else
{
await session.DeadLetterAsync(message.SystemProperties.LockToken);//, "ProcessingError", "Don't know what to do with this message");
}
}
else
{
while (processingState.DeferredSteps.Count > 0)
{
long step;
if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1, out step))
{
var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
var body = deferredMessage.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
deferredMessage.MessageId,
deferredMessage.SystemProperties.SequenceNumber,
deferredMessage.SystemProperties.EnqueuedTimeUtc,
deferredMessage.ContentType,
deferredMessage.Size,
deferredMessage.ExpiresAtUtc,
recipeStep.Step,
recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
break;
}
}
catch (ServiceBusException e)
{
if (!e.IsTransient)
{
Console.WriteLine(e.Message);
throw;
}
}
}
await session.CloseAsync();
});
}
}
public static int Main(string[] args)
{
try
{
var app = new Program();
app.RunSample(args, app.Run);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
return 1;
}
return 0;
}
class ProcessingState
{
[JsonProperty]
public int LastProcessedRecipeStep { get; set; }
[JsonProperty]
public Dictionary<int, long> DeferredSteps { get; set; }
[JsonProperty]
public int Step { get; internal set; }
[JsonProperty]
public string Title { get; internal set; }
}
}
}
您也可以按照 Ordering Messages in Azure Service Bus 进行操作,它非常简要地解释了这个概念。但那里提供的样本与上面略有不同。
警告:使用消息会话也意味着对于会话(对于您的情况是用户 ID),该会话中的消息将始终由单个接收者接收和处理。因此,在设置 Session 和 SessionId 时要注意。如果您创建一个非常大的会话,这将迫使 Azure 服务总线将大部分消息发送给一个订阅者,从而减少 multi-threading 的好处。如果您将会话设置得过于细化,那么它就会失去预期的好处,您只会增加不必要的开销。
目前,我们有一个带有后台服务的 dotnet 核心应用程序,它从启用会话的服务总线接收消息,其中 sessionId
是 userId
,消息包含对用户信息的更新。现在我们想实现一个功能,通过阻止特定 userId/sessionId
来暂时暂停对特定用户的更新,但在解除阻止时仍按顺序处理消息。解决此问题的最佳方法是什么?
我已尝试查看服务总线文档和示例。主要是,message deferral, message session state and session state sample
而且我找到了一些关于SessionState
和消息延迟的信息,我想知道它们是否可以用来实现这个特性并且仍然保证处理顺序(无论消息是否被延迟的先进先出)。我正在考虑尝试将序列号存储在会话状态中,并继续通过该号码接收延迟消息并递增它以接收下一条消息,直到我 运行 out messages?
目前,我们的代码如下所示:
this.queue.RegisterSessionHandler(
this.SessionHandler,
new SessionHandlerOptions(this.ExceptionHandler)
{
AutoComplete = false,
MessageWaitTimeout = TimeSpan.FromMinutes(1),
});
其中 this.SessionHandler
是处理消息然后通过调用 session.CompleteAsync
和 session.CloseAsync
完成并关闭会话的函数。但是,我无法概念化如何将延迟逻辑添加到我们的代码中。因为目前,RegisterSessionHandler
已经处理会话锁并使用 sessionId
对消息进行负载平衡(我假设),这很棒。但是 RegisterSessionHandler
也不允许您指定要处理的特定 sessionId
。
假设我有一些消息被推迟到 userId/sessionId: A
。当我想取消阻止该用户的处理时,我不能简单地将延迟的消息插入回队列。由于发件人仍会不断将用户A的消息发送到队列中,这会弄乱顺序。
我上面提到的 会话状态示例 有一个很好的例子,说明如何使用会话状态和处理延迟消息。但是,它只使用了一个 sessionId
而没有使用 RegisterSessionHandler
。我的问题是:如果我们想要实现延迟消息处理逻辑(保留顺序),我们是否必须实现我们自己的 RegisterSessionHandler
并处理 sessionId
负载平衡?
提前致谢!
您应该在 QueueClient 中使用 SessionClient
而不是使用 RegisterSessionHandler
以更好地处理延迟情况并保持顺序。您可以在邮件正文中保留一些 step/sequence 号码。当您实际处理消息时,还要添加 LastProcessedStep/Seqence。 Session state allows keeping track of the processing state a handler has related to a session, so that clients can be agile between processing nodes (including failover) during session processing. The sample 通过维护该 (Step) 来处理延迟的消息。它结合了延迟和会话功能,因此会话状态工具用于跟踪工作流的处理状态,其中各个步骤的输入未按预期顺序到达。还请注意发送方代码,它演示了通过以不可预测的顺序发送消息,但凭借会话状态,接收方检测到顺序。
//
// Copyright © Microsoft Corporation, All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
// ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
// PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
//
// See the Apache License, Version 2.0 for the specific language
// governing permissions and limitations under the License.
namespace SessionState
{
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
public class Program : MessagingSamples.Sample
{
public async Task Run(string connectionString)
{
Console.WriteLine("Press any key to exit the scenario");
var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
var receiveTask = this.ReceiveMessagesAsync(connectionString, SessionQueueName);
await Task.WhenAll(sendTask, sendTask2, receiveTask);
}
async Task SendMessagesAsync(string session, string connectionString, string queueName)
{
var sender = new MessageSender(connectionString, queueName);
Console.WriteLine("Sending messages to Queue...");
ProcessingState[] data = new[]
{
new ProcessingState {Step = 1, Title = "Buy"},
new ProcessingState {Step = 2, Title = "Unpack"},
new ProcessingState {Step = 3, Title = "Prepare"},
new ProcessingState {Step = 4, Title = "Cook"},
new ProcessingState {Step = 5, Title = "Eat"},
};
var rnd = new Random();
var tasks = new List<Task>();
for (int i = 0; i < data.Length; i++)
{
var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
{
SessionId = session,
ContentType = "application/json",
Label = "RecipeStep",
MessageId = i.ToString(),
TimeToLive = TimeSpan.FromMinutes(2)
};
tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
async (t) =>
{
await sender.SendAsync(message);
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine("Message sent: Id = {0}", message.MessageId);
Console.ResetColor();
}
}));
}
await Task.WhenAll(tasks);
}
async Task ReceiveMessagesAsync(string connectionString, string queueName)
{
var client = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);
while (true)
{
var session = await client.AcceptMessageSessionAsync();
await Task.Run(
async () =>
{
ProcessingState processingState;
var stateData = await session.GetStateAsync();
if (stateData != null)
{
processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
}
else
{
processingState = new ProcessingState
{
LastProcessedRecipeStep = 0,
DeferredSteps = new Dictionary<int, long>()
};
}
while (true)
{
try
{
//receive messages from Queue
var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
if (message != null)
{
if (message.Label != null &&
message.ContentType != null &&
message.Label.Equals("RecipeStep", StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
{
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
message.MessageId,
message.SystemProperties.SequenceNumber,
message.SystemProperties.EnqueuedTimeUtc,
message.ContentType,
message.Size,
message.ExpiresAtUtc,
recipeStep.Step,
recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(message.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = recipeStep.Step;
await
session.SetStateAsync(
Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
else
{
// in your case, if customer update is blocked, you can defer
processingState.DeferredSteps.Add((int)recipeStep.Step, (long)message.SystemProperties.SequenceNumber);
await session.DeferAsync(message.SystemProperties.LockToken);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
else
{
await session.DeadLetterAsync(message.SystemProperties.LockToken);//, "ProcessingError", "Don't know what to do with this message");
}
}
else
{
while (processingState.DeferredSteps.Count > 0)
{
long step;
if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1, out step))
{
var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
var body = deferredMessage.Body;
ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
deferredMessage.MessageId,
deferredMessage.SystemProperties.SequenceNumber,
deferredMessage.SystemProperties.EnqueuedTimeUtc,
deferredMessage.ContentType,
deferredMessage.Size,
deferredMessage.ExpiresAtUtc,
recipeStep.Step,
recipeStep.Title);
Console.ResetColor();
}
await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
}
}
break;
}
}
catch (ServiceBusException e)
{
if (!e.IsTransient)
{
Console.WriteLine(e.Message);
throw;
}
}
}
await session.CloseAsync();
});
}
}
public static int Main(string[] args)
{
try
{
var app = new Program();
app.RunSample(args, app.Run);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
return 1;
}
return 0;
}
class ProcessingState
{
[JsonProperty]
public int LastProcessedRecipeStep { get; set; }
[JsonProperty]
public Dictionary<int, long> DeferredSteps { get; set; }
[JsonProperty]
public int Step { get; internal set; }
[JsonProperty]
public string Title { get; internal set; }
}
}
}
您也可以按照 Ordering Messages in Azure Service Bus 进行操作,它非常简要地解释了这个概念。但那里提供的样本与上面略有不同。
警告:使用消息会话也意味着对于会话(对于您的情况是用户 ID),该会话中的消息将始终由单个接收者接收和处理。因此,在设置 Session 和 SessionId 时要注意。如果您创建一个非常大的会话,这将迫使 Azure 服务总线将大部分消息发送给一个订阅者,从而减少 multi-threading 的好处。如果您将会话设置得过于细化,那么它就会失去预期的好处,您只会增加不必要的开销。