使用 IObserver 接口等待事件和 AutoEventHandler/TaskCompletionSource

awaiting on an Event using IObserver Interface and AutoEventHandler/TaskCompletionSource

在我的场景中,我正在实现一个 IObserver 接口,以通过一种方法轮询特定变量的更新。我有一个线程只是为了执行这个方法而创建的。这个线程现在永远不应该停止寻找一些数据的更新。

public static void findUpdates()
{
    while(true)
    {
        CheckForUpdates(); //registered to the observer
    }
}

然后我让多个线程进入 class 方法来接收更新。

TaskCompletionSource<bool>updateHappened = new TaskCompletionSource<bool>();
object updatedValue;

public async Task receiveUpdates(){

    while(true)
    {
        await updateHappened.Task;
        //check to see if the thread cares about the updated value or not
        //update or do not update and possibly terminate
    }

}

并在 OnNext 实施中

public void OnNext(Object value){
    updatedHappened.SetResult(true);
}

我需要等待一个事件的原因是因为当我有线程进入时我的 while() 循环是 receiveUpdates 如果它们没有被等待那么最终会有最大数量的线程可以被调度并且一些线程得到阻止执行直到另一个退出。

这现在表现不正常,我永远无法通过等待 updateHappened.Task;无论我创建并发送多少线程来接收更新,都行。

我希望能够让想要接收更新的线程通过,在等待执行的线程池中暂停执行,以便其他线程(无论多少)也可以尝试接收更新,然后在更新时碰巧一次恢复每个线程并在再次挂起之前更新它或不更新它。一个线程将始终使用 IObserver 接口轮询更新,但尝试接收更新的所有其他线程将始终继续寻找更新,除非它在某种情况下退出。

一个TaskCompletionSource<T>只能完成一次;这是一个单发信号,仅此而已。

由于您已经在使用 Rx,请考虑让源线程使用 PublishConnect/RefCount 公开其可观察对象。这将允许多个订阅者接收相同的数据。然后对于只想 await 的线程,使用 await sequence.FirstAsync().