使用 Rx 批处理大型结果集
Batching large result sets using Rx
我有一个有趣的问题要问 Rx 专家。我有一个关系 table 保存有关事件的信息。事件由 id、类型和发生时间组成。在我的代码中,我需要获取某个可能很宽的时间范围内的所有事件。
SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER BY time LIMIT :batch_size
为了提高可靠性和处理大型结果集,我按大小批量查询记录:batch_size。现在,我想编写一个函数,给定 :before 和 :after,将 return 一个表示结果集的 Observable。
Observable<Event> getEvents(long before, long after);
在内部,该函数应该批量查询数据库。事件在时间尺度上的分布是未知的。所以解决批处理的自然方法是这样的:
获取前 N 条记录
如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止
如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止
...等等(思路应该清楚了)
我的问题是:
有没有一种方法可以根据更高级别的 Observable 原语(filter/map/flatMap/scan/range 等)来表达此功能,而无需显式使用订阅者?
到目前为止,我没有做到这一点,而是提出了以下简单的代码:
private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) {
long start = before;
while (start < after) {
final List<Event> records;
try {
records = getRecordsByRange(start, after);
} catch (Exception e) {
subscriber.onError(e);
return;
}
if (records.isEmpty()) break;
records.forEach(subscriber::onNext);
start = Iterables.getLast(records).getTime();
}
subscriber.onCompleted();
}
public Observable<Event> getRecords(final long before, final long after) {
return Observable.create(subscriber -> observeGetRecords(before, after, subscriber));
}
此处,getRecordsByRange 使用 DBI 和 returns 列表实现 SELECT 查询。此代码工作正常,但缺乏高级 Rx 构造的优雅。
注意:我知道我可以 return 迭代器作为 DBI 中 SELECT 查询的结果。但是,我不想这样做,而是更喜欢 运行 多个查询。此计算不必是原子的,因此事务隔离的问题不相关。
虽然我不完全理解你为什么想要这样的时间重用,但我会这样做:
BehaviorSubject<Long> start = BehaviorSubject.create(0L);
start
.subscribeOn(Schedulers.trampoline())
.flatMap(tstart ->
getEvents(tstart, tstart + twindow)
.publish(o ->
o.takeLast(1)
.doOnNext(r -> start.onNext(r.time))
.ignoreElements()
.mergeWith(o)
)
)
.subscribe(...)
我有一个有趣的问题要问 Rx 专家。我有一个关系 table 保存有关事件的信息。事件由 id、类型和发生时间组成。在我的代码中,我需要获取某个可能很宽的时间范围内的所有事件。
SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER BY time LIMIT :batch_size
为了提高可靠性和处理大型结果集,我按大小批量查询记录:batch_size。现在,我想编写一个函数,给定 :before 和 :after,将 return 一个表示结果集的 Observable。
Observable<Event> getEvents(long before, long after);
在内部,该函数应该批量查询数据库。事件在时间尺度上的分布是未知的。所以解决批处理的自然方法是这样的: 获取前 N 条记录 如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止 如果结果不为空,则将最后一条记录的时间作为新的'before'参数,取下N条记录;否则终止 ...等等(思路应该清楚了)
我的问题是:
有没有一种方法可以根据更高级别的 Observable 原语(filter/map/flatMap/scan/range 等)来表达此功能,而无需显式使用订阅者?
到目前为止,我没有做到这一点,而是提出了以下简单的代码:
private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) {
long start = before;
while (start < after) {
final List<Event> records;
try {
records = getRecordsByRange(start, after);
} catch (Exception e) {
subscriber.onError(e);
return;
}
if (records.isEmpty()) break;
records.forEach(subscriber::onNext);
start = Iterables.getLast(records).getTime();
}
subscriber.onCompleted();
}
public Observable<Event> getRecords(final long before, final long after) {
return Observable.create(subscriber -> observeGetRecords(before, after, subscriber));
}
此处,getRecordsByRange 使用 DBI 和 returns 列表实现 SELECT 查询。此代码工作正常,但缺乏高级 Rx 构造的优雅。
注意:我知道我可以 return 迭代器作为 DBI 中 SELECT 查询的结果。但是,我不想这样做,而是更喜欢 运行 多个查询。此计算不必是原子的,因此事务隔离的问题不相关。
虽然我不完全理解你为什么想要这样的时间重用,但我会这样做:
BehaviorSubject<Long> start = BehaviorSubject.create(0L);
start
.subscribeOn(Schedulers.trampoline())
.flatMap(tstart ->
getEvents(tstart, tstart + twindow)
.publish(o ->
o.takeLast(1)
.doOnNext(r -> start.onNext(r.time))
.ignoreElements()
.mergeWith(o)
)
)
.subscribe(...)