Service Bus Session Block实现建议

Suggestions for Implementing Session Block for Service Bus

目前,我们有一个带有后台服务的 dotnet 核心应用程序,它从启用会话的服务总线接收消息,其中 sessionIduserId,消息包含对用户信息的更新。现在我们想实现一个功能,通过阻止特定 userId/sessionId 来暂时暂停对特定用户的更新,但在解除阻止时仍按顺序处理消息。解决此问题的最佳方法是什么?

我已尝试查看服务总线文档和示例。主要是,message deferral, message session state and session state sample

而且我找到了一些关于SessionState和消息延迟的信息,我想知道它们是否可以用来实现这个特性并且仍然保证处理顺序(无论消息是否被延迟的先进先出)。我正在考虑尝试将序列号存储在会话状态中,并继续通过该号码接收延迟消息并递增它以接收下一条消息,直到我 运行 out messages?

目前,我们的代码如下所示:

            this.queue.RegisterSessionHandler(
                this.SessionHandler,
                new SessionHandlerOptions(this.ExceptionHandler)
                {
                    AutoComplete = false,
                    MessageWaitTimeout = TimeSpan.FromMinutes(1),
                });

其中 this.SessionHandler 是处理消息然后通过调用 session.CompleteAsyncsession.CloseAsync 完成并关闭会话的函数。但是,我无法概念化如何将延迟逻辑添加到我们的代码中。因为目前,RegisterSessionHandler 已经处理会话锁并使用 sessionId 对消息进行负载平衡(我假设),这很棒。但是 RegisterSessionHandler 也不允许您指定要处理的特定 sessionId

假设我有一些消息被推迟到 userId/sessionId: A。当我想取消阻止该用户的处理时,我不能简单地将延迟的消息插入回队列。由于发件人仍会不断将用户A的消息发送到队列中,这会弄乱顺序。

我上面提到的 会话状态示例 有一个很好的例子,说明如何使用会话状态和处理延迟消息。但是,它只使用了一个 sessionId 而没有使用 RegisterSessionHandler。我的问题是:如果我们想要实现延迟消息处理逻辑(保留顺序),我们是否必须实现我们自己的 RegisterSessionHandler 并处理 sessionId 负载平衡?

提前致谢!

您应该在 QueueClient 中使用 SessionClient 而不是使用 RegisterSessionHandler 以更好地处理延迟情况并保持顺序。您可以在邮件正文中保留一些 step/sequence 号码。当您实际处理消息时,还要添加 LastProcessedStep/Seqence。 Session state allows keeping track of the processing state a handler has related to a session, so that clients can be agile between processing nodes (including failover) during session processing. The sample 通过维护该 (Step) 来处理延迟的消息。它结合了延迟和会话功能,因此会话状态工具用于跟踪工作流的处理状态,其中各个步骤的输入未按预期顺序到达。还请注意发送方代码,它演示了通过以不可预测的顺序发送消息,但凭借会话状态,接收方检测到顺序。

//   
//   Copyright © Microsoft Corporation, All Rights Reserved
// 
//   Licensed under the Apache License, Version 2.0 (the "License"); 
//   you may not use this file except in compliance with the License. 
//   You may obtain a copy of the License at
// 
//   http://www.apache.org/licenses/LICENSE-2.0 
// 
//   THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
//   OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
//   ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
//   PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
// 
//   See the Apache License, Version 2.0 for the specific language
//   governing permissions and limitations under the License. 

