单任务异步 OnNext

async OnNext with single task

我有一个主题,每 100 毫秒获取一个 int。这个数字在每次迭代时递增。我订阅了这个主题,得到 50 块返回并等待几秒钟。 示例:

SequenceGenerator gen = new SequenceGenerator();
gen.subj.Buffer(50).Subscribe(
    async x =>
    {
        Console.WriteLine($"1: {x[x.Count - 1]}------");
        await Task.Delay(10000);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));

SequenceGenerator 仅执行以下操作:

int i = 0;
while (true)
{
    ++i;
    subj.OnNext(i.ToString());
    await Task.Delay(1);
}

每次缓冲区填满时都会调用"onnext"。尽管最后的 "onnext" 还没有完成。有没有办法调整这个?如果最后一个 "onnext" 完成,我只想调用下一个 "onnext"。 实际行为:

1: 50------
(wait ~0.5s)
1: 100------
...

预期行为:

1: 50------
(wait 10 seconds)
1: 100------
...

有办法吗? 谢谢你们! 最好的问候。

好的,我想我有一个解决方案。我找到了一个提示 here。 Zip 通过这种方法帮助了我很多。我需要另一个压缩到我的流中的主题。 至少这行得通:

SequenceGenerator gen = new SequenceGenerator();
Subject<bool> lockReleaser = new Subject<bool>();

Stopwatch watch = new Stopwatch();
watch.Start();
var test = gen.messageQueue.Buffer(TimeSpan.FromSeconds(4),5)
        .Zip(lockReleaser, (res, _) => res)
    .Subscribe(async x =>
    {
        if (x.Count > 0)
        {
            Console.WriteLine($"{watch.Elapsed}: {x[x.Count - 1]}------");
            await Task.Delay(10000);
        }
        lockReleaser .OnNext(true);
        Console.WriteLine($"1:------");
    },
    x => Console.WriteLine("EX"),
    () => Console.WriteLine("1 Done"));
lockReleaser.OnNext(true);