某处有 Ix.NET (System.Interactive) 的例子吗?
Is there an example of Ix.NET (System.Interactive) somewhere?
我有一个异步方法,比如:
public async Task<T> GetAsync()
{
}
并且会被调用:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}
以上语法无效,但基本上我是在使用异步生成器。我知道它可以通过 Observables 处理。我用 Rx.NET 做了实验,它在一定程度上起作用了。但我试图避免它给代码库带来的复杂性,更重要的是,上述要求本质上仍然不是一个反应式系统(我们的仍然是基于拉动的)。例如我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()
但这使得在不阻塞的情况下执行 LINQ 操作有点棘手。我希望它是非阻塞的,也不需要将整个东西加载到内存中。这个图书馆:AsyncEnumerable does exactly what I am looking for but how can the same be done with Ix.NET?它们的意义与我相信的相同。
换句话说,如何在处理await
时利用Ix.NET生成一个IAsyncEnumerable
?喜欢,
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}
(已编辑)
使用来自 NuGet 的 System.Linq.Async 4.0.0,现在您可以使用 SelectAwait
.
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something
.ToAsyncEnumerable()
.SelectAwait(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
(已过时)
使用来自 NuGet 的 System.Interactive.Async 3.2.0,这个怎么样?目前Select()
不支持async lambda,需要自己实现。
Better support for async - Task based overloads for AsyncEnumerable
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something.SelectAsync(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var enumerator = enumerable.GetEnumerator();
var current = default(TResult);
return AsyncEnumerable.CreateEnumerator(async c =>
{
var moveNext = enumerator.MoveNext();
current = moveNext
? await selector(enumerator.Current).ConfigureAwait(false)
: default(TResult);
return moveNext;
},
() => current,
() => enumerator.Dispose());
});
}
}
扩展方法引用自本示例。 https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972
我有一个异步方法,比如:
public async Task<T> GetAsync()
{
}
并且会被调用:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}
以上语法无效,但基本上我是在使用异步生成器。我知道它可以通过 Observables 处理。我用 Rx.NET 做了实验,它在一定程度上起作用了。但我试图避免它给代码库带来的复杂性,更重要的是,上述要求本质上仍然不是一个反应式系统(我们的仍然是基于拉动的)。例如我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()
但这使得在不阻塞的情况下执行 LINQ 操作有点棘手。我希望它是非阻塞的,也不需要将整个东西加载到内存中。这个图书馆:AsyncEnumerable does exactly what I am looking for but how can the same be done with Ix.NET?它们的意义与我相信的相同。
换句话说,如何在处理await
时利用Ix.NET生成一个IAsyncEnumerable
?喜欢,
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}
(已编辑)
使用来自 NuGet 的 System.Linq.Async 4.0.0,现在您可以使用 SelectAwait
.
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something
.ToAsyncEnumerable()
.SelectAwait(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
(已过时)
使用来自 NuGet 的 System.Interactive.Async 3.2.0,这个怎么样?目前Select()
不支持async lambda,需要自己实现。
Better support for async - Task based overloads for AsyncEnumerable
class Program
{
static void Main(string[] args)
{
Task.Run(async () =>
await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));
Thread.Sleep(4000);
}
static IAsyncEnumerable<string> GetAllAsync()
{
var something = new[] { 1, 2, 3 };
return something.SelectAsync(async (x) => await GetAsync(x));
}
static async Task<string> GetAsync(int item)
{
await Task.Delay(1000); // heavy
return "got " + item;
}
}
static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var enumerator = enumerable.GetEnumerator();
var current = default(TResult);
return AsyncEnumerable.CreateEnumerator(async c =>
{
var moveNext = enumerator.MoveNext();
current = moveNext
? await selector(enumerator.Current).ConfigureAwait(false)
: default(TResult);
return moveNext;
},
() => current,
() => enumerator.Dispose());
});
}
}
扩展方法引用自本示例。 https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972