来自 ReactiveX Observable 的批处理结果
Batching Results from ReactiveX Observable
假设我有一个看起来像这样的可观察对象(这是 Python,但应该对所有语言都是通用的):
rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])
我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给一个函数,所以像这样:
batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])
实际上,传入的数据将是无限的(可能是空的)列表流。我只是想确保在我累积 5 个实际值之前不会对 batch_function
进行后续调用。感谢您的帮助。
以下代码片段对我有用buffer_with_count
。不过,我不确定是否有更简洁的方法来做到这一点,即没有 flat_map
.
BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
.flat_map(lambda it: it) \
.buffer_with_count(BUFFER_COUNT) \
.map(lambda my_array: do_something_with(my_array)) \
.subscribe(lambda it: print(it))
假设我有一个看起来像这样的可观察对象(这是 Python,但应该对所有语言都是通用的):
rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])
我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给一个函数,所以像这样:
batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])
实际上,传入的数据将是无限的(可能是空的)列表流。我只是想确保在我累积 5 个实际值之前不会对 batch_function
进行后续调用。感谢您的帮助。
以下代码片段对我有用buffer_with_count
。不过,我不确定是否有更简洁的方法来做到这一点,即没有 flat_map
.
BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
.flat_map(lambda it: it) \
.buffer_with_count(BUFFER_COUNT) \
.map(lambda my_array: do_something_with(my_array)) \
.subscribe(lambda it: print(it))