RxJava:在最后一次发射后开始心跳,直到下一次发射到达

RxJava : Start heartbeat after the last emission til the next one arrives

我正在处理 Http SSE 流,我必须保持 http 连接有效。为此,我每 15 秒发送一条空消息(命名为 heartbeat,所有其他消息命名为真实事件)。

source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat"))) 

此解决方案的缺点是,如果最后一个真实事件在 < 15 秒内发送,我将发送心跳事件。

我想要的是在最后收到的消息超过 15 秒后开始发送心跳事件,并在返回真实事件时停止发送此事件。

类似于以下内容:

---x-x-x------x----------x---------------h---------------h---------------h--x---------------h-x----------x---------------h----

感谢任何帮助:)

我猜 switchMap 运算符就是您要查找的内容:

source
    .map(EventSource.Event::event)
    .startWith(event("").withName("heartbeat"))
    .switchMap (event -> interval(15, TimeUnit.SECONDS)
        .map(t -> event("").withName("heartbeat"))
        .startWith(event)
    )

编辑

正如您在评论中提到的,即使还没有任何事件,您也需要启动 heardBeat。为此,您需要在 EventSource.Event 中添加一些 Init class 并稍后使用 filter() 运算符忽略它。查看编辑后的代码。

编辑 2

删除了 initialDelay 并使其以 HeartBeat 事件开始