在重复任务需求的情况下的替代方案

Alternative in a situation of recurring Task demand

我有观察者模块,它负责订阅我从 Kafka 创建的一些反应流。遗憾的是,我需要轮询才能从 kafka 接收消息,因此我需要为此专门提供一个后台线程。我的第一个解决方案是这个:

public void Poll()
{
    if (Interlocked.Exchange(ref _state, POLLING) == NOTPOLLING)
    {
        Task.Run(() =>
        {
            while (CurrentSubscriptions.Count != 0)
            {
                _consumer.Poll(TimeSpan.FromSeconds(1));
            }
             _state = NOTPOLLING;
        });
    }
}

现在我的评论者建议我应该 Task 因为它有状态并且可以检查它们是否 运行。这导致了这段代码:

public void Poll()
{
    // checks for statuses: WaitingForActivation, WaitingToRun, Running
    if (_runningStatuses.Contains(_pollingTask.Status)) return;
    _pollingTask.Start(); // this obviously throws exception once Task already completes and then I want to start it again
}

任务几乎保持不变,但检查已更改,因为我的逻辑是我想在有订阅时开始轮询,在没有订阅时停止我需要某种 重新使用 任务,但由于我不能,所以我想知道我是否需要回到我的第一个实现,或者是否有任何其他巧妙的方法可以做到这一点,而我现在正在错过?

I am wondering do I need to go back to my first implementation or is there any other neat way of doing this that right now I am missing?

您的第一个实现看起来不错。您可能会使用 ManualResetEventSlim 而不是 enumInterlocked.Exchange,但这本质上是相同的...只要您只有两个状态。

我想我做出了妥协并为 MethodImpl(MethodImpl.Options.Synchronized) 删除了 Interlocked API 它让我拥有简单的方法体,而不会混淆最终 newcomer/inexperienced 人的 Interlocked API 代码.

[MethodImpl(MethodImplOptions.Synchronized)]
public void Poll()
{
    if (!_polling)
    {
        _polling = true;
        new Task(() =>
        {
            while (_currentSubscriptions.Count != 0)
            {
                _consumer.Poll(TimeSpan.FromSeconds(1));
            }
            _polling = false;
        }, TaskCreationOptions.LongRunning).Start();
    }
}