异步流与反应式扩展相比如何?
How Async streams compares to reactive extension?
下面两个怎么比较? Rx更强大吗?
响应式扩展:
var observable = Observable.Create<char>(async (observer, cancel) =>
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
observer.OnNext(line);
}
});
observable.Subscribe(
c => Console.WriteLine(c.ToString()),
() => end.Dispose());
异步流:
public async void Run(string path)
{
await foreach (var line in TestAsync())
{
Console.WriteLine(line);
}
}
private async IAsyncEnumerable<string> TestAsync()
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
yield return line;
}
}
这两个功能协同工作。 PS:忘掉async streams
,想想await foreach
。
异步流
异步流是一个(相对)低级的特性,允许异步迭代。它本身不提供任何其他功能,如过滤、聚合等。它是基于拉的,而 Rx 是基于推的。
您可以通过 System.Linq.Async library found in ..... the ReacticeX.NET Github repo 在异步流上使用 LINQ 运算符。它速度很快,但不提供 Rx 的事件处理功能。
例如,没有时间的概念,更不用说使用自定义调度程序的方法了。没有订阅,没有错误事件。 GroupBy 将消耗整个源并将组项作为单独的 IAsyncEnumerable
实例发出,而 Rx 的 GroupBy 将为每个组发出单独的 Observables。
在问题的示例中,IAsyncEnumerable 很适合,因为不涉及事件逻辑,只是迭代异步迭代器。
如果示例尝试轮询远程服务并检测故障峰值(即每个时间间隔的故障数超过阈值),IAsyncEnumerable 将不合适,因为它会阻止等待所有响应。事实上,我们根本无法按时间汇总事件。
线程
None 真的 - IAsyncEnumerable 或 await foreach
调用不指定如何生成或使用事件。如果我们想使用一个单独的任务来处理一个项目,我们必须自己创建它,eg :
public async Task Run(string path)
{
await foreach (var line in LoadStockTrades())
{
var result = await Task.Run(()=>AnalyzeTrade(line));
Console.WriteLine($"{result} : {line});
}
}
响应式扩展
Reactive Extensions 是处理事件流的高级库。它是基于推送的,它理解时间,但它也比异步流或通道等较低级别的结构慢。
在这个问题的例子中,Rx 太过分了。轮询和检测尖峰虽然很容易,但有多个窗口选项。
System.Linq.Async 可以使用 ToObservable 从 IAsyncEnumerable 创建 Observable,这意味着 IAsyncEnumerable 可以用作 Rx 的源。
线程
默认情况下,Rx 是单线程的,这对于它的主要场景——事件流处理来说非常有意义。
另一方面,Rx 允许发布者、订阅者和操作者 运行 在相同或不同的线程上。在 没有 的语言中 async/await
或 DataFlow(例如 Java,JavaScript),Rx 用于通过 运行在不同的线程上设置发布者和订阅者。
下面两个怎么比较? Rx更强大吗?
响应式扩展:
var observable = Observable.Create<char>(async (observer, cancel) =>
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
observer.OnNext(line);
}
});
observable.Subscribe(
c => Console.WriteLine(c.ToString()),
() => end.Dispose());
异步流:
public async void Run(string path)
{
await foreach (var line in TestAsync())
{
Console.WriteLine(line);
}
}
private async IAsyncEnumerable<string> TestAsync()
{
while (true)
{
string line = await sr.ReadLineAsync();
if (line == null)
break;
yield return line;
}
}
这两个功能协同工作。 PS:忘掉async streams
,想想await foreach
。
异步流
异步流是一个(相对)低级的特性,允许异步迭代。它本身不提供任何其他功能,如过滤、聚合等。它是基于拉的,而 Rx 是基于推的。
您可以通过 System.Linq.Async library found in ..... the ReacticeX.NET Github repo 在异步流上使用 LINQ 运算符。它速度很快,但不提供 Rx 的事件处理功能。
例如,没有时间的概念,更不用说使用自定义调度程序的方法了。没有订阅,没有错误事件。 GroupBy 将消耗整个源并将组项作为单独的 IAsyncEnumerable
实例发出,而 Rx 的 GroupBy 将为每个组发出单独的 Observables。
在问题的示例中,IAsyncEnumerable 很适合,因为不涉及事件逻辑,只是迭代异步迭代器。
如果示例尝试轮询远程服务并检测故障峰值(即每个时间间隔的故障数超过阈值),IAsyncEnumerable 将不合适,因为它会阻止等待所有响应。事实上,我们根本无法按时间汇总事件。
线程
None 真的 - IAsyncEnumerable 或 await foreach
调用不指定如何生成或使用事件。如果我们想使用一个单独的任务来处理一个项目,我们必须自己创建它,eg :
public async Task Run(string path)
{
await foreach (var line in LoadStockTrades())
{
var result = await Task.Run(()=>AnalyzeTrade(line));
Console.WriteLine($"{result} : {line});
}
}
响应式扩展
Reactive Extensions 是处理事件流的高级库。它是基于推送的,它理解时间,但它也比异步流或通道等较低级别的结构慢。
在这个问题的例子中,Rx 太过分了。轮询和检测尖峰虽然很容易,但有多个窗口选项。
System.Linq.Async 可以使用 ToObservable 从 IAsyncEnumerable 创建 Observable,这意味着 IAsyncEnumerable 可以用作 Rx 的源。
线程
默认情况下,Rx 是单线程的,这对于它的主要场景——事件流处理来说非常有意义。
另一方面,Rx 允许发布者、订阅者和操作者 运行 在相同或不同的线程上。在 没有 的语言中 async/await
或 DataFlow(例如 Java,JavaScript),Rx 用于通过 运行在不同的线程上设置发布者和订阅者。