使用 Microsoft.ServiceBus.Messaging.MessageSession 条消息 Complete()、Abort() 等

using Microsoft.ServiceBus.Messaging.MessageSession Many messages Complete(), Abort(), Etc

当要接收的消息不止一条时,我在关闭 MessageSession 时遇到问题...我等待所有序列进入,然后我尝试 "Complete()" 所有会话中的消息。我尝试了很多不同的方法,遍历收到的消息并 "Complete()"-ing 它们,"Close()"-ing 会话,"Completing()"-ing 会话,我找不到获胜的公式我的生活。 Microsoft 的文档也不是很有帮助。它当然没有提供一个像样的例子。以下是我在最近一次迭代中使用的代码

public QueueMessageBody ReceiveResponse(字符串 sessionId) {

        try
        {
            MessagingFactory _messagingFactory;
            _messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);

            var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
            var messageBodyStream = new MemoryStream();

            while (true)
            {
                Console.WriteLine("Waiting for new message");
                using (var scope = new TransactionScope())
                {
                    var receiverSession = receiver.AcceptMessageSession(sessionId);
                    messageBodyStream = new MemoryStream();

                    var keepPolling = true;
                    var isFirstMessage = true;
                    var expectedNoMessages = 0;
                    BrokeredMessage[] messages = null;
                    var messagesReceived = 0;
                    while (keepPolling)
                    {
                        var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
                        if (message == null)
                            continue;

                        if (isFirstMessage)
                        {
                            Console.WriteLine("Receiving first message");
                            expectedNoMessages = (int)message.Properties["TotalMessages"];
                            messages = new BrokeredMessage[expectedNoMessages];
                            isFirstMessage = false;
                        }

                        var messageNo = (int)message.Properties["MessageNo"];
                        var messageIndex = messageNo - 1;
                        Console.WriteLine(string.Format("Receiving message {0}", messageNo));
                        messages[messageIndex] = message;
                        messagesReceived++;

                        if (messagesReceived == expectedNoMessages)
                            keepPolling = false;
                    }





                    //Rebuild Object
                    var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
                    var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);

                    var completeTasks = new List<Task>();
                    if (messages.Count()< 3)
                    {
                        foreach (var message in messages)
                            completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
                        Task.WaitAll(completeTasks.ToArray());
                    }else
                    {
                        receiverSession.Close();
                    }


                    return o;


                }
            }



        }
        catch(Exception ex)
        {
           System.Diagnostics.Debug.Print(ex.Message);
            return null;
        }

    }


  public   QueueMessageBody ReceiveResponse(string sessionId)
    {

        try
        {
            MessagingFactory _messagingFactory;
            _messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);

            var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
            var messageBodyStream = new MemoryStream();

            while (true)
            {
                Console.WriteLine("Waiting for new message");
                using (var scope = new TransactionScope())
                {
                    var receiverSession = receiver.AcceptMessageSession(sessionId);
                    messageBodyStream = new MemoryStream();

                    var keepPolling = true;
                    var isFirstMessage = true;
                    var expectedNoMessages = 0;
                    BrokeredMessage[] messages = null;
                    var messagesReceived = 0;
                    while (keepPolling)
                    {
                        var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
                        if (message == null)
                            continue;

                        if (isFirstMessage)
                        {
                            Console.WriteLine("Receiving first message");
                            expectedNoMessages = (int)message.Properties["TotalMessages"];
                            messages = new BrokeredMessage[expectedNoMessages];
                            isFirstMessage = false;
                        }

                        var messageNo = (int)message.Properties["MessageNo"];
                        var messageIndex = messageNo - 1;
                        Console.WriteLine(string.Format("Receiving message {0}", messageNo));
                        messages[messageIndex] = message;
                        messagesReceived++;

                        if (messagesReceived == expectedNoMessages)
                            keepPolling = false;
                    }





                    //Rebuild Object
                    var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
                    var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);

                    var completeTasks = new List<Task>();
                    if (messages.Count()< 3)
                    {
                        foreach (var message in messages)
                            completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
                        Task.WaitAll(completeTasks.ToArray());
                    }else
                    {
                        receiverSession.Close();
                    }


                    return o;


                }
            }



        }
        catch(Exception ex)
        {
           System.Diagnostics.Debug.Print(ex.Message);
            return null;
        }

    }

并且队列配置如下。

我能够接收消息并完成它们的工作流程,问题是它们只是留在队列中。我无法在

完成它们

在这种情况下,问题可能在于交易从未完成。删除 TransactionScope 解决了这个问题。