为什么我的 Observable 中的 Buffer 不接收事件?
Why does Buffer from my Observable not receive events?
我已经尝试了几种方法,但都无法调用我的订阅方法:
方法一:
var buffer = new List<Kpi>();
buffer.ToObservable().Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
async kpis =>
{
await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
});
那么buffer.Add(new Kpi());
就不会触发我的方法了
方法 2:(注意:我已经阅读了特殊方法的定义 Empty/Never/Throw
但除此之外,我似乎无法找到一种方法来创建可发出某些东西的可观察对象除了原始数字等)
var buffer = Observable.Empty<Kpi>();
buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
async kpis =>
{
await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
});
然后buffer.Publish(new Kpi())
。再次没有任何反应
我哪里错了?
在第一种情况下,在 List
上调用 ToObservable
不会使 List
神奇地通知它的更改。列表根本没有那个功能。
在第二种情况下,Publish 做的事情与您的预期完全不同。
如果您想从事件中创建可观察对象,您正在寻找 Subject class。
var buffer = new Subject<Kpi>();
buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
async kpis =>
{
await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
});
// notify of new item
buffer.OnNext(new Kpi());
创建方法有很多种new observable sequence。我建议您仔细阅读它,看看是否有更适合您的。例如将事件变成可观察的。
我已经尝试了几种方法,但都无法调用我的订阅方法:
方法一:
var buffer = new List<Kpi>(); buffer.ToObservable().Buffer(TimeSpan.FromMinutes(1), 5).Subscribe( async kpis => { await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false); });
那么
buffer.Add(new Kpi());
就不会触发我的方法了
方法 2:(注意:我已经阅读了特殊方法的定义
Empty/Never/Throw
但除此之外,我似乎无法找到一种方法来创建可发出某些东西的可观察对象除了原始数字等)var buffer = Observable.Empty<Kpi>(); buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe( async kpis => { await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false); });
然后
buffer.Publish(new Kpi())
。再次没有任何反应
我哪里错了?
在第一种情况下,在 List
上调用 ToObservable
不会使 List
神奇地通知它的更改。列表根本没有那个功能。
在第二种情况下,Publish 做的事情与您的预期完全不同。
如果您想从事件中创建可观察对象,您正在寻找 Subject class。
var buffer = new Subject<Kpi>();
buffer.Buffer(TimeSpan.FromMinutes(1), 5).Subscribe(
async kpis =>
{
await _retry.ExecuteAsync(() => Process(kpis.ToList())).ConfigureAwait(false);
});
// notify of new item
buffer.OnNext(new Kpi());
创建方法有很多种new observable sequence。我建议您仔细阅读它,看看是否有更适合您的。例如将事件变成可观察的。