使用 Rx 批处理大型结果集

Batching large result sets using Rx

我有一个有趣的问题要问 Rx 专家。我有一个关系 table 保存有关事件的信息。事件由 id、类型和发生时间组成。在我的代码中,我需要获取某个可能很宽的时间范围内的所有事件。

SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER        BY time LIMIT :batch_size

为了提高可靠性和处理大型结果集,我按大小批量查询记录:batch_size。现在,我想编写一个函数,给定 :before 和 :after,将 return 一个表示结果集的 Observable。

Observable<Event> getEvents(long before, long after);

在内部,该函数应该批量查询数据库。事件在时间尺度上的分布是未知的。所以解决批处理的自然方法是这样的: 获取前 N 条记录 如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止 如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止 ...等等(思路应该清楚了)

我的问题是:

有没有一种方法可以根据更高级别的 Observable 原语(filter/map/flatMap/scan/range 等)来表达此功能,而无需显式使用订阅者?

到目前为止,我没有做到这一点,而是提出了以下简单的代码:

private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) {
    long start = before;
    while (start < after) {
        final List<Event> records;
        try {
            records = getRecordsByRange(start, after);
        } catch (Exception e) {
            subscriber.onError(e);
            return;
        }
        if (records.isEmpty()) break;
        records.forEach(subscriber::onNext);
        start = Iterables.getLast(records).getTime();
    }

    subscriber.onCompleted();
}

public Observable<Event> getRecords(final long before, final long after) {
        return Observable.create(subscriber -> observeGetRecords(before, after, subscriber));
}

此处,getRecordsByRange 使用 DBI 和 returns 列表实现 SELECT 查询。此代码工作正常,但缺乏高级 Rx 构造的优雅。

注意:我知道我可以 return 迭代器作为 DBI 中 SELECT 查询的结果。但是,我不想这样做,而是更喜欢 运行 多个查询。此计算不必是原子的,因此事务隔离的问题不相关。

虽然我不完全理解你为什么想要这样的时间重用,但我会这样做:

BehaviorSubject<Long> start = BehaviorSubject.create(0L);

start
.subscribeOn(Schedulers.trampoline())
.flatMap(tstart -> 
    getEvents(tstart, tstart + twindow)
    .publish(o -> 
         o.takeLast(1)
         .doOnNext(r -> start.onNext(r.time))
         .ignoreElements()
         .mergeWith(o)
    )
)
.subscribe(...)