Azure Function:如何更好地实现重试队列消息的延迟
Azure Function : how to implement delay of retrying queue message better
我的 Azure 函数应该监听队列中的消息,然后获取消息,尝试调用外部服务并在消息中调用值,如果外部服务 returns "OK" 那么我们必须写入消息到另一个队列(对于下一个 Azure 函数),如果 returns "Fail" 我们必须 return 到我们当前的队列并在 5 分钟后再次通过我们的 Azure 函数重试。如何实施?我用 Timer 做到了,但解决方案不喜欢我:
[FunctionName("FunctionOffice365VerificateDomain_and_AddService_and_GexMxRecord")]
public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]CloudQueue outputQueue,
ILogger log)
{
while (true)
{
// do "invisible" message for next 30 sec
var message = await listenQueue.GetMessageAsync();
if (message != null)
{
DomainForRegistration domainForRegistration = JsonConvert.DeserializeObject<DomainForRegistration>(message.AsString);
try
{
await _office365DomainService.VerifyDomainAsync(domainForRegistration.DomainName);
// remove message
await listenQueue.DeleteMessageAsync(message);
await _office365DomainService.UpdateIndicateSupportedServicesDomainAsync(domainForRegistration.DomainName);
var mxRecord = await _office365DomainService.GetMxRecordForDomainAsync(domainForRegistration.DomainName);
}
catch (DomainVerificationRecordNotFoundException)
{
// thrown when VerifyDomainAsync failed
}
}
else
break;
}
}
如果没有这些while(true)
,但在验证失败后超时,如何更仔细地做?
同意@DavidG,尝试使用队列触发器来实现你的目标。
W可以依赖Queue的host setting。
visibilityTimeout is The time interval between retries when processing of a message fails
maxDequeueCount is The number of times to try processing a message before moving it to the poison queue.
{
"version": "2.0",
"extensions": {
"queues": {
"visibilityTimeout" : "00:05:00",
"maxDequeueCount": 2,
}
}
}
这样,函数应该是这样的
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue
)
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
如果不想抛异常给宿主机,可以转向initialVisibilityDelayCloudQueue方法
specifying the interval of time from now during which the message will be invisible
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue
)
{
try
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
catch(DomainVerificationRecordNotFoundException)
{
// add the message in current queue and can only be visible after 5 minutes
await listenQueue.AddMessageAsync(new CloudQueueMessage(myQueueItem), null, TimeSpan.FromMinutes(5), null, null);
}
}
我的 Azure 函数应该监听队列中的消息,然后获取消息,尝试调用外部服务并在消息中调用值,如果外部服务 returns "OK" 那么我们必须写入消息到另一个队列(对于下一个 Azure 函数),如果 returns "Fail" 我们必须 return 到我们当前的队列并在 5 分钟后再次通过我们的 Azure 函数重试。如何实施?我用 Timer 做到了,但解决方案不喜欢我:
[FunctionName("FunctionOffice365VerificateDomain_and_AddService_and_GexMxRecord")]
public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]CloudQueue outputQueue,
ILogger log)
{
while (true)
{
// do "invisible" message for next 30 sec
var message = await listenQueue.GetMessageAsync();
if (message != null)
{
DomainForRegistration domainForRegistration = JsonConvert.DeserializeObject<DomainForRegistration>(message.AsString);
try
{
await _office365DomainService.VerifyDomainAsync(domainForRegistration.DomainName);
// remove message
await listenQueue.DeleteMessageAsync(message);
await _office365DomainService.UpdateIndicateSupportedServicesDomainAsync(domainForRegistration.DomainName);
var mxRecord = await _office365DomainService.GetMxRecordForDomainAsync(domainForRegistration.DomainName);
}
catch (DomainVerificationRecordNotFoundException)
{
// thrown when VerifyDomainAsync failed
}
}
else
break;
}
}
如果没有这些while(true)
,但在验证失败后超时,如何更仔细地做?
同意@DavidG,尝试使用队列触发器来实现你的目标。 W可以依赖Queue的host setting。
visibilityTimeout is The time interval between retries when processing of a message fails maxDequeueCount is The number of times to try processing a message before moving it to the poison queue.
{
"version": "2.0",
"extensions": {
"queues": {
"visibilityTimeout" : "00:05:00",
"maxDequeueCount": 2,
}
}
}
这样,函数应该是这样的
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue
)
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
如果不想抛异常给宿主机,可以转向initialVisibilityDelayCloudQueue方法
specifying the interval of time from now during which the message will be invisible
public static async Task Run(
[QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
[Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue,
[Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue
)
{
try
{
// do stuff then output message
await outputQueue.AddAsync(myQueueItem);
}
catch(DomainVerificationRecordNotFoundException)
{
// add the message in current queue and can only be visible after 5 minutes
await listenQueue.AddMessageAsync(new CloudQueueMessage(myQueueItem), null, TimeSpan.FromMinutes(5), null, null);
}
}