乐观的并发工作者实现有什么问题?
What is wrong with that optimistic concurrency worker implementation?
我已经尝试实现乐观并发'worker'。
目标是从同一个数据库table读取一批数据(单个table没有关系),多个并行'worker'。到目前为止,这似乎确实有效。我到处都遇到乐观并发异常,捕获它们并重试。
到目前为止一切顺利,获取数据的功能在我的本地设置中正常工作table。然而,当将应用程序移动到测试环境时,我得到一个奇怪的超时异常,即使被捕获,也会结束异步函数(中断 while 循环)。有人看到实施中的缺陷吗?什么可能导致超时?什么会导致异步函数结束?
public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
DataContext context = null;
try
{
context = GetNewContext(); // create a new dbContext
List<WorkItem> workItems;
bool loadSuccessInner;
while (true)
{
if (token.IsCancellationRequested) break;
loadSuccessInner = false;
context.Dispose();
context = GetNewContext(); // create a new dbContext
RunState currentRunState = context.Runs.Where(a => a.Id == runID).First().Status;
try
{
// Error happens on the following line: Microsoft.Data.SqlClient.SqlException: Timeout
workItems = context.WorkItems.Where(a => a.State == ProcessState.ToProcess).Take(batchSize).ToList();
loadSuccessInner = true;
}
catch (Exception ex)
{
workItems = new List<WorkItem>();
}
if (workItems.Count == 0 && loadSuccessInner)
{
break;
}
//... update to a different RunState
//... if set successful yield the result
//... else cleanup and retry
}
}
finally
{
if (context != null) context.Dispose();
}
}
我验证了 EntityFramework(此处使用 MS SQL 服务器适配器)执行完整的服务器端查询,这
转换为这样的简单查询:SELECT TOP 10 field_1, field_2 FROM WorkItems WHERE field_2 = 0
查询通常需要 <1ms,超时时间默认为
30s
我确认没有触发取消请求
当只有一个工作人员并且没有其他人正在访问数据库时,也会发生这种情况。我知道当资源繁忙或阻塞时可能会发生超时。但直到现在,我还没有看到任何其他查询超时。
(我会在提供更多信息时更新此答案。)
Does someone see a flaw in the implementation?
一般来说,您的代码看起来不错。
What could cause the end of the async function?
您显示的代码中没有任何内容通常应该是一个问题。首先在循环中放置另一个 try-catch 块,以确保不会在其他任何地方抛出其他异常(尤其是在后面未显示的代码中):
public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
DataContext context = null;
try
{
context = GetNewContext();
List<WorkItem> workItems;
bool loadSuccessInner;
while (true)
{
try
{
// ... (the inner loop code)
}
catch (Exception e)
{
// TODO: Log the exception here using your favorite method.
throw;
}
}
}
finally
{
if (context != null) context.Dispose();
}
}
查看您的日志并确保日志没有显示任何抛出的异常。然后另外记录循环中每个可能的退出条件(break
和 return
),以找出代码退出循环的方式和原因。
如果您的代码中没有其他 break
或 return
语句,那么代码退出循环的唯一方法是如果零 workItems
从数据库。
What could cause the timeout?
确保您调用的 any Task
返回/async
方法是使用 await
.
调用的
要追踪异常的实际来源,您应该部署带有 pdb
文件的 Debug
版本,以获得包含源代码行引用的完整堆栈跟踪。
您还可以实现 DbCommandInterceptor
并自行跟踪失败的命令:
public class TracingCommandInterceptor : DbCommandInterceptor
{
public override void CommandFailed(DbCommand command, CommandErrorEventData eventData)
{
LogException(eventData);
}
public override Task CommandFailedAsync(DbCommand command, CommandErrorEventData eventData, CancellationToken cancellationToken = new CancellationToken())
{
LogException(eventData);
return Task.CompletedTask;
}
private static void LogException(CommandErrorEventData eventData)
{
if (eventData.Exception is SqlException sqlException)
{
// -2 = Timeout error
// See https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/cc645611(v=sql.105)?redirectedfrom=MSDN
if (sqlException.Number == -2)
{
var stackTrace = new StackTrace();
var stackTraceText = stackTrace.ToString();
// TODO: Do some logging here and output the stackTraceText
// and other helpful information like the command text etc.
// -->
}
}
}
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseLoggerFactory(LoggingFactory);
optionsBuilder.UseSqlServer(connectionString);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
// Add the command interceptor.
optionsBuilder.AddInterceptors(new TracingCommandInterceptor());
base.OnConfiguring(optionsBuilder);
}
另外在拦截器中记录失败命令的命令文本也是一个好主意。
我已经尝试实现乐观并发'worker'。
目标是从同一个数据库table读取一批数据(单个table没有关系),多个并行'worker'。到目前为止,这似乎确实有效。我到处都遇到乐观并发异常,捕获它们并重试。
到目前为止一切顺利,获取数据的功能在我的本地设置中正常工作table。然而,当将应用程序移动到测试环境时,我得到一个奇怪的超时异常,即使被捕获,也会结束异步函数(中断 while 循环)。有人看到实施中的缺陷吗?什么可能导致超时?什么会导致异步函数结束?
public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
DataContext context = null;
try
{
context = GetNewContext(); // create a new dbContext
List<WorkItem> workItems;
bool loadSuccessInner;
while (true)
{
if (token.IsCancellationRequested) break;
loadSuccessInner = false;
context.Dispose();
context = GetNewContext(); // create a new dbContext
RunState currentRunState = context.Runs.Where(a => a.Id == runID).First().Status;
try
{
// Error happens on the following line: Microsoft.Data.SqlClient.SqlException: Timeout
workItems = context.WorkItems.Where(a => a.State == ProcessState.ToProcess).Take(batchSize).ToList();
loadSuccessInner = true;
}
catch (Exception ex)
{
workItems = new List<WorkItem>();
}
if (workItems.Count == 0 && loadSuccessInner)
{
break;
}
//... update to a different RunState
//... if set successful yield the result
//... else cleanup and retry
}
}
finally
{
if (context != null) context.Dispose();
}
}
我验证了 EntityFramework(此处使用 MS SQL 服务器适配器)执行完整的服务器端查询,这 转换为这样的简单查询:
SELECT TOP 10 field_1, field_2 FROM WorkItems WHERE field_2 = 0
查询通常需要 <1ms,超时时间默认为 30s
我确认没有触发取消请求
当只有一个工作人员并且没有其他人正在访问数据库时,也会发生这种情况。我知道当资源繁忙或阻塞时可能会发生超时。但直到现在,我还没有看到任何其他查询超时。
(我会在提供更多信息时更新此答案。)
Does someone see a flaw in the implementation?
一般来说,您的代码看起来不错。
What could cause the end of the async function?
您显示的代码中没有任何内容通常应该是一个问题。首先在循环中放置另一个 try-catch 块,以确保不会在其他任何地方抛出其他异常(尤其是在后面未显示的代码中):
public async IAsyncEnumerable<List<WorkItem>> LoadBatchedWorkload([EnumeratorCancellation] CancellationToken token, int batchSize, int runID)
{
DataContext context = null;
try
{
context = GetNewContext();
List<WorkItem> workItems;
bool loadSuccessInner;
while (true)
{
try
{
// ... (the inner loop code)
}
catch (Exception e)
{
// TODO: Log the exception here using your favorite method.
throw;
}
}
}
finally
{
if (context != null) context.Dispose();
}
}
查看您的日志并确保日志没有显示任何抛出的异常。然后另外记录循环中每个可能的退出条件(break
和 return
),以找出代码退出循环的方式和原因。
如果您的代码中没有其他 break
或 return
语句,那么代码退出循环的唯一方法是如果零 workItems
从数据库。
What could cause the timeout?
确保您调用的 any Task
返回/async
方法是使用 await
.
要追踪异常的实际来源,您应该部署带有 pdb
文件的 Debug
版本,以获得包含源代码行引用的完整堆栈跟踪。
您还可以实现 DbCommandInterceptor
并自行跟踪失败的命令:
public class TracingCommandInterceptor : DbCommandInterceptor
{
public override void CommandFailed(DbCommand command, CommandErrorEventData eventData)
{
LogException(eventData);
}
public override Task CommandFailedAsync(DbCommand command, CommandErrorEventData eventData, CancellationToken cancellationToken = new CancellationToken())
{
LogException(eventData);
return Task.CompletedTask;
}
private static void LogException(CommandErrorEventData eventData)
{
if (eventData.Exception is SqlException sqlException)
{
// -2 = Timeout error
// See https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/cc645611(v=sql.105)?redirectedfrom=MSDN
if (sqlException.Number == -2)
{
var stackTrace = new StackTrace();
var stackTraceText = stackTrace.ToString();
// TODO: Do some logging here and output the stackTraceText
// and other helpful information like the command text etc.
// -->
}
}
}
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseLoggerFactory(LoggingFactory);
optionsBuilder.UseSqlServer(connectionString);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
// Add the command interceptor.
optionsBuilder.AddInterceptors(new TracingCommandInterceptor());
base.OnConfiguring(optionsBuilder);
}
另外在拦截器中记录失败命令的命令文本也是一个好主意。