ActionBlock 似乎在我希望它完成之前完成
ActionBlock appears to complete before I want it to
我有一个 ActionBlock
作为我的一部分 class:
private readonly ActionBlock<QueueMessage> block;
在构造函数中,我这样初始化它:
block = new ActionBlock<QueueMessage>(async s => await Process(s),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
我的Process
方法实现为:
private async Task Process(QueueMessage message)
{
using var tx = stateManager.CreateTransaction();
var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
if (!result.HasValue)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
return;
}
try
{
await processorService.Process(message);
logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
await tx.CommitAsync();
}
catch (Exception e)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue. Error: {e}");
message.RetryCount++;
await retryQueue.EnqueueAsync(tx, message);
}
finally
{
await tx.CommitAsync();
}
}
当我收到要处理的消息时,我调用:
// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}
第一次成功,消息处理完美。我收到的第二条消息(可能几秒后,可能一分钟后,取决于系统的负载),然后 block.SendAsync
立即 returns false。 Process
中没有代码是 运行。当我在调试器下查看时,我注意到 block.IsCompleted
为真。但是,我从来没有打电话给 block.Complete();
或任何东西。
为什么我的块已经完成,而我可能还有更多工作要做?谢谢!
更新:我可能有线索了。
我似乎遇到了这个断点(在 ActionBlock.cs 源代码中)
谜团解开了。我的操作抛出了一个未处理的异常,它显然将块标记为已完成并且不允许任何新消息。
我很想知道是否有任何方法可以控制此行为,或者是否建议始终在您的操作周围设置 try/catch
块。
我有一个 ActionBlock
作为我的一部分 class:
private readonly ActionBlock<QueueMessage> block;
在构造函数中,我这样初始化它:
block = new ActionBlock<QueueMessage>(async s => await Process(s),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
我的Process
方法实现为:
private async Task Process(QueueMessage message)
{
using var tx = stateManager.CreateTransaction();
var result = await processQueue.TryGetValueAsync(tx, message.Data.Checksum);
if (!result.HasValue)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} is being processed, but doesn't exist on the queue.");
return;
}
try
{
await processorService.Process(message);
logger.Log(LogLevel.Debug, $"Message {message.Data.Checksum} processed, removing from queue.");
await processQueue.TryRemoveAsync(tx, message.Data.Checksum);
await tx.CommitAsync();
}
catch (Exception e)
{
logger.Log(LogLevel.Error, $"Message {message.Data.Checksum} could not be processed, adding to retry queue. Error: {e}");
message.RetryCount++;
await retryQueue.EnqueueAsync(tx, message);
}
finally
{
await tx.CommitAsync();
}
}
当我收到要处理的消息时,我调用:
// Process the message
if (!await block.SendAsync(message, cancellationToken))
{
logger.Log(LogLevel.Error, $"Failed to enqueue data with checksum {message.Data.Checksum}");
}
第一次成功,消息处理完美。我收到的第二条消息(可能几秒后,可能一分钟后,取决于系统的负载),然后 block.SendAsync
立即 returns false。 Process
中没有代码是 运行。当我在调试器下查看时,我注意到 block.IsCompleted
为真。但是,我从来没有打电话给 block.Complete();
或任何东西。
为什么我的块已经完成,而我可能还有更多工作要做?谢谢!
更新:我可能有线索了。
我似乎遇到了这个断点(在 ActionBlock.cs 源代码中)
谜团解开了。我的操作抛出了一个未处理的异常,它显然将块标记为已完成并且不允许任何新消息。
我很想知道是否有任何方法可以控制此行为,或者是否建议始终在您的操作周围设置 try/catch
块。