Unwrapping IObservable<Task<T>> into IObservable<T> with order preservation
Is there a way to unwrap an IObservable<Task<T>> into an IObservable<T>, keeping the same order of events, like this?
Tasks: ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->
Suppose I have a desktop application that consumes a stream of messages, some of which require heavy post-processing:
IObservable<Message> streamOfMessages = ...;
IObservable<Task<Result>> streamOfTasks = streamOfMessages
.Select(async msg => await PostprocessAsync(msg));
IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks
I have two approaches in mind.
First, I can subscribe to streamOfTasks with an async event handler:
streamOfTasks.Subscribe(async task =>
{
var result = await task;
Display(result);
});
Second, I can transform streamOfTasks with Observable.Create, like this:
var streamOfResults =
from task in streamOfTasks
from value in Observable.Create<T>(async (obs, cancel) =>
{
var v = await task;
obs.OnNext(v);
// TODO: don't know when to call obs.OnCompleted()
})
select value;
streamOfResults.Subscribe(result => Display(result));
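Either way, the order of the messages is not preserved: some later messages that don't need any post-processing come out faster than earlier messages that do. Both of my solutions process the incoming messages in parallel, but I'd like them to be processed sequentially, one by one.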
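To make the effect concrete, here is a minimal BCL-only sketch (no Rx), assuming a hypothetical FakePostprocessAsync in which every third item is slow, like in the sample program in the update. Both methods start all the work concurrently; the only difference is whether results are collected as the tasks finish or awaited in the order the tasks were created.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderDemo
{
    // Hypothetical stand-in for PostprocessAsync: every third item is slow.
    public static async Task<long> FakePostprocessAsync(long x)
    {
        await Task.Delay(x % 3 == 0 ? 300 : 10);
        return x;
    }

    // Collects results in the order the tasks finish. This is effectively
    // what an async Subscribe handler, SelectMany or Merge gives you.
    public static async Task<List<long>> CompletionOrderAsync(IEnumerable<long> items)
    {
        var pending = items.Select(x => FakePostprocessAsync(x)).ToList();
        var results = new List<long>();
        while (pending.Count > 0)
        {
            var finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            results.Add(await finished);
        }
        return results;
    }

    // Starts the same work concurrently, but awaits the tasks in creation order.
    public static async Task<List<long>> SourceOrderAsync(IEnumerable<long> items)
    {
        var tasks = items.Select(x => FakePostprocessAsync(x)).ToList();
        var results = new List<long>();
        foreach (var task in tasks)
        {
            results.Add(await task);
        }
        return results;
    }
}
```

Both methods take roughly the same total time, because the work runs concurrently either way; awaiting in creation order changes only the order in which results are observed.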
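I could write a simple task queue that processes just one task at a time, but perhaps that would be overkill. It seems to me that I'm missing something obvious.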
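For reference, a "simple task queue" of the kind mentioned above could look like the following hypothetical SerialTaskQueue (not part of the question). It chains each piece of work onto the previous one with ContinueWith and Unwrap, so at most one item runs at a time and items run in the order they were enqueued. This is a sketch, not a tuned implementation; in particular, it gives up all concurrency between items.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs queued async work one item at a time, in enqueue order.
public sealed class SerialTaskQueue
{
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public Task<T> Enqueue<T>(Func<Task<T>> work)
    {
        lock (_gate)
        {
            // Start 'work' only after the previous item has finished (successfully
            // or not); Unwrap flattens the Task<Task<T>> returned by ContinueWith.
            Task<T> next = _tail
                .ContinueWith(_ => work(), TaskScheduler.Default)
                .Unwrap();
            _tail = next;
            return next;
        }
    }
}
```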
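UPDATE: I wrote a sample console program to demonstrate my approaches. So far, none of the solutions preserves the original order of events. Here is the program's output: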
Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0
Here is the full source code:
// "C:\Program Files (x86)\MSBuild.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
class Program
{
static void Main()
{
Console.WriteLine("Press ENTER to exit.");
// the source stream
var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));
// solution #1: using async event handler
timerEvents.Subscribe(async x =>
{
var result = await PostprocessAsync(x);
Console.WriteLine($"Async handler: {x}");
});
// solution #2: using Observable.Create
var processedEventsV2 =
from task in timerEvents.Select(async x => await PostprocessAsync(x))
from value in Observable.Create<long>(async (obs, cancel) =>
{
var v = await task;
obs.OnNext(v);
})
select value;
processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));
// solution #3: using FromAsync, as answered by @Enigmativity
var processedEventsV3 =
from msg in timerEvents
from result in Observable.FromAsync(() => PostprocessAsync(msg))
select result;
processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));
Console.ReadLine();
}
static async Task<long> PostprocessAsync(long x)
{
// some messages require long post-processing
if (x % 3 == 0)
{
await Task.Delay(TimeSpan.FromSeconds(2.5));
}
// and some don't
return x;
}
}
Does the following simple approach work for you?
IObservable<Result> streamOfResults =
from msg in streamOfMessages
from result in Observable.FromAsync(() => PostprocessAsync(msg))
select result;
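To preserve the order of events, you can funnel your stream into a TransformBlock from TPL Dataflow. The TransformBlock will execute your post-processing logic and, by default, preserves the order of its output.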
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;
namespace HandlingStreamInOrder {
[TestFixture]
public class ItemHandlerTests {
[Test]
public async Task Items_Are_Output_In_The_Same_Order_As_They_Are_Input() {
var itemHandler = new ItemHandler();
var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
timerEvents.Subscribe(async x => {
var data = (int)x;
Console.WriteLine($"Value Produced: {x}");
var dataAccepted = await itemHandler.SendAsync((int)data);
if (dataAccepted) {
InputItems.Add(data);
}
});
await Task.Delay(5000);
itemHandler.Complete();
await itemHandler.Completion;
CollectionAssert.AreEqual(InputItems, itemHandler.OutputValues);
}
private IList<int> InputItems {
get;
} = new List<int>();
}
public class ItemHandler {
public ItemHandler() {
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded,
MaxDegreeOfParallelism = Environment.ProcessorCount,
EnsureOrdered = true
};
PostProcessBlock = new TransformBlock<int, int>((Func<int, Task<int>>)PostProcess, options);
var output = PostProcessBlock.AsObservable().Subscribe(x => {
Console.WriteLine($"Value Output: {x}");
OutputValues.Add(x);
});
}
public async Task<bool> SendAsync(int data) {
return await PostProcessBlock.SendAsync(data);
}
public void Complete() {
PostProcessBlock.Complete();
}
public Task Completion {
get { return PostProcessBlock.Completion; }
}
public IList<int> OutputValues {
get;
} = new List<int>();
private IPropagatorBlock<int, int> PostProcessBlock {
get;
}
private async Task<int> PostProcess(int data) {
if (data % 3 == 0) {
await Task.Delay(TimeSpan.FromSeconds(2));
}
return data;
}
}
}
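Stripped of the NUnit and Rx plumbing, the ordering guarantee of TransformBlock can be seen in a small sketch like the following. It assumes the System.Threading.Tasks.Dataflow package; the delays and MaxDegreeOfParallelism = 4 are arbitrary choices for illustration. Items are processed concurrently, yet they come out in the order they were posted, because EnsureOrdered is true (its default value).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformBlockOrderDemo
{
    public static async Task<List<int>> RunAsync(int count)
    {
        var block = new TransformBlock<int, int>(async x =>
        {
            // Every third item is slow, so items finish out of order.
            await Task.Delay(x % 3 == 0 ? 200 : 10);
            return x;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // items are processed concurrently...
            EnsureOrdered = true        // ...but emitted in input order (the default)
        });

        for (var i = 0; i < count; i++)
        {
            block.Post(i);
        }
        block.Complete();

        // Drain the output until the block has completed and is empty.
        var output = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            output.Add(await block.ReceiveAsync());
        }
        await block.Completion;
        return output;
    }
}
```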
Rx and TPL can be easily combined here, and TPL Dataflow blocks preserve the order of events by default, so your code could look like this:
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
static async Task<long> PostprocessAsync(long x) { ... }
IObservable<long> streamOfMessages = ...;
var streamOfTasks = new TransformBlock<long, long>(async msg =>
await PostprocessAsync(msg)
// set the concurrency level for messages to handle
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
// feed the messages into the block; OnCompleted completes the block
streamOfMessages.Subscribe(streamOfTasks.AsObserver());
// easily convert block into observable
IObservable<long> streamOfResults = streamOfTasks.AsObservable();
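A self-contained version of the same bridge is sketched below, assuming the System.Reactive and System.Threading.Tasks.Dataflow packages, a hypothetical PostprocessAsync stand-in, and Observable.Range as the message source. AsObserver turns the block into an IObserver that forwards OnNext and OnCompleted into the block, and AsObservable exposes the block's ordered output as an IObservable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RxDataflowBridge
{
    // Hypothetical stand-in for PostprocessAsync: every third item is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        await Task.Delay(x % 3 == 0 ? 200 : 10);
        return x;
    }

    public static async Task<IList<long>> RunAsync()
    {
        var block = new TransformBlock<long, long>(
            msg => PostprocessAsync(msg),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

        // Dataflow -> Rx: the block's output as an observable, in input order.
        Task<IList<long>> results = block.AsObservable().ToList().ToTask();

        // Rx -> Dataflow: each OnNext is sent to the block, OnCompleted completes it.
        IObservable<long> streamOfMessages = Observable.Range(0, 7).Select(i => (long)i);
        using (streamOfMessages.Subscribe(block.AsObserver()))
        {
            return await results;
        }
    }
}
```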
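EDIT: The Rx extensions are designed as a reactive pipeline for UI events. Since such applications are generally single-threaded, messages are processed with their order preserved. But in general, events in C# aren't thread-safe, so you have to provide some additional logic to keep the same order.
If you don't like the idea of introducing another dependency, you need to keep track of operation numbers with the Interlocked class, like this: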
// counter for operations that have been started
int operationNumber = 0;
// counter for operations that have been handled
int doneNumber = 0;
...
// take a ticket before the asynchronous work starts, so tickets follow the source order (1, 2, 3, ...)
var currentOperationNumber = Interlocked.Increment(ref operationNumber);
...
// wait until all earlier operations have been handled
var spinner = new SpinWait();
while (Volatile.Read(ref doneNumber) != currentOperationNumber - 1)
{
spinner.SpinOnce();
}
// handle event
Interlocked.Increment(ref doneNumber);
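A runnable sketch of the same operation-number idea, using only the BCL, is below. The names, the delays and the use of Enumerable.Range as the event source are assumptions for illustration. The important detail is that each ticket is taken synchronously, in source order, before the asynchronous work starts; after the work finishes, each handler spins until all earlier tickets are done. Spinning occupies a thread while it waits, so this is only reasonable when the waits are short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TicketOrdering
{
    public static async Task<int[]> RunAsync(int count)
    {
        int operationNumber = 0; // tickets handed out so far
        int doneNumber = 0;      // tickets whose event has been handled
        var handled = new ConcurrentQueue<int>();

        var work = Enumerable.Range(0, count).Select(async value =>
        {
            // Runs synchronously as each item is enumerated, so tickets follow source order.
            var ticket = Interlocked.Increment(ref operationNumber);
            await Task.Delay(value % 3 == 0 ? 200 : 10); // hypothetical post-processing

            // Wait until every earlier ticket has been handled.
            var spinner = new SpinWait();
            while (Volatile.Read(ref doneNumber) != ticket - 1)
            {
                spinner.SpinOnce();
            }
            handled.Enqueue(value); // "handle event"
            Interlocked.Increment(ref doneNumber);
        }).ToList();

        await Task.WhenAll(work);
        return handled.ToArray();
    }
}
```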
Combining @Enigmativity's answer with @VMAtm's idea and some code snippets from this SO question, I came up with this solution:
// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));
processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));
// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....
Here is my SelectAsync extension method, which applies an asynchronous selector to an IObservable<TSource> to produce an IObservable<TResult>, preserving the original order of events:
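using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
public static class ObservableExtensions
{
public static IObservable<TResult> SelectAsync<TSource, TResult>(
this IObservable<TSource> src,
Func<TSource, Task<TResult>> selectorAsync)
{
// buffer the results coming out of order
return Observable.Create<TResult>(observer =>
{
// per-subscription state; a local counter is easier than src.Scan(...)
var counter = 0;
var index = 0;
var buffer = new Dictionary<int, TResult>();
// tag each result with the position of its source element
var streamOfTasks =
from source in src
from tagged in Observable.FromAsync(async () => new
{
Index = Interlocked.Increment(ref counter) - 1,
Result = await selectorAsync(source)
})
select tagged;
return streamOfTasks.Subscribe(
item =>
{
buffer.Add(item.Index, item.Result);
// emit every result whose predecessors have all been emitted
TResult result;
while (buffer.TryGetValue(index, out result))
{
buffer.Remove(index);
observer.OnNext(result);
index++;
}
},
// forward errors and completion to the observer
observer.OnError,
observer.OnCompleted);
});
}
}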
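I'm not particularly happy with my solution, as it seems too complex to me, but at least it doesn't require any external dependencies. I use a simple dictionary here to buffer and reorder the task results, because the subscriber doesn't need to be thread-safe (its callbacks are never invoked concurrently).
Any comments or suggestions are welcome. I'm still hoping to find a native Rx way to do this without a custom buffering extension method.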
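The reordering step at the heart of SelectAsync can be isolated into a small class, sketched below with the hypothetical name ReorderBuffer<T>. Results arrive tagged with the index of their source element, possibly out of order, and each one is released only after every earlier index has been released. Like the dictionary in SelectAsync, it is not thread-safe and relies on calls being serialized, as they are for an Rx observer's OnNext.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: releases index-tagged values strictly in index order.
public sealed class ReorderBuffer<T>
{
    private readonly Dictionary<int, T> _pending = new Dictionary<int, T>();
    private readonly Action<T> _emit;
    private int _next; // index of the next value to release

    public ReorderBuffer(Action<T> emit) => _emit = emit;

    public int PendingCount => _pending.Count;

    // Not thread-safe: callers must serialize calls.
    public void Add(int index, T value)
    {
        _pending.Add(index, value);
        // Release the longest run of consecutive indexes starting at _next.
        while (_pending.TryGetValue(_next, out var ready))
        {
            _pending.Remove(_next);
            _emit(ready);
            _next++;
        }
    }
}
```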
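The Rx library includes three operators that can unwrap an observable sequence of tasks: Concat, Merge and Switch. All three accept a single source argument of type IObservable<Task<T>>, and return an IObservable<T>. Here are their descriptions from the documentation: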
Concat
Concatenates all task results, as long as the previous task terminated successfully.
Merge
Merges results from all source tasks into a single observable sequence.
Switch
Transforms an observable sequence of tasks into an observable sequence producing values only from the most recent observable sequence. Each time a new task is received, the previous task's result is ignored.
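In other words, Concat returns the results in their original order, Merge returns the results in order of completion, and Switch filters out the results of any task that didn't complete before the next task was emitted. So your problem can be solved with the built-in Concat operator; no custom operator is needed.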
var streamOfResults = streamOfTasks
.Select(async task =>
{
var result1 = await task;
var result2 = await PostprocessAsync(result1);
return result2;
})
.Concat();
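To compare Concat and Merge side by side, the following sketch (assuming the System.Reactive package and a hypothetical PostprocessAsync in which every third item is slow) builds the same stream of hot tasks and unwraps it with each operator.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ConcatVsMergeDemo
{
    // Hypothetical stand-in for PostprocessAsync: every third item is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        await Task.Delay(x % 3 == 0 ? 200 : 10);
        return x;
    }

    public static Task<IList<long>> RunAsync(bool useConcat)
    {
        // Select starts every task as soon as its item arrives (hot tasks).
        IObservable<Task<long>> streamOfTasks = Observable.Range(0, 6)
            .Select(i => PostprocessAsync(i));

        IObservable<long> streamOfResults = useConcat
            ? streamOfTasks.Concat()  // results in source order
            : streamOfTasks.Merge();  // results in completion order

        return streamOfResults.ToList().ToTask();
    }
}
```

With these delays, Concat should yield 0 through 5 in order, while Merge should emit the fast items (1, 2, 4, 5) before the slow ones (0, 3).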
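The tasks are already started before they are emitted by streamOfTasks. In other words, they are "hot". So the fact that the Concat operator awaits them one after the other has no effect on the concurrency of the operations; it only affects the order of the results. This would be a consideration if you had cold observables instead of hot tasks, like the ones created by the Observable.FromAsync and Observable.Create methods, in which case Concat would execute the operations sequentially.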
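The hot/cold distinction can be measured with a sketch like this one, assuming the System.Reactive package and a hypothetical TrackedOperationAsync that records how many operations are running at once. With hot tasks, all four operations are already running when Concat sees them; with Observable.FromAsync, Concat subscribes to (and therefore starts) each operation only after the previous one has completed.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class HotVsColdDemo
{
    static int _running;    // operations currently in flight
    static int _maxRunning; // highest value _running has reached

    // Hypothetical operation that records how many copies run at the same time.
    static async Task<int> TrackedOperationAsync(int x)
    {
        var now = Interlocked.Increment(ref _running);
        int seen;
        // Atomically raise _maxRunning to 'now' if 'now' is larger.
        while (now > (seen = Volatile.Read(ref _maxRunning))
               && Interlocked.CompareExchange(ref _maxRunning, now, seen) != seen)
        {
        }
        await Task.Delay(50);
        Interlocked.Decrement(ref _running);
        return x;
    }

    public static async Task<int> PeakConcurrencyAsync(bool cold)
    {
        _running = 0;
        _maxRunning = 0;
        IObservable<int> results = cold
            // Cold: FromAsync starts the operation only when Concat subscribes to it.
            ? Observable.Range(0, 4)
                .Select(x => Observable.FromAsync(() => TrackedOperationAsync(x)))
                .Concat()
            // Hot: Select starts every task immediately; Concat only orders the results.
            : Observable.Range(0, 4)
                .Select(x => TrackedOperationAsync(x))
                .Concat();
        await results.ToList().ToTask();
        return _maxRunning;
    }
}
```

Under these assumptions the hot version should report a peak of 4 concurrent operations, and the cold version a peak of 1.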