ActionBlock Framework 4 rx 替代方案
ActionBlock Framework 4 rx alternative
我对 Framework 4.0 的 ActionBlock 实现很感兴趣,因为 Framework 4.0 似乎不支持 TPL.Dataflow。
更具体地说,我对接收 Func 委托和 MaxDegreeOfParallism = 1 情况的构造函数的情况感兴趣。
我考虑过使用响应式扩展来实现它,但我不确定该怎么做。考虑创建一个 Subject 并在 Post 上调用 OnNext,并使用 SelectMany 和任务 ToObservable 东西,但我不确定如何处理调度程序。这是我的想法的草稿。
public class ActionBlock<TInput>
{
private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
private readonly Subject<TInput> mQueue = new Subject<TInput>();
public ActionBlock(Func<TInput, Task> action)
{
var observable =
from item in mQueue
from _ in action(item).ToObservable()
select _;
observable.Subscribe(x => { },
OnComplete);
}
private void OnComplete()
{
mCompletion.SetResult(null);
}
public void Post(TInput input)
{
mQueue.OnNext(input);
}
public Task Completion
{
get
{
return mCompletion.Task;
}
}
public void Complete()
{
mQueue.OnCompleted();
}
}
我想也许可以使用 EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。
有什么想法吗?
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Merge(maxDegreeOfParallelism)
.Subscribe(...);
如果 maxDegreeOfParallelism
确实总是 1,那么只需使用 Concat
而不是 Merge
:
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Subscribe(...);
这是有效的,因为 FromAsync
只是创建了一个冷的可观察对象,在它被订阅之前不会 运行 异步操作。然后,我们使用 Merge
的 maxConcurrency
参数(或只是 Concat
)来限制并发订阅的数量(从而限制异步操作的数量 运行ning)。
编辑:
并且由于您的目标只是有一个代表流完成的 Task
,您可以使用 ToTask
而不是直接订阅。 ToTask
将订阅并 return 一个具有最终值的 Task
。因为 ToTask
会在 observable 不产生值时抛出,我们将使用 Count
来保证它产生值:
// task to mark completion
private readonly Task mCompletion;
// ...
this.mCompletion = mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Count()
.ToTask();
我对 Framework 4.0 的 ActionBlock 实现很感兴趣,因为 Framework 4.0 似乎不支持 TPL.Dataflow。
更具体地说,我对接收 Func
我考虑过使用响应式扩展来实现它,但我不确定该怎么做。考虑创建一个 Subject
public class ActionBlock<TInput>
{
private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
private readonly Subject<TInput> mQueue = new Subject<TInput>();
public ActionBlock(Func<TInput, Task> action)
{
var observable =
from item in mQueue
from _ in action(item).ToObservable()
select _;
observable.Subscribe(x => { },
OnComplete);
}
private void OnComplete()
{
mCompletion.SetResult(null);
}
public void Post(TInput input)
{
mQueue.OnNext(input);
}
public Task Completion
{
get
{
return mCompletion.Task;
}
}
public void Complete()
{
mQueue.OnCompleted();
}
}
我想也许可以使用 EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。
有什么想法吗?
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Merge(maxDegreeOfParallelism)
.Subscribe(...);
如果 maxDegreeOfParallelism
确实总是 1,那么只需使用 Concat
而不是 Merge
:
mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Subscribe(...);
这是有效的,因为 FromAsync
只是创建了一个冷的可观察对象,在它被订阅之前不会 运行 异步操作。然后,我们使用 Merge
的 maxConcurrency
参数(或只是 Concat
)来限制并发订阅的数量(从而限制异步操作的数量 运行ning)。
编辑:
并且由于您的目标只是有一个代表流完成的 Task
,您可以使用 ToTask
而不是直接订阅。 ToTask
将订阅并 return 一个具有最终值的 Task
。因为 ToTask
会在 observable 不产生值时抛出,我们将使用 Count
来保证它产生值:
// task to mark completion
private readonly Task mCompletion;
// ...
this.mCompletion = mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Count()
.ToTask();