如何将 Rx.Nex 扩展 ForEachAsync 与异步操作一起使用
How to use Rx.Nex extension ForEachAsync with async action
我有代码可以从 SQL 流式传输数据并将其写入不同的存储区。代码大概是这样的:
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}
我想为此目的使用 Reactive 扩展。理想情况下,代码如下所示:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));
但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?
这是关于 ToEnumerable 和 AsObservable 方法的the source code for ForEachAsync and an article
我们可以围绕 ForEachAsync 进行包装,等待任务返回函数:
public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
foreach ( var x in t.ToEnumerable() )
await onNext( x );
}
用法示例:
await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
正确的做法是正确使用 Reactive Extensions 来完成这项工作 - 所以从创建连接开始直到写入数据。
方法如下:
IObservable<IList<MyData>> query =
Observable
.Using(() => new SqlConnection(""), connection =>
Observable
.Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
Observable
.Using(() => cmd.ExecuteReader(), reader =>
Observable
.While(() => reader.Read(), Observable.Return(GetRow(reader))))))
.Buffer(BatchSize);
IDisposable subscription =
query
.Subscribe(async list => await WriteDataAsync(list));
我无法测试代码,但它应该可以工作。此代码假定 WriteDataAsync
也可以采用 IList<MyData>
。如果它不只是掉落在 .ToList()
.
Would it be possible to write an extension which would accept an async action?
不直接。
Rx 订阅必须是同步的,因为 Rx 是一个基于推送的系统。当一个数据项到达时,它会遍历您的查询,直到它到达最终订阅 - 在本例中是执行 Action
.
Rx 提供的 await
-able 方法是 await
ing 序列 本身 - 即 ForEachAsync
是异步的序列(您正在异步等待序列完成),但 ForEachAsync
中的订阅(对每个元素采取的操作)必须仍然是同步的。
为了在您的数据管道中进行同步到异步的转换,您需要有一个缓冲区。 Rx 订阅可以(同步地)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个支持同步和异步操作的 producer/consumer 队列。
TPL Dataflow 中的各种块类型可以满足这种需求。这样的东西应该足够了:
var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
await buffer.Completion;
注意没有背压;一旦 StreamDataFromSql
可以推送数据,它就会被缓冲并存储在 ActionBlock
的传入队列中。根据数据的大小和类型,这会很快占用大量内存。
这是支持异步操作的 ForEachAsync
方法的一个版本。它将源 observable 投影到包含异步操作的嵌套 IObservable<IObservable<Unit>>
,然后使用 Merge
运算符将其扁平化回 IObservable<Unit>
。生成的 observable 最终转换为任务。
默认情况下,这些操作是按顺序调用的,但可以通过配置可选的 maximumConcurrency
参数来同时调用它们。
取消可选的 cancellationToken
参数会立即完成(取消)返回的 Task
,可能在取消当前 运行 操作之前。
任何可能发生的异常都会通过 Task
传播,并导致取消所有当前 运行 操作。
/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, Task> action,
CancellationToken cancellationToken = default,
int maximumConcurrency = 1)
{
// Arguments validation omitted
return source
.Select(item => Observable.FromAsync(ct => action(item, ct)))
.Merge(maximumConcurrency)
.DefaultIfEmpty()
.ToTask(cancellationToken);
}
用法示例:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));
我有代码可以从 SQL 流式传输数据并将其写入不同的存储区。代码大概是这样的:
using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}
我想为此目的使用 Reactive 扩展。理想情况下,代码如下所示:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));
但是,扩展方法 ForEachAsync 似乎只接受同步操作。是否可以编写一个接受异步操作的扩展?
这是关于 ToEnumerable 和 AsObservable 方法的the source code for ForEachAsync and an article
我们可以围绕 ForEachAsync 进行包装,等待任务返回函数:
public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
foreach ( var x in t.ToEnumerable() )
await onNext( x );
}
用法示例:
await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );
正确的做法是正确使用 Reactive Extensions 来完成这项工作 - 所以从创建连接开始直到写入数据。
方法如下:
IObservable<IList<MyData>> query =
Observable
.Using(() => new SqlConnection(""), connection =>
Observable
.Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
Observable
.Using(() => cmd.ExecuteReader(), reader =>
Observable
.While(() => reader.Read(), Observable.Return(GetRow(reader))))))
.Buffer(BatchSize);
IDisposable subscription =
query
.Subscribe(async list => await WriteDataAsync(list));
我无法测试代码,但它应该可以工作。此代码假定 WriteDataAsync
也可以采用 IList<MyData>
。如果它不只是掉落在 .ToList()
.
Would it be possible to write an extension which would accept an async action?
不直接。
Rx 订阅必须是同步的,因为 Rx 是一个基于推送的系统。当一个数据项到达时,它会遍历您的查询,直到它到达最终订阅 - 在本例中是执行 Action
.
Rx 提供的 await
-able 方法是 await
ing 序列 本身 - 即 ForEachAsync
是异步的序列(您正在异步等待序列完成),但 ForEachAsync
中的订阅(对每个元素采取的操作)必须仍然是同步的。
为了在您的数据管道中进行同步到异步的转换,您需要有一个缓冲区。 Rx 订阅可以(同步地)作为生产者添加到缓冲区,而异步消费者正在检索项目并处理它们。因此,您需要一个支持同步和异步操作的 producer/consumer 队列。
TPL Dataflow 中的各种块类型可以满足这种需求。这样的东西应该足够了:
var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
await buffer.Completion;
注意没有背压;一旦 StreamDataFromSql
可以推送数据,它就会被缓冲并存储在 ActionBlock
的传入队列中。根据数据的大小和类型,这会很快占用大量内存。
这是支持异步操作的 ForEachAsync
方法的一个版本。它将源 observable 投影到包含异步操作的嵌套 IObservable<IObservable<Unit>>
,然后使用 Merge
运算符将其扁平化回 IObservable<Unit>
。生成的 observable 最终转换为任务。
默认情况下,这些操作是按顺序调用的,但可以通过配置可选的 maximumConcurrency
参数来同时调用它们。
取消可选的 cancellationToken
参数会立即完成(取消)返回的 Task
,可能在取消当前 运行 操作之前。
任何可能发生的异常都会通过 Task
传播,并导致取消所有当前 运行 操作。
/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, Task> action,
CancellationToken cancellationToken = default,
int maximumConcurrency = 1)
{
// Arguments validation omitted
return source
.Select(item => Observable.FromAsync(ct => action(item, ct)))
.Merge(maximumConcurrency)
.DefaultIfEmpty()
.ToTask(cancellationToken);
}
用法示例:
await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));