Hangfire 自定义状态过期

Hangfire Custom State Expiration

我实现了一个自定义状态 "blocked",在满足某些外部要求后进入入队状态。

有时这些外部要求永远得不到满足,导致作业卡在阻塞状态。我想要的是让处于这种状态的作业在一段可配置的时间后自动过期。

这样的要求有支持吗?有 ExpirationDate 字段,但从代码来看它似乎只用于 final 状态。

状态尽可能简单:

    internal sealed class BlockedState : IState
    {
        internal const string STATE_NAME = "Blocked";

        public Dictionary<string, string> SerializeData()
        {
            return new Dictionary<string, string>();
        }

        public string Name => STATE_NAME;

        public string Reason => "Waiting for external resource";

        public bool IsFinal => false;

        public bool IgnoreJobLoadException => false;
    }

并简单地用作 _hangfireBackgroundJobClient.Create(() => Console.WriteLine("hello world"), new BlockedState());

在稍后阶段,它会通过 _hangfireBackgroundJobClient.ChangeState(jobId, new EnqueuedState(), BlockedState.STATE_NAME)

向前移动

你能利用EventWaitHandle吗?

看看Generic Timout

例如:

    //action : your job
    //timeout : your desired ExpirationDate 
    void DoSomething(Action action, int timeout)
    {
        EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
        AsyncCallback callback = ar => waitHandle.Set();
        action.BeginInvoke(callback, null);

        if (!waitHandle.WaitOne(timeout))
        {
             // Expired here
        }
    }

我会选择自定义实现 IBackgroundProcessDelayedJobScheduler 为例 它会定期获取延迟的作业以将其排队。

在这个自定义实现中,我将使用 JobStorageConnection.GetAllItemsFromSet("blocked") 来获取所有被阻止的作业 ID(其中 DelayedJobScheduler 使用 JobStorageConnection.GetFirstByLowestScoreFromSet

然后我会用 JobStorageConnection.GetJobData(jobId) 获取每个被阻止的作业数据。对于它们中的每一个,取决于它的 CreatedAt 字段,如果作业没有过期我什么都不做,或者如果它已过期则将其状态更改为另一个状态(Failed?)。

自定义作业流程可以这样声明:

       app.UseHangfireServer(storage, options, 
             new IBackgroundProcess[] { 
                        new MyCustomJobProcess(
                                myTimeSpanForExpiration, 
                                (IBackgroundJobStateChanger) new BackgroundJobStateChanger(filterProvider)) });

这里的一个困难是获得一个 IBackgroundJobStateChanger,因为服务器似乎没有公开它自己的。 如果您使用自定义 FilterProvider 作为服务器的选项,则将其值作为 filterProvider 传递,否则使用 (IJobFilterProvider) JobFilterProviders.Providers