使用 Hangfire 而不是 Tasks 执行异步操作

Perform async operations with Hangfire instead of Tasks

我有 4 个 类 执行不同的操作,这 2 个任务中的每一个都应该 运行 同步。

我当前的实现工作正常,但有时会出现奇怪的滞后,并且某些 processors 不会 运行。我想这可能是线程问题。

我想重构代码以使用 Hangfire 库或任何其他方式来保持程序正常运行。我不确定如何 属性 做到这一点,如果有任何帮助,我将不胜感激。

public void Run()
{
        var processors1 = new CommonProcessor[] { new AProcessor(), new BProcessor() };  
        //AProcessor should be first!
        var processors2 = new CommonProcessor[] { new CProcessor(), new DProcessor() }; 
        //CProcessor should be first!

        Task task1 = Task.Run(() => RunSyncProcess(processors1);
        Task task2 = Task.Run(() => RunSyncProcess(processors2);

        Task.WaitAll(task1, task2);
}

 private void RunSyncProcess(CommonProcessor[] processors)
 {
        while (true)
        {
             foreach (var processor in processors)
             { 
               // do some job 
             }
             Thread.Sleep(frequency);
        }
 }

您使用 Tasks 的方式有误。任务应该是非阻塞或短期存在的项目。

基本上,这里发生的是您启动永不结束且永不释放线程的任务。这将导致阻塞 ThreadPool 的某些线程。

有多种更改方式:

1) 非阻塞任务:

public void Run()
{
        var processors1 = new CommonProcessor[] { new AProcessor(), new BProcessor() };  
        //AProcessor should be first!
        var processors2 = new CommonProcessor[] { new CProcessor(), new DProcessor() }; 
        //CProcessor should be first!

        Task task1 = RunSyncProcess(processors1);
        Task task2 = RunSyncProcess(processors2);

        Task.WhenAll(task1, task2);
}

 private async Task RunSyncProcess(CommonProcessor[] processors)
 {
    while (true)
    {
         foreach (var processor in processors)
         { 
           // do some job 
         }
         await Task.Delay(TimeSpan.FromMilliseconds(frequency));//will free threadpool while waiting
    }
 }

2) 使用阻塞线程但不影响线程池:

public void Run()
{
    var processors1 = new CommonProcessor[] { new AProcessor(), new BProcessor() };  
    //AProcessor should be first!
    var processors2 = new CommonProcessor[] { new CProcessor(), new DProcessor() }; 
    //CProcessor should be first!

    Thread t1 = new Thread(() => RunSyncProcess(processors1));
    t1.Start();
    Thread t2 = new Thread(() => RunSyncProcess(processors1));
    t2.Start();

    t1.Join();
    t2.Join();
}

 private void RunSyncProcess(CommonProcessor[] processors)
 {
    while (true)
    {
         foreach (var processor in processors)
         { 
           // do some job 
         }
         Thread.Sleep(frequency);
    }
 }

Hangfire 主要用于即发即弃任务,您可以在其中排队、安排和重新排队作业,如果这是您想要实现的目标,您可以通过 nuget 安装,然后使用以下语法

BackgroundJob.Enqueue(() => RunSyncProcess(processors1));

因此,为了重构您的代码,您需要决定是要安排作业,还是要等待前一个任务成功完成后再等待新任务,这实际上取决于什么你想实现。

public void Run()
{
    var processors1 = new CommonProcessor[] { new AProcessor(), new BProcessor() };  
    //AProcessor should be first!
    var processors2 = new CommonProcessor[] { new CProcessor(), new DProcessor() }; 
    //CProcessor should be first!

    BackgroundJob.Enqueue(() => RunSyncProcess(processors1));
    BackgroundJob.Enqueue(() => RunSyncProcess(processors2));
}

 public void RunSyncProcess(CommonProcessor[] processors)
 {
    while (true)
    {
         foreach (var processor in processors)
         { 
           // do some job 
         }
    }
 }

您不必等待所有这些,因为这些将在幕后启动,并且您的 UI 会做出响应。请记住,当您想使用 hangfire 时,这些方法需要 public。