在 Azure Function App 中限制 Azure 存储队列处理
Throttling Azure Storage Queue processing in Azure Function App
我创建了一个带有 Azure 存储队列触发器的 Azure Function 应用程序,该触发器处理一个队列,其中每个队列项都是 URL。该函数只是下载 URL 的内容。我有另一个函数可以加载和解析站点的 XML 站点地图并将所有页面 URL 添加到队列中。我遇到的问题是 Functions 应用程序运行得太快,它会破坏网站,因此它开始返回服务器错误。有没有办法 limit/throttle Functions 应用程序的运行速度?
当然,我可以编写一个简单的 Web 作业来串行处理它们(或者使用一些异步但限制并发请求的数量),但我真的很喜欢 Azure Functions 的简单性并想尝试一下 "serverless"计算。
您可以考虑几个选项。
首先,您可以在 host.json
中配置一些旋钮来控制队列处理(已记录 here)。 queues.batchSize
旋钮是一次获取多少队列消息。如果设置为 1,则运行时将一次获取 1 条消息,并且仅在对该消息的处理完成后才获取下一条消息。这可以在 单个实例 .
上为您提供某种程度的序列化
另一种选择可能是您可以为您排队的消息设置 NextVisibleTime,使它们间隔开 - 默认情况下,排队的消息变得可见并准备就绪立即处理。
最后一个选项可能是将包含站点所有 URL 集合的消息加入队列,而不是一次一个,这样在处理消息时,您可以在函数中连续处理这些 URL ,并以此方式限制并行度。
如果有多个并行函数添加到队列中,NextVisibleTime 可能会变得混乱。对于遇到此问题的任何人的另一个简单选择:
创建另一个队列 "throttled-items",并让您的原始函数跟随队列触发器。然后,添加一个简单的计时器函数,每分钟从原始队列中移动消息,相应地间隔 NextVisibleTime。
[FunctionName("ThrottleQueueItems")]
public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
{
var originalQueue = // get original queue here;
var throttledQueue = // get throttled queue here;
var itemsPerMinute = 60; // get from app settings
var individualDelay = 60.0 / itemsPerMinute;
var totalRetrieved = 0;
var maxItemsInBatch = 32; // change if you modify the default queue config
do
{
var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, itemsPerMinute - totalRetrieved))).ToArray();
if (!pending.Any())
break;
foreach (var message in pending)
{
await throttledQueue.AddMessageAsync(new CloudQueueMessage(message.AsString), null,
TimeSpan.FromSeconds(individualDelay * ++totalRetrieved), null, null);
await originalQueue.DeleteMessageAsync(message);
}
} while (itemsPerMinute > totalRetrieved);
}
我在尝试解决类似问题时发现了这个 post。这可能对到达这里的任何人都有用。您现在可以使用 WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT 应用程序设置限制函数的并发实例数。将此设置为 1 并结合批处理限制 1 将允许您执行队列的串行处理。
WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT
函数应用可以扩展到的最大实例数。默认是没有限制。
我创建了一个带有 Azure 存储队列触发器的 Azure Function 应用程序,该触发器处理一个队列,其中每个队列项都是 URL。该函数只是下载 URL 的内容。我有另一个函数可以加载和解析站点的 XML 站点地图并将所有页面 URL 添加到队列中。我遇到的问题是 Functions 应用程序运行得太快,它会破坏网站,因此它开始返回服务器错误。有没有办法 limit/throttle Functions 应用程序的运行速度?
当然,我可以编写一个简单的 Web 作业来串行处理它们(或者使用一些异步但限制并发请求的数量),但我真的很喜欢 Azure Functions 的简单性并想尝试一下 "serverless"计算。
您可以考虑几个选项。
首先,您可以在 host.json
中配置一些旋钮来控制队列处理(已记录 here)。 queues.batchSize
旋钮是一次获取多少队列消息。如果设置为 1,则运行时将一次获取 1 条消息,并且仅在对该消息的处理完成后才获取下一条消息。这可以在 单个实例 .
另一种选择可能是您可以为您排队的消息设置 NextVisibleTime,使它们间隔开 - 默认情况下,排队的消息变得可见并准备就绪立即处理。
最后一个选项可能是将包含站点所有 URL 集合的消息加入队列,而不是一次一个,这样在处理消息时,您可以在函数中连续处理这些 URL ,并以此方式限制并行度。
NextVisibleTime 可能会变得混乱。对于遇到此问题的任何人的另一个简单选择: 创建另一个队列 "throttled-items",并让您的原始函数跟随队列触发器。然后,添加一个简单的计时器函数,每分钟从原始队列中移动消息,相应地间隔 NextVisibleTime。
[FunctionName("ThrottleQueueItems")]
public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
{
var originalQueue = // get original queue here;
var throttledQueue = // get throttled queue here;
var itemsPerMinute = 60; // get from app settings
var individualDelay = 60.0 / itemsPerMinute;
var totalRetrieved = 0;
var maxItemsInBatch = 32; // change if you modify the default queue config
do
{
var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, itemsPerMinute - totalRetrieved))).ToArray();
if (!pending.Any())
break;
foreach (var message in pending)
{
await throttledQueue.AddMessageAsync(new CloudQueueMessage(message.AsString), null,
TimeSpan.FromSeconds(individualDelay * ++totalRetrieved), null, null);
await originalQueue.DeleteMessageAsync(message);
}
} while (itemsPerMinute > totalRetrieved);
}
我在尝试解决类似问题时发现了这个 post。这可能对到达这里的任何人都有用。您现在可以使用 WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT 应用程序设置限制函数的并发实例数。将此设置为 1 并结合批处理限制 1 将允许您执行队列的串行处理。
WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT
函数应用可以扩展到的最大实例数。默认是没有限制。