如何创建连续发射的 RxJava Observable?

How to create RxJava Observable that emits continuously?

我正在尝试创建一个 RxJava BlockingObservable,它将每隔 X 毫秒发出一个变量的值,直到 (condition == true) 或发生超时。

下面的代码似乎接近我想要的,但它总是发出 ONCE 然后退出。奇怪的是,我在 takeUntil() 中有一个条件,它永远不会是真的——我希望这个 observable 会持续发出并最终超时,但事实并非如此。

我哪里错了missing/doing?

Observable.fromCallable(() -> getSendWindow())
        .sample(10, TimeUnit.MILLISECONDS)
        .timeout(30, TimeUnit.SECONDS)
        .takeUntil(sendWindow -> 1==2)
        .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
        .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
        .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());

.sample 并不像您想象的那样。采样率将上述 Observable 限制为(最多)每 10 秒一次。

Observable.fromCallable() 只发出一次事件,然后完成。

.sample() 等待 10 秒并每 10 秒发出最后一个事件(如果有的话)。因此,当您将它附加到只有一个事件的 Observable 时,它​​只会发出一个事件。然后就完成了。

您可能真正想要的(我是 .net 程序员,所以请原谅我的外壳等)是这个。

编辑:感谢@akanokd 告诉我 java 对重复事件使用间隔。

Observable.interval(10, timeUnit.MILLISECONDS)
    .map(x -> getSendWindow())
    .takeUntil(sendWindow -> 1==2)
    .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
     .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
    .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());

随时使用 API 对 JAVA 特定版本的调用来编辑此答案...