创建像环形缓冲区一样批处理的 Observable(需要建议)

Creating Observable that batches like ring buffer (advice needed)

中找不到适合我的问题的解决方案后,我决定实施此解决方案。

但是我缺乏使用 monad 的经验,像 lift(..) 这样的东西对我来说仍然看起来有点神奇......

我打开这个的目的是让那些在 rxjava 之上实现了一些自定义东西的人可以给我关于如何实现这个的建议。

现在这是怎么回事,这是界面。

我想这对你们大多数人来说都是不言自明的,但为了确保我会举一个例子。

假设我们有一个订阅者(消费者),它实际上对数据库进行持久化,很明显,如果你给它 1 个或 1000 个对象来持久化,差异不会是 1000 倍,而是 10 倍或更少,这意味着它是一个可以采用加载的消费者......所以一次推送一个项目是愚蠢的,而你可以一次坚持多个项目,另一方面,等待一批 N 元素填满是愚蠢的直到你坚持(一秒钟你可能会得到 1000 个元素,另一秒钟你可能会得到 none,所以假设我们不知道传入数据的频率)...

所以我们现在拥有的是 Observable.batch(),它会要求批处理的大小为 N,我们通常会等待而不工作......另一方面,我们有 Disruptor,它确实可以做到我们想要但没有提供 Observable 的漂亮界面... Disruptor 将处理单个元素,当您处理它时,它将收集所有传入的元素,下次您将获得所有元素的批次这是因为您的消费者忙于最后一个值而收集的...

目前我想我应该使用 Observable.from() 来实现这个或者 lift()...

请分享您对此的想法,也许已经有我不知道的可用解决方案,或者我将以错误的方式实现它...

这是一个运算符,可以对异步边界后面堆积的值进行批处理:

public final class OperatorRequestBatcher<T> 
implements Operator<List<T>, T> {
    final Scheduler scheduler;
    public OperatorRequestBatcher(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
    @Override
    public Subscriber<? super T> call(Subscriber<? super List<T>> t) {
        Scheduler.Worker w = scheduler.createWorker();
        RequestBatcherSubscriber<T> parent = 
                new RequestBatcherSubscriber<>(t, w);

        t.add(w);
        t.add(parent);

        return parent;
    }

    static final class RequestBatcherSubscriber<T> 
    extends Subscriber<T> implements Action0 {
        final Subscriber<? super List<T>> actual;
        final Scheduler.Worker w;
        final Queue<T> queue;
        final AtomicInteger wip;

        volatile boolean done;
        Throwable error;

        public RequestBatcherSubscriber(
                Subscriber<? super List<T>> actual, 
                Scheduler.Worker w) {
            this.actual = actual;
            this.w = w;
            this.wip = new AtomicInteger();
            this.queue = new SpscLinkedArrayQueue<>(256);
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            queue.offer(t);
            schedule();
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                return;
            }
            error = e;
            done = true;
            schedule();
        }

        @Override
        public void onCompleted() {
            done = true;
            schedule();
        }

        void schedule() {
            if (wip.getAndIncrement() == 0) {
                w.schedule(this);
            }
        }

        @Override
        public void call() {
            int missed = 1;

            final Queue<T> q = queue;
            final Subscriber<? super List<T>> a = actual;
            final AtomicInteger wip = this.wip;

            for (;;) {

                List<T> list = new ArrayList<>();

                for (;;) {
                    boolean d = done;
                    T v = q.poll();
                    boolean e = v == null;

                    if (isUnsubscribed()) {
                        q.clear();
                        return;
                    }

                    if (d) {
                        Throwable err = error;
                        if (err != null) {
                            a.onError(err);
                            return;
                        } else
                        if (e) {
                            if (!list.isEmpty()) {
                                a.onNext(list);
                            }
                            a.onCompleted();
                            return;
                        }
                    }

                    if (e) {
                        break;
                    }

                    list.add(v);
                }

                if (!list.isEmpty()) {
                    a.onNext(list);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestScheduler sch = Schedulers.test();

        ps.lift(new OperatorRequestBatcher<>(sch))
        .subscribe(System.out::println, Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        ps.onNext(1);
        ps.onNext(2);

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

        ps.onNext(3);

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

        ps.onNext(4);
        ps.onNext(5);
        ps.onNext(6);
        ps.onCompleted();

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    }
}

但是请注意,您在 API 中描述的是热 Observable 的一种形式:冷源不跨多个订阅者进行协调。为此,您需要创建自定义 ConnectableObservable

publish() 可能对 disruptForEachSubscriber 有效,但 publish().observeOn()disruptForAllSubscriber 不太可能,因为 observeOn 会请求一堆值,而 publish 会将其解释为成功处理 N 个批次。

这不是一个答案(akarnokd 已经给出了一个很棒的答案),只是一个更长的评论。

这是一个我也觉得非常有用的运算符。每当我有一个热源和一个慢速订阅者时,我都会使用它——我使用一些聚合策略来聚合在消费过程中堆积的事件,并消费聚合结果。

示例:

  • 一个反应式绘图库。用户移动鼠标,拖动绘图,但是渲染很慢,所以我在渲染新版本之前聚合了用户的动作
  • 再次绘图,这次显示通过 TCP/IP 从服务器延迟获取的数据。当用户显示长图的新部分时,将生成对服务器新数据的请求。在发送新的 TCP 请求数据包之前,我将所有请求聚合为一个。
  • 是这个问题的一个更具体的版本,其中 "aggregation" 只是取最后一个元素。

在 C# 世界中,此运算符称为 . I've managed to write my own version using ,但我一定会在有时间时查看这段代码。

在我的实现中有一个稍微不同的地方是我的变体有一个参数,sendFinalBufferedItemsBeforeCompletion: Boolean,指定取消订阅行为应该是什么样子。当true时,它发送OnComplete之前积累的最后一批项目,当false时,它立即OnCompletes。对于我的用例,两种用法都有意义。