rxJava 与 throttleFirst 运算符相反(不是节流而是收集)

rxJava opposite for throttleFirst operator (not throttling but collecting)

我需要以下运算符,它在元素出现时启动计时器(为元素创建 window)并将它们收集到 ListObservable/Flowable 中。当为定时器指定的时间结束并且没有元素到来时,运算符不发送空事件。当下一个元素到来时,创建新的计时器并开始收集元素。

Rx java 有 Buffer 和 Window 运算符,但是这个运算符有缺点:

可以过滤这个空元素,但我想避免用 List/Observable-s/Flowable-s.

的空事件(基于计时器)污染调度程序

我花了一些时间,发现在形式上非常相似,但在功能上却扮演着相反的角色 throttleFirst(long windowDuration, TimeUnit unit) https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png

但不幸的是它节流了,但没有收集物品。

除了 onComplete,您想发生什么?你能把 buffer()window()switchIfEmpty() 组合起来吗?

//No emissions or on complete
source.window(...).switchIfEmpty(Observable.never());

//Empty list emission
source.window(...).switchIfEmpty(Observable.just(Collections.emptyList()));

我不认为可以节省创建对象的时间,因此如果您想避免出现空列表,请将它们过滤掉。

至于在新物品迟到时启动新的周期性计时器,我想不出现有运算符的任何组合都可以在不丢失物品的情况下做到这一点。

我创建了以下可以在不丢失物品的情况下完成的“装置”:

public static final class BufferWithTimeout<T> {

    Scheduler.Worker trampoline = Schedulers.trampoline().createWorker();

    final long timeout;
    
    final TimeUnit unit;
    
    final Scheduler.Worker worker;
    
    final SerialDisposable timer = new SerialDisposable();
    
    final PublishSubject<List<T>> output = PublishSubject.create();
    
    List<T> current;
    
    long bufferIndex;
    
    BufferWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
        this.worker = scheduler.createWorker();
        this.timeout = timeout;
        this.unit = unit;
    }
    
    void onValue(T value) {
        trampoline.schedule(() -> {
            if (timer.isDisposed()) {
                return;
            }
            if (current == null) {
                current = new ArrayList<>();
                long bi = ++bufferIndex;
                timer.set(worker.schedulePeriodically(() -> {
                    onTime(bi);
                }, timeout, timeout, unit));
            }
            current.add(value);
        });
    }

    void onTime(long index) {
        trampoline.schedule(() -> {
            if (index == bufferIndex && current != null) {
                if (current.isEmpty()) {
                    current = null;
                    bufferIndex++;
                    timer.set(null);
                } else {
                    output.onNext(current);
                    current = new ArrayList<>();
                }
            }
        });
    }

    void onTerminate(Throwable error) {
        timer.dispose();
        worker.dispose();
        trampoline.schedule(() -> {
            if (current != null && !current.isEmpty()) {
                output.onNext(current);
                current = null;
            }
            if (error != null) {
                output.onError(error);
            } else {
                output.onComplete();
            }
        });
    }
    
    void dispose() {
        timer.dispose();
        worker.dispose();
        trampoline.schedule(() -> {
            current = null;
        });
    }

    public static <T> ObservableTransformer<T, List<T>> create(
            long timeout, TimeUnit unit, Scheduler scheduler) {
        return o -> 
            Observable.defer(() -> {
                BufferWithTimeout<T> state = new BufferWithTimeout<>(
                    timeout, unit, scheduler);

                return  o
                        .doOnNext(v -> state.onValue(v))
                        .doOnError(e -> state.onTerminate(e))
                        .doOnComplete(() -> state.onTerminate(null))
                        .ignoreElements()
                        .<List<T>>toObservable()
                        .mergeWith(state.output.doOnDispose(state::dispose));
            });
    }
}

您可以通过以下方式尝试:

// generate events over time
        Observable.fromArray(1, 2, 3, 5, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31)
        .flatMap(v -> Observable.timer(v * 100, TimeUnit.MILLISECONDS).map(w -> v))

// apply operator
        .compose(BufferWithTimeout.create(
             700, TimeUnit.MILLISECONDS, Schedulers.computation()
        ))

// wait for it all
        .blockingSubscribe(System.out::println);

请注意,虽然这会为每个源元素创建更多的对象,但有办法解决它,但它会变得更复杂。