等到上一个事件完成 Rx

Wait until previous event finish Rx

在我的代码中,我有一些事件需要按顺序处理,而其他事件需要并行处理。

我设法做了并行部分,但我不能做顺序版本。

event.ofType(Message.class)
    .groupBy(Message::getGroup)
    .flatMap(g -> {
        if (g.getKey() == Message.GROUP_PARALLEL)
            return g.flatMap(m -> Observable.just(m)
                        .observeOn(Schedulers.computation())
                        .doOnNext(this::send)
                        .delay(m.getRetryInterval(), TimeUnit.SECONDS)
                        .repeat(m.getRetryCount())
                        .takeUntil(event.ofType(Sent.class)
                            .filter(a -> a.getId() == m.getId())
                        )
                        .map(s -> m)
                    );
        else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
            // ?
        }
    })

我想这样实现:

When an event is received
    If the previous event is still being processed, wait before emission
    Process the event (blocking)

所以:

    event.subscribeOn(Schedulers.computation())
        .waitUntilPreviousComplete()
        .flatMap(this::processEvent) // we assume it take 1 seconds to complete
        .susbcribe(this::processed, this::onError);

    event.onNext(Event.A)
    event.onNext(Event.B)
    event.onNext(Event.C)

应该结果:

    12h00:00 processed call with event A
    12h00:01 processed call with event B
    12h00:02 processed call with event C

所以我想要一些看起来像队列的东西。我用 HashMap 和 Subjects 尝试了奇怪的事情,但我确信有一种干净的方法可以做到这一点。我该如何实施?

听起来您正在寻找 concatMap。该操作员将按顺序订阅上游发出的项目,并且在前一个 on 完成之前不会移动到下一个项目。所以您的示例代码将如下所示:

event.subscribeOn(Schedulers.computation())
    .concatMap(this::processEvent)
    .subscribe(this::processed, this::onError);