使用 Microsoft.ServiceBus.Messaging.MessageSession 条消息 Complete()、Abort() 等
using Microsoft.ServiceBus.Messaging.MessageSession Many messages Complete(), Abort(), Etc
当要接收的消息不止一条时,我在关闭 MessageSession 时遇到问题...我等待所有序列进入,然后我尝试 "Complete()" 所有会话中的消息。我尝试了很多不同的方法,遍历收到的消息并 "Complete()"-ing 它们,"Close()"-ing 会话,"Completing()"-ing 会话,我找不到获胜的公式我的生活。 Microsoft 的文档也不是很有帮助。它当然没有提供一个像样的例子。以下是我在最近一次迭代中使用的代码
public QueueMessageBody ReceiveResponse(字符串 sessionId)
{
try
{
MessagingFactory _messagingFactory;
_messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
var messageBodyStream = new MemoryStream();
while (true)
{
Console.WriteLine("Waiting for new message");
using (var scope = new TransactionScope())
{
var receiverSession = receiver.AcceptMessageSession(sessionId);
messageBodyStream = new MemoryStream();
var keepPolling = true;
var isFirstMessage = true;
var expectedNoMessages = 0;
BrokeredMessage[] messages = null;
var messagesReceived = 0;
while (keepPolling)
{
var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
if (message == null)
continue;
if (isFirstMessage)
{
Console.WriteLine("Receiving first message");
expectedNoMessages = (int)message.Properties["TotalMessages"];
messages = new BrokeredMessage[expectedNoMessages];
isFirstMessage = false;
}
var messageNo = (int)message.Properties["MessageNo"];
var messageIndex = messageNo - 1;
Console.WriteLine(string.Format("Receiving message {0}", messageNo));
messages[messageIndex] = message;
messagesReceived++;
if (messagesReceived == expectedNoMessages)
keepPolling = false;
}
//Rebuild Object
var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);
var completeTasks = new List<Task>();
if (messages.Count()< 3)
{
foreach (var message in messages)
completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
Task.WaitAll(completeTasks.ToArray());
}else
{
receiverSession.Close();
}
return o;
}
}
}
catch(Exception ex)
{
System.Diagnostics.Debug.Print(ex.Message);
return null;
}
}
public QueueMessageBody ReceiveResponse(string sessionId)
{
try
{
MessagingFactory _messagingFactory;
_messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
var messageBodyStream = new MemoryStream();
while (true)
{
Console.WriteLine("Waiting for new message");
using (var scope = new TransactionScope())
{
var receiverSession = receiver.AcceptMessageSession(sessionId);
messageBodyStream = new MemoryStream();
var keepPolling = true;
var isFirstMessage = true;
var expectedNoMessages = 0;
BrokeredMessage[] messages = null;
var messagesReceived = 0;
while (keepPolling)
{
var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
if (message == null)
continue;
if (isFirstMessage)
{
Console.WriteLine("Receiving first message");
expectedNoMessages = (int)message.Properties["TotalMessages"];
messages = new BrokeredMessage[expectedNoMessages];
isFirstMessage = false;
}
var messageNo = (int)message.Properties["MessageNo"];
var messageIndex = messageNo - 1;
Console.WriteLine(string.Format("Receiving message {0}", messageNo));
messages[messageIndex] = message;
messagesReceived++;
if (messagesReceived == expectedNoMessages)
keepPolling = false;
}
//Rebuild Object
var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);
var completeTasks = new List<Task>();
if (messages.Count()< 3)
{
foreach (var message in messages)
completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
Task.WaitAll(completeTasks.ToArray());
}else
{
receiverSession.Close();
}
return o;
}
}
}
catch(Exception ex)
{
System.Diagnostics.Debug.Print(ex.Message);
return null;
}
}
并且队列配置如下。
我能够接收消息并完成它们的工作流程,问题是它们只是留在队列中。我无法在
完成它们
在这种情况下,问题可能在于交易从未完成。删除 TransactionScope 解决了这个问题。
当要接收的消息不止一条时,我在关闭 MessageSession 时遇到问题...我等待所有序列进入,然后我尝试 "Complete()" 所有会话中的消息。我尝试了很多不同的方法,遍历收到的消息并 "Complete()"-ing 它们,"Close()"-ing 会话,"Completing()"-ing 会话,我找不到获胜的公式我的生活。 Microsoft 的文档也不是很有帮助。它当然没有提供一个像样的例子。以下是我在最近一次迭代中使用的代码
public QueueMessageBody ReceiveResponse(字符串 sessionId) {
try
{
MessagingFactory _messagingFactory;
_messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
var messageBodyStream = new MemoryStream();
while (true)
{
Console.WriteLine("Waiting for new message");
using (var scope = new TransactionScope())
{
var receiverSession = receiver.AcceptMessageSession(sessionId);
messageBodyStream = new MemoryStream();
var keepPolling = true;
var isFirstMessage = true;
var expectedNoMessages = 0;
BrokeredMessage[] messages = null;
var messagesReceived = 0;
while (keepPolling)
{
var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
if (message == null)
continue;
if (isFirstMessage)
{
Console.WriteLine("Receiving first message");
expectedNoMessages = (int)message.Properties["TotalMessages"];
messages = new BrokeredMessage[expectedNoMessages];
isFirstMessage = false;
}
var messageNo = (int)message.Properties["MessageNo"];
var messageIndex = messageNo - 1;
Console.WriteLine(string.Format("Receiving message {0}", messageNo));
messages[messageIndex] = message;
messagesReceived++;
if (messagesReceived == expectedNoMessages)
keepPolling = false;
}
//Rebuild Object
var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);
var completeTasks = new List<Task>();
if (messages.Count()< 3)
{
foreach (var message in messages)
completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
Task.WaitAll(completeTasks.ToArray());
}else
{
receiverSession.Close();
}
return o;
}
}
}
catch(Exception ex)
{
System.Diagnostics.Debug.Print(ex.Message);
return null;
}
}
public QueueMessageBody ReceiveResponse(string sessionId)
{
try
{
MessagingFactory _messagingFactory;
_messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var receiver = _messagingFactory.CreateQueueClient(ResponseQueue, ReceiveMode.PeekLock);
var messageBodyStream = new MemoryStream();
while (true)
{
Console.WriteLine("Waiting for new message");
using (var scope = new TransactionScope())
{
var receiverSession = receiver.AcceptMessageSession(sessionId);
messageBodyStream = new MemoryStream();
var keepPolling = true;
var isFirstMessage = true;
var expectedNoMessages = 0;
BrokeredMessage[] messages = null;
var messagesReceived = 0;
while (keepPolling)
{
var message = receiverSession.Receive(TimeSpan.FromSeconds(10));
if (message == null)
continue;
if (isFirstMessage)
{
Console.WriteLine("Receiving first message");
expectedNoMessages = (int)message.Properties["TotalMessages"];
messages = new BrokeredMessage[expectedNoMessages];
isFirstMessage = false;
}
var messageNo = (int)message.Properties["MessageNo"];
var messageIndex = messageNo - 1;
Console.WriteLine(string.Format("Receiving message {0}", messageNo));
messages[messageIndex] = message;
messagesReceived++;
if (messagesReceived == expectedNoMessages)
keepPolling = false;
}
//Rebuild Object
var messageBodyBuilder = new ChunkedMessageBuilder().ReconstructMessageBody(messages);
var o = SerializationHelper.Deserialize<QueueMessageBody>(messageBodyBuilder);
var completeTasks = new List<Task>();
if (messages.Count()< 3)
{
foreach (var message in messages)
completeTasks.Add(receiverSession.CompleteAsync(message.LockToken));
Task.WaitAll(completeTasks.ToArray());
}else
{
receiverSession.Close();
}
return o;
}
}
}
catch(Exception ex)
{
System.Diagnostics.Debug.Print(ex.Message);
return null;
}
}
并且队列配置如下。
我能够接收消息并完成它们的工作流程,问题是它们只是留在队列中。我无法在
完成它们在这种情况下,问题可能在于交易从未完成。删除 TransactionScope 解决了这个问题。