SemaphoreSlim class 中的 WaitHandle.WaitOne() 方法无法正常工作
WaitHandle.WaitOne() method in SemaphoreSlim class does not work properly
我的情况很复杂,但我会尽量简短,只告知重要细节。我正在尝试实施基于任务的作业处理。这是 class 的:
internal class TaskBasedJob : IJob
{
public WaitHandle WaitHandle { get; }
public JobStatus Status { get; private set; }
public TaskBasedJob(Func<Task<JobStatus>> action, TimeSpan interval, TimeSpan delay)
{
Status = JobStatus.NotExecuted;
var semaphore = new SemaphoreSlim(0, 1);
WaitHandle = semaphore.AvailableWaitHandle;
_timer = new Timer(async x =>
{
// return to prevent duplicate executions
// Semaphore starts locked so WaitHandle works properly
if (semaphore.CurrentCount == 0 && Status != JobStatus.NotExecuted)
{
return;
Status = JobStatus.Failure;
}
if(Status != JobStatus.NotExecuted)
await semaphore.WaitAsync();
try
{
await action();
}
finally
{
semaphore.Release();
}
}, null, delay, interval);
}
}
下面是调度器 class :
internal class Scheduler : IScheduler
{
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, IJob> _timers = new ConcurrentDictionary<string, IJob>();
public Scheduler(ILogger logger)
{
_logger = logger;
}
public IJob ScheduleAsync(string jobName, Func<Task<JobStatus>> action, TimeSpan interval, TimeSpan delay = default(TimeSpan))
{
if (!_timers.ContainsKey(jobName))
{
lock (_timers)
{
if (!_timers.ContainsKey(jobName))
_timers.TryAdd(jobName, new TaskBasedJob(jobName, action, interval, delay, _logger));
}
}
return _timers[jobName];
}
public IReadOnlyDictionary<string, IJob> GetJobs()
{
return _timers;
}
}
在这个库中,我有如下服务:所以这个服务的想法只是在名为 _accessInfos
的字典及其异步方法中获取一些数据。您可以在构造函数中看到我已经添加了获取数据的作业。
internal class AccessInfoStore : IAccessInfoStore
{
private readonly ILogger _logger;
private readonly Func<HttpClient> _httpClientFunc;
private volatile Dictionary<string, IAccessInfo> _accessInfos;
private readonly IScheduler _scheduler;
private static string JobName = "AccessInfoProviderJob";
public AccessInfoStore(IScheduler scheduler, ILogger logger, Func<HttpClient> httpClientFunc)
{
_accessInfos = new Dictionary<string, IAccessInfo>();
_config = config;
_logger = logger;
_httpClientFunc = httpClientFunc;
_scheduler = scheduler;
scheduler.ScheduleAsync(JobName, FetchAccessInfos, TimeSpan.FromMinutes(1));
}
public IJob FetchJob => _scheduler.GetJobs()[JobName];
private async Task<JobStatus> FetchAccessInfos()
{
using (var client = _httpClientFunc())
{
accessIds = //calling a webservice
_accessInfos = accessIds;
return JobStatus.Success;
}
}
所有这些代码都在我的 ASP.NET Core 2.1 项目中引用的另一个库中。在启动时 class 我有一个这样的电话:
//adding services
...
services.AddScoped<IScheduler, Scheduler>();
services.AddScoped<IAccessInfoStore, AccessInfoStore>();
var accessInfoStore = services.BuildServiceProvider().GetService<IAccessInfoStore>();
accessInfoStore.FetchJob.WaitHandle.WaitOne();
第一次WaitOne()
方法不起作用所以数据没有加载(_accessInfos
是空的)但是如果我再次刷新页面我可以看到数据加载(_accessInfos
不为空但有数据)。因此,据我所知 WaitOne()
方法是在我的工作完成之前阻止线程执行。
有人知道为什么 WaitOne()
方法不能正常工作或者我可能做错了什么吗?
编辑 1:
Scheduler
仅将所有 IJob
-s 存储到并发字典中,以便以后在需要时获取它们,主要是为了在健康页面中显示它们。然后每次我们在字典中插入一个新的 TaskBasedJob
时,构造函数都会被执行,最后我们使用 Timer
在一段时间后重新执行作业,但为了使这个线程- safe 我使用 SemaphoreSlim class 并从那里公开 WaitHandle
。这仅适用于我需要将方法从异步转换为同步的极少数情况。因为通常我不会使用它,因为在正常情况下作业将以异步方式执行。
我的期望 - WaitOne()
应该停止执行当前线程并等待我的预定作业执行,然后继续执行当前线程。在我的例子中,当前线程是 运行 Configure
方法 StartUp
class.
这里是 Rajmond 的同事。我弄清楚了我们的问题。基本上,等待工作正常等等。我们的问题很简单,如果你这样做 IServiceCollection.BuildServiceProvider()
你每次都会得到不同的范围(因此即使使用 Singleton 实例也会创建不同的对象)。尝试这个的简单方法:
var serviceProvider1 = services.BuildServiceProvider();
var hashCode1 = serviceProvider1.GetService<IAccessInfoStore>().GetHashCode();
var hashCode2 = serviceProvider1.GetService<IAccessInfoStore>().GetHashCode();
var serviceProvider2 = services.BuildServiceProvider();
var hashCode3 = serviceProvider2.GetService<IAccessInfoStore>().GetHashCode();
var hashCode4 = serviceProvider2.GetService<IAccessInfoStore>().GetHashCode();
hashCode1
和hashCode2
是一样的,和hashCode3
和hashCode4
一样(因为Singleton),但是hashCode1
/hashCode2
与 hashCode3
/hashCode4
不同(因为不同的服务提供商)。
真正的解决方法可能是在 IAccessInfoStore 中进行一些检查,它将在内部阻塞,直到作业第一次完成。
干杯!
我的情况很复杂,但我会尽量简短,只告知重要细节。我正在尝试实施基于任务的作业处理。这是 class 的:
internal class TaskBasedJob : IJob
{
public WaitHandle WaitHandle { get; }
public JobStatus Status { get; private set; }
public TaskBasedJob(Func<Task<JobStatus>> action, TimeSpan interval, TimeSpan delay)
{
Status = JobStatus.NotExecuted;
var semaphore = new SemaphoreSlim(0, 1);
WaitHandle = semaphore.AvailableWaitHandle;
_timer = new Timer(async x =>
{
// return to prevent duplicate executions
// Semaphore starts locked so WaitHandle works properly
if (semaphore.CurrentCount == 0 && Status != JobStatus.NotExecuted)
{
return;
Status = JobStatus.Failure;
}
if(Status != JobStatus.NotExecuted)
await semaphore.WaitAsync();
try
{
await action();
}
finally
{
semaphore.Release();
}
}, null, delay, interval);
}
}
下面是调度器 class :
internal class Scheduler : IScheduler
{
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, IJob> _timers = new ConcurrentDictionary<string, IJob>();
public Scheduler(ILogger logger)
{
_logger = logger;
}
public IJob ScheduleAsync(string jobName, Func<Task<JobStatus>> action, TimeSpan interval, TimeSpan delay = default(TimeSpan))
{
if (!_timers.ContainsKey(jobName))
{
lock (_timers)
{
if (!_timers.ContainsKey(jobName))
_timers.TryAdd(jobName, new TaskBasedJob(jobName, action, interval, delay, _logger));
}
}
return _timers[jobName];
}
public IReadOnlyDictionary<string, IJob> GetJobs()
{
return _timers;
}
}
在这个库中,我有如下服务:所以这个服务的想法只是在名为 _accessInfos
的字典及其异步方法中获取一些数据。您可以在构造函数中看到我已经添加了获取数据的作业。
internal class AccessInfoStore : IAccessInfoStore
{
private readonly ILogger _logger;
private readonly Func<HttpClient> _httpClientFunc;
private volatile Dictionary<string, IAccessInfo> _accessInfos;
private readonly IScheduler _scheduler;
private static string JobName = "AccessInfoProviderJob";
public AccessInfoStore(IScheduler scheduler, ILogger logger, Func<HttpClient> httpClientFunc)
{
_accessInfos = new Dictionary<string, IAccessInfo>();
_config = config;
_logger = logger;
_httpClientFunc = httpClientFunc;
_scheduler = scheduler;
scheduler.ScheduleAsync(JobName, FetchAccessInfos, TimeSpan.FromMinutes(1));
}
public IJob FetchJob => _scheduler.GetJobs()[JobName];
private async Task<JobStatus> FetchAccessInfos()
{
using (var client = _httpClientFunc())
{
accessIds = //calling a webservice
_accessInfos = accessIds;
return JobStatus.Success;
}
}
所有这些代码都在我的 ASP.NET Core 2.1 项目中引用的另一个库中。在启动时 class 我有一个这样的电话:
//adding services
...
services.AddScoped<IScheduler, Scheduler>();
services.AddScoped<IAccessInfoStore, AccessInfoStore>();
var accessInfoStore = services.BuildServiceProvider().GetService<IAccessInfoStore>();
accessInfoStore.FetchJob.WaitHandle.WaitOne();
第一次WaitOne()
方法不起作用所以数据没有加载(_accessInfos
是空的)但是如果我再次刷新页面我可以看到数据加载(_accessInfos
不为空但有数据)。因此,据我所知 WaitOne()
方法是在我的工作完成之前阻止线程执行。
有人知道为什么 WaitOne()
方法不能正常工作或者我可能做错了什么吗?
编辑 1:
Scheduler
仅将所有 IJob
-s 存储到并发字典中,以便以后在需要时获取它们,主要是为了在健康页面中显示它们。然后每次我们在字典中插入一个新的 TaskBasedJob
时,构造函数都会被执行,最后我们使用 Timer
在一段时间后重新执行作业,但为了使这个线程- safe 我使用 SemaphoreSlim class 并从那里公开 WaitHandle
。这仅适用于我需要将方法从异步转换为同步的极少数情况。因为通常我不会使用它,因为在正常情况下作业将以异步方式执行。
我的期望 - WaitOne()
应该停止执行当前线程并等待我的预定作业执行,然后继续执行当前线程。在我的例子中,当前线程是 运行 Configure
方法 StartUp
class.
这里是 Rajmond 的同事。我弄清楚了我们的问题。基本上,等待工作正常等等。我们的问题很简单,如果你这样做 IServiceCollection.BuildServiceProvider()
你每次都会得到不同的范围(因此即使使用 Singleton 实例也会创建不同的对象)。尝试这个的简单方法:
var serviceProvider1 = services.BuildServiceProvider();
var hashCode1 = serviceProvider1.GetService<IAccessInfoStore>().GetHashCode();
var hashCode2 = serviceProvider1.GetService<IAccessInfoStore>().GetHashCode();
var serviceProvider2 = services.BuildServiceProvider();
var hashCode3 = serviceProvider2.GetService<IAccessInfoStore>().GetHashCode();
var hashCode4 = serviceProvider2.GetService<IAccessInfoStore>().GetHashCode();
hashCode1
和hashCode2
是一样的,和hashCode3
和hashCode4
一样(因为Singleton),但是hashCode1
/hashCode2
与 hashCode3
/hashCode4
不同(因为不同的服务提供商)。
真正的解决方法可能是在 IAccessInfoStore 中进行一些检查,它将在内部阻塞,直到作业第一次完成。
干杯!