等到上一个事件完成 Rx
Wait until previous event finish Rx
在我的代码中,我有一些事件需要按顺序处理,而其他事件需要并行处理。
我设法做了并行部分,但我不能做顺序版本。
event.ofType(Message.class)
.groupBy(Message::getGroup)
.flatMap(g -> {
if (g.getKey() == Message.GROUP_PARALLEL)
return g.flatMap(m -> Observable.just(m)
.observeOn(Schedulers.computation())
.doOnNext(this::send)
.delay(m.getRetryInterval(), TimeUnit.SECONDS)
.repeat(m.getRetryCount())
.takeUntil(event.ofType(Sent.class)
.filter(a -> a.getId() == m.getId())
)
.map(s -> m)
);
else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
// ?
}
})
我想这样实现:
When an event is received
If the previous event is still being processed, wait before emission
Process the event (blocking)
所以:
event.subscribeOn(Schedulers.computation())
.waitUntilPreviousComplete()
.flatMap(this::processEvent) // we assume it take 1 seconds to complete
.susbcribe(this::processed, this::onError);
event.onNext(Event.A)
event.onNext(Event.B)
event.onNext(Event.C)
应该结果:
12h00:00 processed call with event A
12h00:01 processed call with event B
12h00:02 processed call with event C
所以我想要一些看起来像队列的东西。我用 HashMap 和 Subjects 尝试了奇怪的事情,但我确信有一种干净的方法可以做到这一点。我该如何实施?
听起来您正在寻找 concatMap
。该操作员将按顺序订阅上游发出的项目,并且在前一个 on 完成之前不会移动到下一个项目。所以您的示例代码将如下所示:
event.subscribeOn(Schedulers.computation())
.concatMap(this::processEvent)
.subscribe(this::processed, this::onError);
在我的代码中,我有一些事件需要按顺序处理,而其他事件需要并行处理。
我设法做了并行部分,但我不能做顺序版本。
event.ofType(Message.class)
.groupBy(Message::getGroup)
.flatMap(g -> {
if (g.getKey() == Message.GROUP_PARALLEL)
return g.flatMap(m -> Observable.just(m)
.observeOn(Schedulers.computation())
.doOnNext(this::send)
.delay(m.getRetryInterval(), TimeUnit.SECONDS)
.repeat(m.getRetryCount())
.takeUntil(event.ofType(Sent.class)
.filter(a -> a.getId() == m.getId())
)
.map(s -> m)
);
else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
// ?
}
})
我想这样实现:
When an event is received
If the previous event is still being processed, wait before emission
Process the event (blocking)
所以:
event.subscribeOn(Schedulers.computation())
.waitUntilPreviousComplete()
.flatMap(this::processEvent) // we assume it take 1 seconds to complete
.susbcribe(this::processed, this::onError);
event.onNext(Event.A)
event.onNext(Event.B)
event.onNext(Event.C)
应该结果:
12h00:00 processed call with event A
12h00:01 processed call with event B
12h00:02 processed call with event C
所以我想要一些看起来像队列的东西。我用 HashMap 和 Subjects 尝试了奇怪的事情,但我确信有一种干净的方法可以做到这一点。我该如何实施?
听起来您正在寻找 concatMap
。该操作员将按顺序订阅上游发出的项目,并且在前一个 on 完成之前不会移动到下一个项目。所以您的示例代码将如下所示:
event.subscribeOn(Schedulers.computation())
.concatMap(this::processEvent)
.subscribe(this::processed, this::onError);