单任务异步 OnNext
async OnNext with single task
我有一个主题,每 100 毫秒获取一个 int。这个数字在每次迭代时递增。我订阅了这个主题,得到 50 块返回并等待几秒钟。
示例:
SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
async x =>
{
Console.WriteLine($"1: {x[x.Count - 1]}------");
await Task.Delay(10000);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
SequenceGenerator 仅执行以下操作:
int i = 0;
while (true)
{
++i;
subj.OnNext(i.ToString());
await Task.Delay(1);
}
每次缓冲区填满时都会调用"onnext"。尽管最后的 "onnext" 还没有完成。有没有办法调整这个?如果最后一个 "onnext" 完成,我只想调用下一个 "onnext"。
实际行为:
1: 50------
(wait ~0.5s)
1: 100------
...
预期行为:
1: 50------
(wait 10 seconds)
1: 100------
...
有办法吗?
谢谢你们!
最好的问候。
好的,我想我有一个解决方案。我找到了一个提示 here。 Zip 通过这种方法帮助了我很多。我需要另一个压缩到我的流中的主题。
至少这行得通:
SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();
Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
.Zip(lockReleaser, (res, _) => res)
.Subscribe(async x =>
{
if (x.Count > 0)
{
Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
await Task.Delay(10000);
}
lockReleaser .OnNext(true);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);
我有一个主题,每 100 毫秒获取一个 int。这个数字在每次迭代时递增。我订阅了这个主题,得到 50 块返回并等待几秒钟。 示例:
SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
async x =>
{
Console.WriteLine($"1: {x[x.Count - 1]}------");
await Task.Delay(10000);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
SequenceGenerator 仅执行以下操作:
int i = 0;
while (true)
{
++i;
subj.OnNext(i.ToString());
await Task.Delay(1);
}
每次缓冲区填满时都会调用"onnext"。尽管最后的 "onnext" 还没有完成。有没有办法调整这个?如果最后一个 "onnext" 完成,我只想调用下一个 "onnext"。 实际行为:
1: 50------
(wait ~0.5s)
1: 100------
...
预期行为:
1: 50------
(wait 10 seconds)
1: 100------
...
有办法吗? 谢谢你们! 最好的问候。
好的,我想我有一个解决方案。我找到了一个提示 here。 Zip 通过这种方法帮助了我很多。我需要另一个压缩到我的流中的主题。 至少这行得通:
SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();
Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
.Zip(lockReleaser, (res, _) => res)
.Subscribe(async x =>
{
if (x.Count > 0)
{
Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
await Task.Delay(10000);
}
lockReleaser .OnNext(true);
Console.WriteLine($"1:------");
},
x => Console.WriteLine("EX"),
() => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);