如何在 Rx.Net 中使用异步累加器实现 ScanAsync 运算符?

How to implement a ScanAsync operator with async accumulator in Rx.Net?

Rx.Net 中的 Scan 运算符具有签名:

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);

累加器是

Func<TAccumulate, TSource, TAccumulate> accumulator

在尝试使用异步状态转换实现状态机模型时,我发现具有以下签名的 ScanAsync 运算符会有所帮助。

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, Task<TAccumulate>> accumulator);

累加器有签名

Func<TAccumulate, TSource, Task<TAccumulate>> accumulator

理想的应用程序代码应该是这样的(类似于普通的 Scan 运算符,不同之处在于使用异步累加器)。

IObservable<TEvent> events;
IObservable<State> states = events.ScanAsync(
    initialState, 
    async (previousState, evt) => {
        var newState = await transitionAsync(previousState, evt);
        return newState;
    });

MS 似乎正在开发 AsyncRx.NET, however it is not released yet(no schedule)


相关内容:

如果通过BehaviourSubject为状态建模异步状态机,并订阅可观察的事件,如以下代码

IObservable<TEvent> events;
BehaviourSubject<State> states = new BehaviourSubject<State>(initialState);
events.Subscribe(async e => {
    var newState = await transition(states.Value, e);
    states.OnNext(newState);
})

我想在某些情况下可能存在竞争条件。

我试过用

实现它
IObservable<TS> ScanAsync<TS, TE>(
IObservable<TE> source,
Func<TS, TE, Task<TS>> reducer,
TS initialState)
{
    var states = from m in source.Take(1)
                    from nextState in reducer(initialState, m).ToObservable()
                    from s in ScanAsync(source.Skip(1), reducer, nextState)
                    select s;
    return Observable.Return(initialState).Concat(states);
}

但是有时它可以工作,有时它只是被阻止,我不知道是什么原因造成的。

您可以使用 Scan 运算符创建一个中间 IObservable<Task<TAccumulate>>,然后可以使用 Concat 运算符将其展平:

public static IObservable<TAccumulate> Scan<TSource, TAccumulate>(
    this IObservable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, Task<TAccumulate>> accumulator)
{
    return source.Scan(Task.FromResult(seed), async (previousTask, item) =>
    {
        return await accumulator(await previousTask, item);
    }).Concat();
}

上面的实现使用 Concat 重载来接受任务的可观察对象,而不是嵌套的可观察对象:

// Concatenates all task results, as long as the previous task terminated successfully.
public static IObservable<TSource> Concat<TSource>(
    this IObservable<Task<TSource>> sources);