Hangfire 自定义状态过期
Hangfire Custom State Expiration
我实现了一个自定义状态 "blocked",在满足某些外部要求后进入入队状态。
有时这些外部要求永远得不到满足,导致作业卡在阻塞状态。我想要的是让处于这种状态的作业在一段可配置的时间后自动过期。
这样的要求有支持吗?有 ExpirationDate
字段,但从代码来看它似乎只用于 final
状态。
状态尽可能简单:
internal sealed class BlockedState : IState
{
internal const string STATE_NAME = "Blocked";
public Dictionary<string, string> SerializeData()
{
return new Dictionary<string, string>();
}
public string Name => STATE_NAME;
public string Reason => "Waiting for external resource";
public bool IsFinal => false;
public bool IgnoreJobLoadException => false;
}
并简单地用作 _hangfireBackgroundJobClient.Create(() => Console.WriteLine("hello world"), new BlockedState());
在稍后阶段,它会通过 _hangfireBackgroundJobClient.ChangeState(jobId, new EnqueuedState(), BlockedState.STATE_NAME)
向前移动
你能利用EventWaitHandle吗?
看看Generic Timout。
例如:
//action : your job
//timeout : your desired ExpirationDate
void DoSomething(Action action, int timeout)
{
EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
AsyncCallback callback = ar => waitHandle.Set();
action.BeginInvoke(callback, null);
if (!waitHandle.WaitOne(timeout))
{
// Expired here
}
}
我会选择自定义实现 IBackgroundProcess
以 DelayedJobScheduler 为例
它会定期获取延迟的作业以将其排队。
在这个自定义实现中,我将使用 JobStorageConnection.GetAllItemsFromSet("blocked")
来获取所有被阻止的作业 ID(其中 DelayedJobScheduler
使用 JobStorageConnection.GetFirstByLowestScoreFromSet
)
然后我会用 JobStorageConnection.GetJobData(jobId)
获取每个被阻止的作业数据。对于它们中的每一个,取决于它的 CreatedAt
字段,如果作业没有过期我什么都不做,或者如果它已过期则将其状态更改为另一个状态(Failed
?)。
自定义作业流程可以这样声明:
app.UseHangfireServer(storage, options,
new IBackgroundProcess[] {
new MyCustomJobProcess(
myTimeSpanForExpiration,
(IBackgroundJobStateChanger) new BackgroundJobStateChanger(filterProvider)) });
这里的一个困难是获得一个 IBackgroundJobStateChanger
,因为服务器似乎没有公开它自己的。
如果您使用自定义 FilterProvider
作为服务器的选项,则将其值作为 filterProvider
传递,否则使用 (IJobFilterProvider) JobFilterProviders.Providers
我实现了一个自定义状态 "blocked",在满足某些外部要求后进入入队状态。
有时这些外部要求永远得不到满足,导致作业卡在阻塞状态。我想要的是让处于这种状态的作业在一段可配置的时间后自动过期。
这样的要求有支持吗?有 ExpirationDate
字段,但从代码来看它似乎只用于 final
状态。
状态尽可能简单:
internal sealed class BlockedState : IState
{
internal const string STATE_NAME = "Blocked";
public Dictionary<string, string> SerializeData()
{
return new Dictionary<string, string>();
}
public string Name => STATE_NAME;
public string Reason => "Waiting for external resource";
public bool IsFinal => false;
public bool IgnoreJobLoadException => false;
}
并简单地用作 _hangfireBackgroundJobClient.Create(() => Console.WriteLine("hello world"), new BlockedState());
在稍后阶段,它会通过 _hangfireBackgroundJobClient.ChangeState(jobId, new EnqueuedState(), BlockedState.STATE_NAME)
你能利用EventWaitHandle吗?
看看Generic Timout。
例如:
//action : your job
//timeout : your desired ExpirationDate
void DoSomething(Action action, int timeout)
{
EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
AsyncCallback callback = ar => waitHandle.Set();
action.BeginInvoke(callback, null);
if (!waitHandle.WaitOne(timeout))
{
// Expired here
}
}
我会选择自定义实现 IBackgroundProcess
以 DelayedJobScheduler 为例
它会定期获取延迟的作业以将其排队。
在这个自定义实现中,我将使用 JobStorageConnection.GetAllItemsFromSet("blocked")
来获取所有被阻止的作业 ID(其中 DelayedJobScheduler
使用 JobStorageConnection.GetFirstByLowestScoreFromSet
)
然后我会用 JobStorageConnection.GetJobData(jobId)
获取每个被阻止的作业数据。对于它们中的每一个,取决于它的 CreatedAt
字段,如果作业没有过期我什么都不做,或者如果它已过期则将其状态更改为另一个状态(Failed
?)。
自定义作业流程可以这样声明:
app.UseHangfireServer(storage, options,
new IBackgroundProcess[] {
new MyCustomJobProcess(
myTimeSpanForExpiration,
(IBackgroundJobStateChanger) new BackgroundJobStateChanger(filterProvider)) });
这里的一个困难是获得一个 IBackgroundJobStateChanger
,因为服务器似乎没有公开它自己的。
如果您使用自定义 FilterProvider
作为服务器的选项,则将其值作为 filterProvider
传递,否则使用 (IJobFilterProvider) JobFilterProviders.Providers