可从链式任务中观察到
Observable from chained Tasks
我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一个项目应该通过对前一个项目(共同递归)的结果的异步调用来产生。用 "Generate" 的说法,这看起来像这样 - 除了 Generate 不支持异步 (也不支持初始状态的委托。
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
作为一个更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请按如下方式实现 ProduceFirst、Continue 和 ProduceNext:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
然后在 IObservable<IEnumerable<BrokeredMessage>>
上调用 .SelectMany(i => i)
将其变成 IObservable<BrokeredMessage>
其中_serviceBusReceiver是一个接口的实例如下:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
而 BrokeredMessage 来自 https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
我认为这可能是正确答案:
这不是一个好的答案。请勿使用。
我自己创建的 Generate
支持 async/await 初始状态 + 迭代函数:
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector
)
{
return Observable.Create<TResult>(async obs =>
{
var state = await initialState();
while (condition(state))
{
var result = resultSelector(state);
obs.OnNext(result);
state = await iterate(state);
}
obs.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
});
}
不幸的是,这个似乎有副作用,即消息的生产速度远远超过消费。如果观察者处理消息的速度很慢,那么这将在我们处理少量消息之前获取数百万条消息。不完全是我们想要的服务总线。
我将完成上述内容,也许还会阅读更多内容,如果需要,我会 post 提出更具体的问题。
如果您要推出自己的异步 Generate
函数,我建议使用递归调度而不是包装 while 循环。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}
obs.OnNext(resultSelector(state));
self(await iterate(state));
});
});
}
这有两个优点。首先,您可以取消它,使用简单的 while 循环无法直接取消它,事实上,在可观察对象完成之前,您甚至不需要 return 订阅函数。其次,这使您可以控制每个项目的 scheduling/asynchrony(这使测试变得轻而易举),这也使它更适合 library
经过大量测试后,我认为使用内置 Rx 运算符可以很好地完成工作。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(o =>
{
var current = default(TResult);
return
Observable
.FromAsync(initialState)
.Select(y => resultSelector(y))
.Do(c => current = c)
.Select(x =>
Observable
.While(
() => condition(current),
Observable
.FromAsync(() => iterate(current))
.Select(y => resultSelector(y))
.Do(c => current = c))
.StartWith(x))
.Switch()
.Where(x => condition(x))
.ObserveOn(scheduler ?? Scheduler.Default)
.Subscribe(o);
});
}
我用以下内容测试了这段代码:
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
return
Task.FromResult(
EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = 1
}));
}
Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
return Task.FromResult(
prev.Last().SequenceNumber < 3
? EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = prev.Last().SequenceNumber + 1
})
: Enumerable.Empty<BrokeredMessage>());
}
public class BrokeredMessage
{
public int SequenceNumber;
}
和运行这个序列:
var ob = Generate(
async () => await ProduceFirst(),
prev => Continue(prev),
async prev => await ProduceNext(prev),
item => item);
我得到了这个结果:
我的测试代码也使用了 Reactive Extension 团队的 Interactive Extensions - NuGet "Ix-Main".
我自己也有类似的问题,也同意以下评论:
I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future).
我认为 Ix.NET 中的 IAsyncEnumerable
比 IObservable
更适合这种情况 - 无论是这里的问题还是任何类似的异步展开功能。原因是每次我们迭代然后从 Task
中提取结果时,流程控制与我们(调用者)一起拉取下一个项目,或者如果满足特定条件则选择不拉取。这类似于 IAsyncEnumerable
而不是 IObservable
,它在我们无法控制速率的情况下将项目推送给我们。
Ix.NET 没有合适的 AsyncEnumerable.Generate
版本所以我写了下面的来解决这个问题。
public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var started = false;
var current = default(TState);
return AsyncEnumerable.CreateEnumerator(async c =>
{
if (!started)
{
started = true;
var conditionMet = !c.IsCancellationRequested && condition(initialState);
if (conditionMet) current = initialState;
return conditionMet;
}
{
var newVal = await iterate(current).ConfigureAwait(false);
var conditionMet = !c.IsCancellationRequested && condition(newVal);
if (conditionMet) current = newVal;
return conditionMet;
}
},
() => current,
() => { });
});
}
备注:
- 仅进行了非常轻微的测试。
- 是否return初始状态。
- 不是return第一个不符合条件的 TState,即使它有
完成工作以获得该结果。可能不同的版本可以
包括那个。
- 我更愿意去掉
condition
参数,因为它是一个拉式系统,是否调用 MoveNext 完全取决于调用者,因此 condition
似乎是多余的。它实质上是将对 TakeWhile
的调用添加到
功能。但是我对 Ix.NET 的研究还不够深入
知道是否需要来自 MoveNext
的 false
响应才能进行排序
dispose
到 IAsyncEnumerator
,因此我将其包括在内。
如果需要特定类型,IAsyncEnumerable
当然可以转换为 IObservable
。
这是另一个实现,灵感来自 Enigmativity 的 . It uses newer language features (C# 7 tuple deconstruction)。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(observer =>
{
var (isFirst, current) = (true, default(TResult));
return Observable
.While(() => isFirst || condition(current),
Observable.If(() => isFirst,
Observable.FromAsync(ct => initialState()),
Observable.FromAsync(ct => iterate(current))
)
)
.Do(x => (isFirst, current) = (false, x))
.Select(x => resultSelector(x))
.ObserveOn(scheduler ?? Scheduler.Immediate)
.Subscribe(observer);
});
}
我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一个项目应该通过对前一个项目(共同递归)的结果的异步调用来产生。用 "Generate" 的说法,这看起来像这样 - 除了 Generate 不支持异步 (也不支持初始状态的委托。
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
作为一个更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请按如下方式实现 ProduceFirst、Continue 和 ProduceNext:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
然后在 IObservable<IEnumerable<BrokeredMessage>>
上调用 .SelectMany(i => i)
将其变成 IObservable<BrokeredMessage>
其中_serviceBusReceiver是一个接口的实例如下:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
而 BrokeredMessage 来自 https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
我认为这可能是正确答案:
这不是一个好的答案。请勿使用。
我自己创建的 Generate
支持 async/await 初始状态 + 迭代函数:
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector
)
{
return Observable.Create<TResult>(async obs =>
{
var state = await initialState();
while (condition(state))
{
var result = resultSelector(state);
obs.OnNext(result);
state = await iterate(state);
}
obs.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
});
}
不幸的是,这个似乎有副作用,即消息的生产速度远远超过消费。如果观察者处理消息的速度很慢,那么这将在我们处理少量消息之前获取数百万条消息。不完全是我们想要的服务总线。
我将完成上述内容,也许还会阅读更多内容,如果需要,我会 post 提出更具体的问题。
如果您要推出自己的异步 Generate
函数,我建议使用递归调度而不是包装 while 循环。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TResult>(async obs => {
return s.Schedule(await initialState(), async (state, self) =>
{
if (!condition(state))
{
obs.OnCompleted();
return;
}
obs.OnNext(resultSelector(state));
self(await iterate(state));
});
});
}
这有两个优点。首先,您可以取消它,使用简单的 while 循环无法直接取消它,事实上,在可观察对象完成之前,您甚至不需要 return 订阅函数。其次,这使您可以控制每个项目的 scheduling/asynchrony(这使测试变得轻而易举),这也使它更适合 library
经过大量测试后,我认为使用内置 Rx 运算符可以很好地完成工作。
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(o =>
{
var current = default(TResult);
return
Observable
.FromAsync(initialState)
.Select(y => resultSelector(y))
.Do(c => current = c)
.Select(x =>
Observable
.While(
() => condition(current),
Observable
.FromAsync(() => iterate(current))
.Select(y => resultSelector(y))
.Do(c => current = c))
.StartWith(x))
.Switch()
.Where(x => condition(x))
.ObserveOn(scheduler ?? Scheduler.Default)
.Subscribe(o);
});
}
我用以下内容测试了这段代码:
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
return
Task.FromResult(
EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = 1
}));
}
Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
return Task.FromResult(
prev.Last().SequenceNumber < 3
? EnumerableEx.Return(
new BrokeredMessage()
{
SequenceNumber = prev.Last().SequenceNumber + 1
})
: Enumerable.Empty<BrokeredMessage>());
}
public class BrokeredMessage
{
public int SequenceNumber;
}
和运行这个序列:
var ob = Generate(
async () => await ProduceFirst(),
prev => Continue(prev),
async prev => await ProduceNext(prev),
item => item);
我得到了这个结果:
我的测试代码也使用了 Reactive Extension 团队的 Interactive Extensions - NuGet "Ix-Main".
我自己也有类似的问题,也同意以下评论:
I might be violating the spirit of the reactive paradigm but this is what I need at the moment - it should not continue pulling messages from a queue until they can be processed (at least in the near future).
我认为 Ix.NET 中的 IAsyncEnumerable
比 IObservable
更适合这种情况 - 无论是这里的问题还是任何类似的异步展开功能。原因是每次我们迭代然后从 Task
中提取结果时,流程控制与我们(调用者)一起拉取下一个项目,或者如果满足特定条件则选择不拉取。这类似于 IAsyncEnumerable
而不是 IObservable
,它在我们无法控制速率的情况下将项目推送给我们。
Ix.NET 没有合适的 AsyncEnumerable.Generate
版本所以我写了下面的来解决这个问题。
public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
{
return AsyncEnumerable.CreateEnumerable(() =>
{
var started = false;
var current = default(TState);
return AsyncEnumerable.CreateEnumerator(async c =>
{
if (!started)
{
started = true;
var conditionMet = !c.IsCancellationRequested && condition(initialState);
if (conditionMet) current = initialState;
return conditionMet;
}
{
var newVal = await iterate(current).ConfigureAwait(false);
var conditionMet = !c.IsCancellationRequested && condition(newVal);
if (conditionMet) current = newVal;
return conditionMet;
}
},
() => current,
() => { });
});
}
备注:
- 仅进行了非常轻微的测试。
- 是否return初始状态。
- 不是return第一个不符合条件的 TState,即使它有 完成工作以获得该结果。可能不同的版本可以 包括那个。
- 我更愿意去掉
condition
参数,因为它是一个拉式系统,是否调用 MoveNext 完全取决于调用者,因此condition
似乎是多余的。它实质上是将对TakeWhile
的调用添加到 功能。但是我对 Ix.NET 的研究还不够深入 知道是否需要来自MoveNext
的false
响应才能进行排序dispose
到IAsyncEnumerator
,因此我将其包括在内。
IAsyncEnumerable
当然可以转换为 IObservable
。
这是另一个实现,灵感来自 Enigmativity 的
public static IObservable<TResult> Generate<TResult>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TResult> resultSelector,
IScheduler scheduler = null)
{
return Observable.Create<TResult>(observer =>
{
var (isFirst, current) = (true, default(TResult));
return Observable
.While(() => isFirst || condition(current),
Observable.If(() => isFirst,
Observable.FromAsync(ct => initialState()),
Observable.FromAsync(ct => iterate(current))
)
)
.Do(x => (isFirst, current) = (false, x))
.Select(x => resultSelector(x))
.ObserveOn(scheduler ?? Scheduler.Immediate)
.Subscribe(observer);
});
}