Hangfire - DisableConcurrentExecution - 如果在方法参数中传递了相同的值,则阻止并发执行

Hangfire - DisableConcurrentExecution - Prevent concurrent execution if same value passed in method parameter

Hangfire DisableConcurrentExecution 属性未按预期工作。

我有一种方法,可以用不同的 ID 调用。如果传递了相同的 Id,我想阻止方法的并发执行。

string jobName= $"{Id} - Entry Job";

_recurringJobManager.AddOrUpdate<EntryJob>(jobName, j => j.RunAsync(Id, Null), "0 2 * * *");

我的 EntryJob 接口有 RunAsync 方法。

public class EntryJob:  IJob
  {
 [DisableConcurrentExecution(3600)] <-- Tried here
public async Task RunAsync(int Id, SomeObj obj)
    {
      //Some coe
    }
  }

界面是这样的

 [DisableConcurrentExecution(3600)] <-- Tried here
    public interface IJob
      {
       [DisableConcurrentExecution(3600)] <-- Tried here
        Task RunAsync(int Id, SomeObj obj);
      }

现在我想防止 RunAsync 方法在 Id 相同时调用多次。我试图将 DisableConcurrentExecution 放在 RunAsync 方法的顶部,位于接口声明内的两个位置以及实现接口的位置。

但它似乎不适合我。有没有办法根据Id来防止并发?

DisableConcurrentExecution 的现有实现不支持此功能。它将阻止使用任何参数并发执行该方法。添加支持会相当简单。下面的注释是未经测试的伪代码:

public class DisableConcurrentExecutionWithArgAttribute : JobFilterAttribute, IServerFilter
{
    private readonly int _timeoutInSeconds;
    private readonly int _argPos;

    // add additional param to pass in which method arg you want to use for 
    // deduping jobs
    public DisableConcurrentExecutionAttribute(int timeoutInSeconds, int argPos)
    {
        if (timeoutInSeconds < 0) throw new ArgumentException("Timeout argument value should be greater that zero.");

        _timeoutInSeconds = timeoutInSeconds;
        _argPos = argPos;
    }

    public void OnPerforming(PerformingContext filterContext)
    {
        var resource = GetResource(filterContext.BackgroundJob.Job);

        var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);

        var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeout);
        filterContext.Items["DistributedLock"] = distributedLock;
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        if (!filterContext.Items.ContainsKey("DistributedLock"))
        {
            throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
        }

        var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
        distributedLock.Dispose();
    }

    private static string GetResource(Job job)
    {
        // adjust locked resource to include the argument to make it unique
        // for a given ID
        return $"{job.Type.ToGenericTypeString()}.{job.Method.Name}.{job.Args[_argPos].ToString()}";
    }
}