C# Azure 服务总线队列 OnMessage 回调
C# Azure Service Bus Queue OnMessage Callback
我正在编写一些响应 Azure 服务总线队列的功能。这当前在指定队列上进行轮询,并且 OnMessage 触发对调用它的原始 class 中的方法的回调:
partial class Class1
{
private void BeginProcessing()
{
serviceBusHelper.Listen(QueueType.Inbound, HandleTransaction);
}
private bool HandleTransaction(BrokeredMessage message)
{
...
}
}
然后是服务总线助手class:
public class ServiceBusHelper : IServiceBusHelper
{
ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessage(message =>
{
try
{
callback(message);
}
catch (Exception ex)
{
...
}
CompletedResetEvent.WaitOne();
});
break;
...
}
}
它正在正确连接到 Azure 服务总线队列并检索消息,但回调似乎从未真正触发。我想要实现的是一个服务,它将持续响应 OnMessage 事件,触发一个新的工作人员(从 class1 内),尽管 OnMessage 实际上是从 ServiceBusHelper class.[= 触发的12=]
the callback never seems to actually trigger.
为了重现该问题,我在控制台应用程序中使用以下代码进行了测试,效果很好。
public static void processmessage()
{
string connectionString = "{connectionstring here}";
var queueName = "{queue name}";
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
var options = new OnMessageOptions();
options.AutoComplete = false;
try
{
client.OnMessage(message =>
{
HandleTransaction(message);
}, options);
}
catch (Exception)
{
}
}
private static void HandleTransaction(BrokeredMessage message)
{
Console.WriteLine(String.Format("Message body: {0}", message.GetBody<String>()));
}
如果可以的话,您可以尝试将private bool HandleTransaction(BrokeredMessage message)
修改为make it return voidprivate void HandleTransaction(BrokeredMessage message)
,看看是否可以正常使用。
因此,我设法通过修改为异步 OnMessage 对应项来实现此功能。我认为根本原因与未将 OnMessageOptions 指定给 OnMessage 或 CompletedResetEvent 没有按照我的预期进行交互有关。
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
OnMessageOptions options = new OnMessageOptions
{
MaxConcurrentCalls = maxConcurrent,
AutoComplete = false
};
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessageAsync(async message =>
{
bool shouldAbandon = false;
try
{
callback(message);
// complete if successful processing
await message.CompleteAsync();
}
catch (Exception ex)
{
shouldAbandon = true;
Debug.WriteLine(ex);
}
if (shouldAbandon)
{
await m.AbandonAsync();
}
}, options);
break;
...
}
}
我正在编写一些响应 Azure 服务总线队列的功能。这当前在指定队列上进行轮询,并且 OnMessage 触发对调用它的原始 class 中的方法的回调:
partial class Class1
{
private void BeginProcessing()
{
serviceBusHelper.Listen(QueueType.Inbound, HandleTransaction);
}
private bool HandleTransaction(BrokeredMessage message)
{
...
}
}
然后是服务总线助手class:
public class ServiceBusHelper : IServiceBusHelper
{
ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessage(message =>
{
try
{
callback(message);
}
catch (Exception ex)
{
...
}
CompletedResetEvent.WaitOne();
});
break;
...
}
}
它正在正确连接到 Azure 服务总线队列并检索消息,但回调似乎从未真正触发。我想要实现的是一个服务,它将持续响应 OnMessage 事件,触发一个新的工作人员(从 class1 内),尽管 OnMessage 实际上是从 ServiceBusHelper class.[= 触发的12=]
the callback never seems to actually trigger.
为了重现该问题,我在控制台应用程序中使用以下代码进行了测试,效果很好。
public static void processmessage()
{
string connectionString = "{connectionstring here}";
var queueName = "{queue name}";
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
var options = new OnMessageOptions();
options.AutoComplete = false;
try
{
client.OnMessage(message =>
{
HandleTransaction(message);
}, options);
}
catch (Exception)
{
}
}
private static void HandleTransaction(BrokeredMessage message)
{
Console.WriteLine(String.Format("Message body: {0}", message.GetBody<String>()));
}
如果可以的话,您可以尝试将private bool HandleTransaction(BrokeredMessage message)
修改为make it return voidprivate void HandleTransaction(BrokeredMessage message)
,看看是否可以正常使用。
因此,我设法通过修改为异步 OnMessage 对应项来实现此功能。我认为根本原因与未将 OnMessageOptions 指定给 OnMessage 或 CompletedResetEvent 没有按照我的预期进行交互有关。
public void Listen(QueueType queue, Action<BrokeredMessage> callback)
{
OnMessageOptions options = new OnMessageOptions
{
MaxConcurrentCalls = maxConcurrent,
AutoComplete = false
};
switch (queue)
{
case QueueType.Inbound:
inboundClient.OnMessageAsync(async message =>
{
bool shouldAbandon = false;
try
{
callback(message);
// complete if successful processing
await message.CompleteAsync();
}
catch (Exception ex)
{
shouldAbandon = true;
Debug.WriteLine(ex);
}
if (shouldAbandon)
{
await m.AbandonAsync();
}
}, options);
break;
...
}
}