What is wrong with that optimistic concurrency worker implementation?

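I have tried to implement an optimistic concurrency 'worker'.

The goal is to read batches of data from the same database table (a single table with no relations) with multiple parallel 'workers'. So far this seems to work. I get optimistic concurrency exceptions here and there, catch them, and retry.

So far so good: the function that fetches the data works properly on my local setup. However, after moving the application to a test environment, I get a strange timeout exception which, even though it is caught, ends the async function (it breaks out of the while loop). Does someone see a flaw in the implementation? What could cause the timeout? What could cause the async function to end?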

public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
    DataContext context = null;
    try
    {
        context = GetNewContext(); // create a new dbContext
        List<WorkItem> workItems;
        bool loadSuccessInner;
        while (true)
        {
            if (token.IsCancellationRequested) break;

            loadSuccessInner = false;

            context.Dispose();
            context = GetNewContext(); // create a new dbContext

            RunState currentRunState = context.Runs.Where(a => a.Id == runID).First().Status;

            try
            {
                // Error happens on the following line: Microsoft.Data.SqlClient.SqlException: Timeout
                workItems = context.WorkItems.Where(a => a.State == ProcessState.ToProcess).Take(batchSize).ToList();
                loadSuccessInner = true;
            }
            catch (Exception ex)
            {
                workItems = new List<WorkItem>();
            }

            if (workItems.Count == 0 && loadSuccessInner)
            {
                break;
            }

            //... update to a different RunState
            //... if set successful yield the result
            //... else cleanup and retry
        }
    }
    finally
    {
        if (context != null) context.Dispose();
    }
}

(I will update this answer as more information is provided.)

Does someone see a flaw in the implementation?

In general, your code looks fine.

What could cause the end of the async function?

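Nothing in the code you have shown should normally be a problem. Start by putting another try-catch block inside the loop, to make sure that no other exceptions are thrown anywhere else (especially in the later code that is not shown):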

public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
    DataContext context = null;
    try
    {
        context = GetNewContext();
        List<WorkItem> workItems;
        bool loadSuccessInner;
        while (true)
        {
            try
            {
                // ... (the inner loop code)
            }
            catch (Exception e)
            {
                // TODO: Log the exception here using your favorite method.
                throw;
            }
        }
    }
    finally
    {
        if (context != null) context.Dispose();
    }
}

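Check your logs and make sure they do not show any thrown exceptions. Then additionally log every possible exit condition in the loop (break and return), to find out how and why the code exits the loop.

If your code contains no other break or return statements, then apart from cancellation through the token, the only way for it to leave the loop is when zero workItems are returned from the database.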
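The exit logging described above could look roughly like the following sketch. It is not the asker's actual code: LoadBatches, the loadBatch delegate, and the log callback are hypothetical stand-ins for the database query and for whatever logger is in use. The only point is that every path out of the loop records a reason, and that a caught load failure leaves a trace without ending the loop.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ExitLoggingDemo
{
    // Hypothetical stand-in for the batched loader: loadBatch replaces the
    // database query, log replaces the real logging framework.
    public static async IAsyncEnumerable<List<int>> LoadBatches(
        Func<List<int>> loadBatch,
        Action<string> log,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        while (true)
        {
            if (token.IsCancellationRequested)
            {
                log("exit: cancellation requested");
                yield break;
            }

            List<int> items;
            try
            {
                items = loadBatch();
            }
            catch (Exception ex)
            {
                // The failure is caught, so the loop continues; the log shows it happened.
                log("load failed, retrying: " + ex.Message);
                await Task.Delay(10); // brief pause before the retry
                continue;
            }

            if (items.Count == 0)
            {
                log("exit: no work items left");
                yield break;
            }

            yield return items;
        }
    }

    public static async Task Main()
    {
        // Simulated data source: first call times out, second returns data, third is empty.
        var calls = 0;
        Func<List<int>> loadBatch = () =>
        {
            calls++;
            if (calls == 1) throw new TimeoutException("simulated timeout");
            return calls == 2 ? new List<int> { 1, 2 } : new List<int>();
        };

        await foreach (var batch in LoadBatches(loadBatch, Console.WriteLine))
        {
            Console.WriteLine("batch of " + batch.Count);
        }
    }
}
```

If the log of the real loader ends without any of its "exit" lines, the loop was left through a path that is not covered yet, most likely an exception thrown outside the inner try-catch.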

What could cause the timeout?

Make sure that any Task-returning/async methods you call are called using await.
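To illustrate why the await matters, here is a minimal, self-contained sketch. FailingQueryAsync is a hypothetical stand-in for any Task-returning database call. Without await, the surrounding try-catch never sees the exception, because the failure is stored in the returned Task instead of being thrown at the call site.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Hypothetical stand-in for a Task-returning query that fails with a timeout.
    public static async Task<int> FailingQueryAsync()
    {
        await Task.Delay(10);
        throw new TimeoutException("simulated timeout");
    }

    // The call is NOT awaited: the method returns before the task fails,
    // so the catch block is never reached.
    public static bool CaughtWithoutAwait()
    {
        try
        {
            _ = FailingQueryAsync();
            return false;
        }
        catch (TimeoutException)
        {
            return true;
        }
    }

    // The call IS awaited: the exception is rethrown at the await
    // and handled by the catch block.
    public static async Task<bool> CaughtWithAwait()
    {
        try
        {
            await FailingQueryAsync();
            return false;
        }
        catch (TimeoutException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine("caught without await: " + CaughtWithoutAwait());
        Console.WriteLine("caught with await: " + await CaughtWithAwait());
    }
}
```

In the first case the exception surfaces later and somewhere else (or not at all), which can make a failure look as if it came from an unrelated line.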

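To track down the actual source of the exception, you should deploy a Debug build together with its pdb files, so that you get a full stack trace with source code line references.

You can also implement a DbCommandInterceptor and trace failing commands yourself: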

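using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore.Diagnostics;

public class TracingCommandInterceptor : DbCommandInterceptor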
{
    public override void CommandFailed(DbCommand command, CommandErrorEventData eventData)
    {
        LogException(eventData);
    }

    public override Task CommandFailedAsync(DbCommand command, CommandErrorEventData eventData, CancellationToken cancellationToken = new CancellationToken())
    {
        LogException(eventData);
        return Task.CompletedTask;
    }

    private static void LogException(CommandErrorEventData eventData)
    {
        if (eventData.Exception is SqlException sqlException)
        {
            // -2 = Timeout error
            // See https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/cc645611(v=sql.105)?redirectedfrom=MSDN
            if (sqlException.Number == -2)
            {
                var stackTrace = new StackTrace();
                var stackTraceText = stackTrace.ToString();

                // TODO: Do some logging here and output the stackTraceText
                //       and other helpful information like the command text etc.
                // -->
            }
        }
    }
}

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
    optionsBuilder.UseLoggerFactory(LoggingFactory);
    optionsBuilder.UseSqlServer(connectionString);
    optionsBuilder.EnableSensitiveDataLogging();
    optionsBuilder.EnableDetailedErrors();

    // Add the command interceptor.
    optionsBuilder.AddInterceptors(new TracingCommandInterceptor());

    base.OnConfiguring(optionsBuilder);
}

It is also a good idea to log the command text of the failing command in the interceptor.
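One possible shape for such a log entry, as a sketch only: FormatFailedCommand is a hypothetical helper and not part of EF Core. Inside LogException it could be fed with eventData.Command.CommandText, eventData.Command.CommandTimeout, eventData.Duration, and the captured stackTraceText. It takes plain values here so that it can be tried without a database.

```csharp
using System;
using System.Globalization;

public static class CommandLogFormatter
{
    // Hypothetical helper (not part of EF Core): builds one log entry for a failed command.
    public static string FormatFailedCommand(
        string commandText, int commandTimeoutSeconds, TimeSpan duration, string stackTraceText)
    {
        string elapsedMs = duration.TotalMilliseconds.ToString("F0", CultureInfo.InvariantCulture);
        return string.Join(Environment.NewLine,
            "SQL command failed after " + elapsedMs + " ms (CommandTimeout: " + commandTimeoutSeconds + " s)",
            "Command text: " + commandText,
            "Stack trace: " + stackTraceText);
    }

    public static void Main()
    {
        // Example values only; in the interceptor they would come from eventData.
        Console.WriteLine(FormatFailedCommand(
            "SELECT TOP(10) * FROM WorkItems", 30, TimeSpan.FromSeconds(31), "(stack trace here)"));
    }
}
```

Logging the elapsed time next to the configured CommandTimeout makes it easier to tell a genuinely slow query from one that was blocked, for example by a lock held by another worker.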