可从链式任务中观察到

Observable from chained Tasks

我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一个项目应该通过对前一个项目(共同递归)的结果的异步调用来产生。用 "Generate" 的说法,这看起来像这样 - 除了 Generate 不支持异步 (也不支持初始状态的委托。

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

作为一个更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请按如下方式实现 ProduceFirst、Continue 和 ProduceNext:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

然后在 IObservable<IEnumerable<BrokeredMessage>> 上调用 .SelectMany(i => i) 将其变成 IObservable<BrokeredMessage>

其中_serviceBusReceiver是一个接口的实例如下:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

而 BrokeredMessage 来自 https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

我认为这可能是正确答案:

这不是一个好的答案。请勿使用。

我自己创建的 Generate 支持 async/await 初始状态 + 迭代函数:

    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector
        )
    {
        return Observable.Create<TResult>(async obs =>
        {
            var state = await initialState();

            while (condition(state))
            {
                var result = resultSelector(state);
                obs.OnNext(result);
                state = await iterate(state);
            }

            obs.OnCompleted();

            return System.Reactive.Disposables.Disposable.Empty;
        });
    }

不幸的是,这个似乎有副作用,即消息的生产速度远远超过消费。如果观察者处理消息的速度很慢,那么这将在我们处理少量消息之前获取数百万条消息。不完全是我们想要的服务总线。

我将完成上述内容,也许还会阅读更多内容,如果需要,我会 post 提出更具体的问题。

如果您要推出自己的异步 Generate 函数,我建议使用递归调度而不是包装 while 循环。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

这有两个优点。首先,您可以取消它,使用简单的 while 循环无法直接取消它,事实上,在可观察对象完成之前,您甚至不需要 return 订阅函数。其次,这使您可以控制每个项目的 scheduling/asynchrony(这使测试变得轻而易举),这也使它更适合 library

经过大量测试后,我认为使用内置 Rx 运算符可以很好地完成工作。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                        .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

我用以下内容测试了这段代码:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

和运行这个序列:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

我得到了这个结果:

我的测试代码也使用了 Reactive Extension 团队的 Interactive Extensions - NuGet "Ix-Main".

我自己也有类似的问题,也同意以下评论:

I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future).

我认为 Ix.NET 中的 IAsyncEnumerableIObservable 更适合这种情况 - 无论是这里的问题还是任何类似的异步展开功能。原因是每次我们迭代然后从 Task 中提取结果时,流程控制与我们(调用者)一起拉取下一个项目,或者如果满足特定条件则选择不拉取。这类似于 IAsyncEnumerable 而不是 IObservable,它在我们无法控制速率的情况下将项目推送给我们。

Ix.NET 没有合适的 AsyncEnumerable.Generate 版本所以我写了下面的来解决这个问题。

   public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var started = false;
            var current = default(TState);
            return AsyncEnumerable.CreateEnumerator(async c =>
            {

                if (!started)
                {
                    started = true;
                    var conditionMet = !c.IsCancellationRequested && condition(initialState);
                    if (conditionMet) current = initialState;
                    return conditionMet;
                }
                {
                    var newVal = await iterate(current).ConfigureAwait(false);
                    var conditionMet = !c.IsCancellationRequested && condition(newVal);
                    if (conditionMet) current = newVal;
                    return conditionMet;
                }

            },
                () => current,
                () => { });
        });



    }

备注:

  • 仅进行了非常轻微的测试。
  • 是否return初始状态。
  • 不是return第一个不符合条件的 TState,即使它有 完成工作以获得该结果。可能不同的版本可以 包括那个。
  • 我更愿意去掉 condition 参数,因为它是一个拉式系统,是否调用 MoveNext 完全取决于调用者,因此 condition 似乎是多余的。它实质上是将对 TakeWhile 的调用添加到 功能。但是我对 Ix.NET 的研究还不够深入 知道是否需要来自 MoveNextfalse 响应才能进行排序 disposeIAsyncEnumerator,因此我将其包括在内。
如果需要特定类型,

IAsyncEnumerable 当然可以转换为 IObservable

这是另一个实现,灵感来自 Enigmativity 的 . It uses newer language features (C# 7 tuple deconstruction)。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    return Observable.Create<TResult>(observer =>
    {
        var (isFirst, current) = (true, default(TResult));
        return Observable
            .While(() => isFirst || condition(current),
                Observable.If(() => isFirst,
                    Observable.FromAsync(ct => initialState()),
                    Observable.FromAsync(ct => iterate(current))
                )
            )
            .Do(x => (isFirst, current) = (false, x))
            .Select(x => resultSelector(x))
            .ObserveOn(scheduler ?? Scheduler.Immediate)
            .Subscribe(observer);
    });
}