某处有 Ix.NET (System.Interactive) 的例子吗?

Is there an example of Ix.NET (System.Interactive) somewhere?

我有一个异步方法,比如:

public async Task<T> GetAsync()
{

}

并且会被调用:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

以上语法无效,但基本上我是在使用异步生成器。我知道它可以通过 Observables 处理。我用 Rx.NET 做了实验,它在一定程度上起作用了。但我试图避免它给代码库带来的复杂性,更重要的是,上述要求本质上仍然不是一个反应式系统(我们的仍然是基于拉动的)。例如我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。

我可以像这样反转方法签名:

public IEnumerable<Task<T>> GetAllAsync()

但这使得在不阻塞的情况下执行 LINQ 操作有点棘手。我希望它是非阻塞的,也不需要将整个东西加载到内存中。这个图书馆:AsyncEnumerable does exactly what I am looking for but how can the same be done with Ix.NET?它们的意义与我相信的相同。

换句话说,如何在处理await时利用Ix.NET生成一个IAsyncEnumerable?喜欢,

public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}

(已编辑)

使用来自 NuGet 的 System.Linq.Async 4.0.0,现在您可以使用 SelectAwait.

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(已过时)

使用来自 NuGet 的 System.Interactive.Async 3.2.0,这个怎么样?目前Select()不支持async lambda,需要自己实现。

Better support for async - Task based overloads for AsyncEnumerable

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

扩展方法引用自本示例。 https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972