RxJava:在最后一次发射后开始心跳,直到下一次发射到达
RxJava : Start heartbeat after the last emission til the next one arrives
我正在处理 Http SSE 流,我必须保持 http 连接有效。为此,我每 15 秒发送一条空消息(命名为 heartbeat,所有其他消息命名为真实事件)。
source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat")))
此解决方案的缺点是,如果最后一个真实事件在 < 15 秒内发送,我将发送心跳事件。
我想要的是在最后收到的消息超过 15 秒后开始发送心跳事件,并在返回真实事件时停止发送此事件。
类似于以下内容:
---x-x-x------x----------x---------------h---------------h---------------h--x---------------h-x----------x---------------h----
- x 是真实事件
- h 是心跳事件
- - 等于 1 秒
感谢任何帮助:)
我猜 switchMap
运算符就是您要查找的内容:
source
.map(EventSource.Event::event)
.startWith(event("").withName("heartbeat"))
.switchMap (event -> interval(15, TimeUnit.SECONDS)
.map(t -> event("").withName("heartbeat"))
.startWith(event)
)
编辑
正如您在评论中提到的,即使还没有任何事件,您也需要启动 heardBeat。为此,您需要在 EventSource.Event
中添加一些 Init
class 并稍后使用 filter()
运算符忽略它。查看编辑后的代码。
编辑 2
删除了 initialDelay 并使其以 HeartBeat 事件开始
我正在处理 Http SSE 流,我必须保持 http 连接有效。为此,我每 15 秒发送一条空消息(命名为 heartbeat,所有其他消息命名为真实事件)。
source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat")))
此解决方案的缺点是,如果最后一个真实事件在 < 15 秒内发送,我将发送心跳事件。
我想要的是在最后收到的消息超过 15 秒后开始发送心跳事件,并在返回真实事件时停止发送此事件。
类似于以下内容:
---x-x-x------x----------x---------------h---------------h---------------h--x---------------h-x----------x---------------h----
- x 是真实事件
- h 是心跳事件
- - 等于 1 秒
感谢任何帮助:)
我猜 switchMap
运算符就是您要查找的内容:
source
.map(EventSource.Event::event)
.startWith(event("").withName("heartbeat"))
.switchMap (event -> interval(15, TimeUnit.SECONDS)
.map(t -> event("").withName("heartbeat"))
.startWith(event)
)
编辑
正如您在评论中提到的,即使还没有任何事件,您也需要启动 heardBeat。为此,您需要在 EventSource.Event
中添加一些 Init
class 并稍后使用 filter()
运算符忽略它。查看编辑后的代码。
编辑 2
删除了 initialDelay 并使其以 HeartBeat 事件开始