如何在运行时确定作业的队列

How to determine job's queue at runtime

我们的网络应用程序允许最终用户在 UI 上设置重复作业队列。 (我们为每个服务器创建一个队列(使用服务器名称)并允许用户选择服务器 运行) 职位注册方式:

RecurringJob.AddOrUpdate<IMyTestJob>(input.Id, x => x.Run(), input.Cron, TimeZoneInfo.Local, input.QueueName);

它工作正常,但有时我们检查生产日志,发现它 运行 在错误的队列(服务器)上。我们没有更多的生产访问权限,因此我们尝试在开发中重现,但没有成功。

为了暂时解决这个问题,我们需要在作业 运行ning 时获取队列名称,然后将其与当前服务器名称进行比较,当它们不同时停止。

是否可能以及如何从 PerformContext 获取它?

注意:我们使用 HangFire 版本:1.7.9 和 ASP.NET Core 3.1

你可以看看https://github.com/HangfireIO/Hangfire/pull/502

专用过滤器拦截队列更改并恢复原始队列。

我想你可以在一个非常相似的过滤器中停止执行,或者通过将 CandidateState 更改为 FailedState[ 来设置参数以在 IElectStateFilter.OnStateElection 阶段完全停止执行=15=]

也许你的问题来自于一个已经存在的过滤器,它弄乱了队列。

这是上面 link 中的代码:

    public class PreserveOriginalQueueAttribute : JobFilterAttribute, IApplyStateFilter
    {
        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            var enqueuedState = context.NewState as EnqueuedState;
    
            // Activating only when enqueueing a background job
            if (enqueuedState != null)
            {
                // Checking if an original queue is already set
                var originalQueue = JobHelper.FromJson<string>(context.Connection.GetJobParameter(
                    context.BackgroundJob.Id,
                    "OriginalQueue"));
    
                if (originalQueue != null)
                {
                    // Override any other queue value that is currently set (by other filters, for example)
                    enqueuedState.Queue = originalQueue;
                }
                else
                {
                    // Queueing for the first time, we should set the original queue
                    context.Connection.SetJobParameter(
                        context.BackgroundJob.Id,
                        "OriginalQueue",
                        JobHelper.ToJson(enqueuedState.Queue));    
                }
            }
        }
    
        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }
    }

我找到了一个简单的解决方案:因为我们知道 Recurring Job Id,我们可以从 JobStorage 中获取它的信息并将其与当前队列(当前服务器名称)进行比较:

public bool IsCorrectQueue()
{
    List<RecurringJobDto> recurringJobs = Hangfire.JobStorage.Current.GetConnection().GetRecurringJobs();
    var myJob = recurringJobs.FirstOrDefault(x => x.Id.Equals("My job Id"));
    var definedQueue = myJob.Queue;
    var currentServerQueue = string.Concat(Environment.MachineName.ToLowerInvariant().Where(char.IsLetterOrDigit));

    return definedQueue == "default" || definedQueue == currentServerQueue;
}

然后在作业中查看:

public async Task Run()
{
    //Check correct queue
    if (!IsCorrectQueue())
    {
        Logger.Error("Wrong queue detected");
        return;
    }

    //Job logic
}