来自 ReactiveX Observable 的批处理结果

Batching Results from ReactiveX Observable

假设我有一个看起来像这样的可观察对象(这是 Python,但应该对所有语言都是通用的):

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给一个函数,所以像这样:

batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])

实际上,传入的数据将是无限的(可能是空的)列表流。我只是想确保在我累积 5 个实际值之前不会对 batch_function 进行后续调用。感谢您的帮助。

以下代码片段对我有用buffer_with_count。不过,我不确定是否有更简洁的方法来做到这一点,即没有 flat_map.

BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
  .flat_map(lambda it: it) \
  .buffer_with_count(BUFFER_COUNT) \
  .map(lambda my_array: do_something_with(my_array)) \
  .subscribe(lambda it: print(it))