ActionBlock Framework 4 rx 替代方案

ActionBlock Framework 4 rx alternative

我对 Framework 4.0 的 ActionBlock 实现很感兴趣,因为 Framework 4.0 似乎不支持 TPL.Dataflow。 更具体地说,我对接收 Func 委托和 MaxDegreeOfParallism = 1 情况的构造函数的情况感兴趣。

我考虑过使用响应式扩展来实现它,但我不确定该怎么做。考虑创建一个 Subject 并在 Post 上调用 OnNext,并使用 SelectMany 和任务 ToObservable 东西,但我不确定如何处理调度程序。这是我的想法的草稿。

public class ActionBlock<TInput>
{
    private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
    private readonly Subject<TInput> mQueue = new Subject<TInput>();

    public ActionBlock(Func<TInput, Task> action)
    {
        var observable =
            from item in mQueue
            from _ in action(item).ToObservable()
            select _;

        observable.Subscribe(x => { },
            OnComplete);
    }

    private void OnComplete()
    {
        mCompletion.SetResult(null);
    }

    public void Post(TInput input)
    {
        mQueue.OnNext(input);
    }

    public Task Completion
    {
        get
        {
            return mCompletion.Task;
        }
    }

    public void Complete()
    {
        mQueue.OnCompleted();
    }
}

我想也许可以使用 EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。

有什么想法吗?

mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Merge(maxDegreeOfParallelism)
    .Subscribe(...);

如果 maxDegreeOfParallelism 确实总是 1,那么只需使用 Concat 而不是 Merge:

mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Subscribe(...);

这是有效的,因为 FromAsync 只是创建了一个冷的可观察对象,在它被订阅之前不会 运行 异步操作。然后,我们使用 MergemaxConcurrency 参数(或只是 Concat)来限制并发订阅的数量(从而限制异步操作​​的数量 运行ning)。

编辑:

并且由于您的目标只是有一个代表流完成的 Task,您可以使用 ToTask 而不是直接订阅。 ToTask 将订阅并 return 一个具有最终值的 Task。因为 ToTask 会在 observable 不产生值时抛出,我们将使用 Count 来保证它产生值:

// task to mark completion
private readonly Task mCompletion;

// ...

this.mCompletion = mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Count()
    .ToTask();