namespace SessionState
{
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    public class Program : MessagingSamples.Sample
    {
        public async Task Run(string connectionString)
        {
            Console.WriteLine("Press any key to exit the scenario");

            var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
            var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
            var receiveTask = this.ReceiveMessagesAsync(connectionString, SessionQueueName);

            await Task.WhenAll(sendTask, sendTask2, receiveTask);
        }

        async Task SendMessagesAsync(string session, string connectionString, string queueName)
        {
            var sender = new MessageSender(connectionString, queueName);


            Console.WriteLine("Sending messages to Queue...");

            ProcessingState[] data = new[]
            {
                new ProcessingState {Step = 1, Title = "Buy"},
                new ProcessingState {Step = 2, Title = "Unpack"},
                new ProcessingState {Step = 3, Title = "Prepare"},
                new ProcessingState {Step = 4, Title = "Cook"},
                new ProcessingState {Step = 5, Title = "Eat"},
            };

            var rnd = new Random();
            var tasks = new List<Task>();
            for (int i = 0; i < data.Length; i++)
            {
                var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                {
                    SessionId = session,
                    ContentType = "application/json",
                    Label = "RecipeStep",
                    MessageId = i.ToString(),
                    TimeToLive = TimeSpan.FromMinutes(2)
                };

                tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
                      async (t) =>
                      {
                          await sender.SendAsync(message);
                          lock (Console.Out)
                          {
                              Console.ForegroundColor = ConsoleColor.Yellow;
                              Console.WriteLine("Message sent: Id = {0}", message.MessageId);
                              Console.ResetColor();
                          }
                      }));
            }
            await Task.WhenAll(tasks);
        }

        async Task ReceiveMessagesAsync(string connectionString, string queueName)
        {
            var client = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);

            while (true)
            {
                var session = await client.AcceptMessageSessionAsync();
                await Task.Run(
                    async () =>
                    {
                        ProcessingState processingState;

                        var stateData = await session.GetStateAsync();
                        if (stateData != null)
                        {
                            processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
                        }
                        else
                        {
                            processingState = new ProcessingState
                            {
                                LastProcessedRecipeStep = 0,
                                DeferredSteps = new Dictionary<int, long>()
                            };
                        }

                        while (true)
                        {
                            try
                            {
                                //receive messages from Queue
                                var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
                                if (message != null)
                                {
                                    if (message.Label != null &&
                                        message.ContentType != null &&
                                        message.Label.Equals("RecipeStep", StringComparison.InvariantCultureIgnoreCase) &&
                                        message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
                                    {
                                        var body = message.Body;

                                        ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                        if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
                                        {
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
                                                    "\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4},  \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
                                                    message.MessageId,
                                                    message.SystemProperties.SequenceNumber,
                                                    message.SystemProperties.EnqueuedTimeUtc,
                                                    message.ContentType,
                                                    message.Size,
                                                    message.ExpiresAtUtc,
                                                    recipeStep.Step,
                                                    recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(message.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = recipeStep.Step;
                                            await
                                                session.SetStateAsync(
                                                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                        else
                                        {
// in your case, if customer update is blocked, you can defer
                                            processingState.DeferredSteps.Add((int)recipeStep.Step, (long)message.SystemProperties.SequenceNumber);
                                            await session.DeferAsync(message.SystemProperties.LockToken);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    else
                                    {
                                        await session.DeadLetterAsync(message.SystemProperties.LockToken);//, "ProcessingError", "Don't know what to do with this message");
                                    }
                                }
                                else
                                {
                                    while (processingState.DeferredSteps.Count > 0)
                                    {
                                        long step;

                                        if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1, out step))
                                        {
                                            var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
                                            var body = deferredMessage.Body;
                                            ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
                                                    "\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4},  \n\t\t\t\t\t\tContent: [ step = {6}, title = {7} ]",
                                                    deferredMessage.MessageId,
                                                    deferredMessage.SystemProperties.SequenceNumber,
                                                    deferredMessage.SystemProperties.EnqueuedTimeUtc,
                                                    deferredMessage.ContentType,
                                                    deferredMessage.Size,
                                                    deferredMessage.ExpiresAtUtc,
                                                    recipeStep.Step,
                                                    recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
                                            processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    break;
                                }
                            }
                            catch (ServiceBusException e)
                            {
                                if (!e.IsTransient)
                                {
                                    Console.WriteLine(e.Message);
                                    throw;
                                }
                            }
                        }
                        await session.CloseAsync();
                    });
            }
        }

       public static int Main(string[] args)
        {
            try
            {
                var app = new Program();
                app.RunSample(args, app.Run);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                return 1;
            }
            return 0;
        }

        class ProcessingState
        {
            [JsonProperty]
            public int LastProcessedRecipeStep { get; set; }
            [JsonProperty]
            public Dictionary<int, long> DeferredSteps { get; set; }
            [JsonProperty]
            public int Step { get; internal set; }
            [JsonProperty]
            public string Title { get; internal set; }
        }
    }
}

您也可以按照 Ordering Messages in Azure Service Bus 进行操作,它非常简要地解释了这个概念。但那里提供的样本与上面略有不同。

警告:使用消息会话也意味着对于会话(对于您的情况是用户 ID),该会话中的消息将始终由单个接收者接收和处理。因此,在设置 Session 和 SessionId 时要注意。如果您创建一个非常大的会话,这将迫使 Azure 服务总线将大部分消息发送给一个订阅者,从而减少 multi-threading 的好处。如果您将会话设置得过于细化,那么它就会失去预期的好处,您只会增加不必要的开销。