ActionBlock 似乎在我希望它完成之前完成

ActionBlock appears to complete before I want it to

我有一个 ActionBlock 作为我的一部分 class:

private readonly ActionBlock<QueueMessage> block;

在构造函数中,我这样初始化它:

block = new ActionBlock<QueueMessage>(async s => await Process(s),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });

我的Process方法实现为:

private async Task Process(QueueMessage message)
{
    using var tx = stateManager.CreateTransaction();

    var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
    if (!result.HasValue)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
        return;
    }

    try
    {
        await processorService.Process(message);
        logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
        await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
        await tx.CommitAsync();
    }
    catch (Exception e)
    {
        logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue.  Error: {e}");

        message.RetryCount++;
        await retryQueue.EnqueueAsync(tx, message);
    }
    finally
    {
        await tx.CommitAsync();
    }
}

当我收到要处理的消息时,我调用:

// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
    logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}

第一次成功,消息处理完美。我收到的第二条消息(可能几秒后,可能一分钟后,取决于系统的负载),然后 block.SendAsync 立即 returns false。 Process 中没有代码是 运行。当我在调试器下查看时,我注意到 block.IsCompleted 为真。但是,我从来没有打电话给 block.Complete(); 或任何东西。

为什么我的块已经完成,而我可能还有更多工作要做?谢谢!

更新:我可能有线索了。

我似乎遇到了这个断点(在 ActionBlock.cs 源代码中)

谜团解开了。我的操作抛出了一个未处理的异常,它显然将块标记为已完成并且不允许任何新消息。

我很想知道是否有任何方法可以控制此行为,或者是否建议始终在您的操作周围设置 try/catch 块。