Azure函数中的ConcurrentQueue出队问题
ConcurrentQueue dequeue problem in Azure function
我已声明 ConcurrentQueue
并添加了 GUID 列表。添加到队列中很好,但是当我从 TimerTrigger
函数内部访问队列时,它似乎是空的(updateQueue.count
为 0)。这种行为发生在云中,但是当我在本地执行相同的操作时它工作正常。
public static class Function1
{
private static readonly ConcurrentQueue<IEnumerable<Guid>> updateQueue = new ConcurrentQueue<IEnumerable<Guid>>();
public static async Task<IActionResult> UpdateRun(
[HttpTrigger(AuthorizationLevel.Admin, "post", Route = null)] HttpRequest req, ExecutionContext execContext)
{
logger.LogInformation($"FunctionUpdate Start");
using var reader = new StreamReader(req.Body);
var request = JsonConvert.DeserializeObject<IEnumerable<Request>>(reader.ReadToEnd());
var correlationIds = request?.Select(s => s.CorrelationId);
updateQueue.Enqueue(correlationIds);
return new OkObjectResult(new Response { HttpStatusCode = HttpStatusCode.Accepted });
}
[FunctionName("FunctionHandleQueue"), Timeout("00:05:00")]
public static async Task HandleQueue([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, ExecutionContext execContext) // once every 1 minutes
{
logger.LogInformation($"before updateQueue condition : {updateQueue.Count}");
if (updateQueue.Count > 0)
{
logger.LogInformation($"after updateQueue condition {updateQueue.Count}");
var guids = new List<Guid>();
var count = 0;
while (count <= 1000 && updateQueue.Count > 0)
{
updateQueue.TryDequeue(out var updateRequest);
var enumerable = updateRequest.ToList();
count += enumerable.Count;
guids.AddRange(enumerable);
}
await new ProcessUpdateSales(CreateMapper(), execContext)
.Orchestrate(guids)
}
}
}
当 TimerTrigger
每 1 分钟执行一次时创建日志:
before updateQueue condition : 0
为什么 updateQueue.Count
总是 0?我做错了什么?
虽然 Azure Functions 通常可能共享一个背板,但不能保证。资源可以随时关闭或启动,功能的新副本可能无法访问原始状态。因此,如果您使用静态字段在函数执行之间共享数据,它应该能够从外部源重新加载数据。
也就是说,由于 Azure Functions 的设计使用方式,这也不一定是可取的。 Azure Functions 通过动态可伸缩性实现高吞吐量。随着需要更多资源来处理当前工作负载,可以自动配置它们以保持高吞吐量。
因此,在单个函数执行中做太多工作实际上会影响整个系统的吞吐量,因为函数背板无法提供额外的工作线程来处理负载。
如果需要保存状态,请使用永久存储形式。这可以采用 Azure Durable Function、Azure 存储队列、Azure 服务总线队列甚至数据库的形式。此外,为了最好地利用函数的可扩展性,请尝试将工作负载减少到允许大量并行处理的可管理批处理。虽然您可能需要在单个操作中预先加载您的工作,但您希望后续处理尽可能更精细。
我已声明 ConcurrentQueue
并添加了 GUID 列表。添加到队列中很好,但是当我从 TimerTrigger
函数内部访问队列时,它似乎是空的(updateQueue.count
为 0)。这种行为发生在云中,但是当我在本地执行相同的操作时它工作正常。
public static class Function1
{
private static readonly ConcurrentQueue<IEnumerable<Guid>> updateQueue = new ConcurrentQueue<IEnumerable<Guid>>();
public static async Task<IActionResult> UpdateRun(
[HttpTrigger(AuthorizationLevel.Admin, "post", Route = null)] HttpRequest req, ExecutionContext execContext)
{
logger.LogInformation($"FunctionUpdate Start");
using var reader = new StreamReader(req.Body);
var request = JsonConvert.DeserializeObject<IEnumerable<Request>>(reader.ReadToEnd());
var correlationIds = request?.Select(s => s.CorrelationId);
updateQueue.Enqueue(correlationIds);
return new OkObjectResult(new Response { HttpStatusCode = HttpStatusCode.Accepted });
}
[FunctionName("FunctionHandleQueue"), Timeout("00:05:00")]
public static async Task HandleQueue([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, ExecutionContext execContext) // once every 1 minutes
{
logger.LogInformation($"before updateQueue condition : {updateQueue.Count}");
if (updateQueue.Count > 0)
{
logger.LogInformation($"after updateQueue condition {updateQueue.Count}");
var guids = new List<Guid>();
var count = 0;
while (count <= 1000 && updateQueue.Count > 0)
{
updateQueue.TryDequeue(out var updateRequest);
var enumerable = updateRequest.ToList();
count += enumerable.Count;
guids.AddRange(enumerable);
}
await new ProcessUpdateSales(CreateMapper(), execContext)
.Orchestrate(guids)
}
}
}
当 TimerTrigger
每 1 分钟执行一次时创建日志:
before updateQueue condition : 0
为什么 updateQueue.Count
总是 0?我做错了什么?
虽然 Azure Functions 通常可能共享一个背板,但不能保证。资源可以随时关闭或启动,功能的新副本可能无法访问原始状态。因此,如果您使用静态字段在函数执行之间共享数据,它应该能够从外部源重新加载数据。
也就是说,由于 Azure Functions 的设计使用方式,这也不一定是可取的。 Azure Functions 通过动态可伸缩性实现高吞吐量。随着需要更多资源来处理当前工作负载,可以自动配置它们以保持高吞吐量。
因此,在单个函数执行中做太多工作实际上会影响整个系统的吞吐量,因为函数背板无法提供额外的工作线程来处理负载。
如果需要保存状态,请使用永久存储形式。这可以采用 Azure Durable Function、Azure 存储队列、Azure 服务总线队列甚至数据库的形式。此外,为了最好地利用函数的可扩展性,请尝试将工作负载减少到允许大量并行处理的可管理批处理。虽然您可能需要在单个操作中预先加载您的工作,但您希望后续处理尽可能更精细。