RxPy with_latest_from producing inconsistent results
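I'm using group_by on an Observable, but for each newly created group I want to snap the element that caused the group to be created (with a new key) using with_latest_from: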
>>> from __future__ import print_function
>>> from rx import Observable
>>> # sequence 1, 2, 3, ... every half a second
>>> observable = Observable.interval(500).map(lambda x: x + 1)
>>> # groups by bool(x % 3): True for numbers not divisible by 3, False for multiples of 3
>>> grouped = observable.group_by(lambda x: bool(x%3))
>>> # groups paired with the first element that kicked off the group
>>> grouped.with_latest_from(observable, lambda group, element: (group, element)).subscribe(print)
I expected to see both of the following printed, but I only see one of them each time.
(<rx.linq.groupedobservable.GroupedObservable object at 0xabc>, 1) # 1 is the element that created the group with key=True
(<rx.linq.groupedobservable.GroupedObservable object at 0xdef>, 3) # 3 is the element that created the group with key=False
Occasionally I also see the snapped element being 2:
(<rx.linq.groupedobservable.GroupedObservable object at 0x0313EB10>, 2)
Any idea what is going wrong?
From Dag Brattli:
The problem seems to be that you are basically using with_latest_from() on a stream with itself, so there is no guarantee whether source or latest will trigger first. If latest triggers before source for a new group, then it will be lost. One way to solve it is to get a separate stream of the first elements in each group and zip that with the stream of groups:
# A stream with the first element of each group
firsts = grouped.flat_map(lambda group: group.first())
# groups paired with the first element that kicked off the group
grouped.zip(firsts, lambda g, k: (g, k)).subscribe(print)
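As a rough illustration of the pairing this is aiming for, here is a plain-Python sketch that does not use RxPY at all. The helper first_element_per_group is a hypothetical name made up for this example, and it works on an ordinary finite sequence rather than a timed stream, so it only models the intended result, not the operators themselves.

```python
def first_element_per_group(values, key_func):
    """Return (key, first_element) pairs in the order the groups first appear.

    Hypothetical helper for illustration only; not part of RxPY.
    """
    seen = set()
    pairs = []
    for value in values:
        key = key_func(value)
        if key not in seen:
            # This value is the one that "kicks off" a new group.
            seen.add(key)
            pairs.append((key, value))
    return pairs


# Same key function as in the question, applied to 1..6.
pairs = first_element_per_group(range(1, 7), lambda x: bool(x % 3))
print(pairs)  # [(True, 1), (False, 3)]
```

With the question's key function, 1 opens the key=True group and 3 opens the key=False group, which is the pairing the zip approach is meant to deliver.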
Also note that each group carries its key (here, the result of bool(x % 3)), if that is something that can be used instead of the element:
grouped.map(lambda gr: (gr, gr.key)).subscribe(print)
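To see why the ordering matters, here is a toy model in plain Python (again, no RxPY). It assumes only the general with_latest_from contract: a result is emitted when the source fires, paired with the most recent value of the other stream, and the source value is dropped if the other stream has not emitted yet. The simulate function and its other_first flag are hypothetical names for this sketch, and a real run may interleave the two orders from tick to tick.

```python
_MISSING = object()  # marks "the other stream has not emitted yet"


def simulate(elements, key_func, other_first):
    """Toy model: for each element, decide whether the 'other' stream
    (raw elements) or the 'source' stream (new groups) is handled first."""
    latest = _MISSING
    seen_keys = set()
    emitted = []
    for x in elements:
        if other_first:
            latest = x  # other stream delivers x before the group appears
        key = key_func(x)
        if key not in seen_keys:
            # A new group appears on the source stream.
            seen_keys.add(key)
            if latest is not _MISSING:
                emitted.append((key, latest))
            # else: nothing to pair with yet, so this group is dropped
        if not other_first:
            latest = x  # other stream delivers x after the group appeared
    return emitted


def key_func(x):
    return bool(x % 3)


in_order = simulate([1, 2, 3], key_func, other_first=True)
out_of_order = simulate([1, 2, 3], key_func, other_first=False)
print(in_order)      # [(True, 1), (False, 3)]
print(out_of_order)  # [(False, 2)]
```

In the second ordering the first group is lost and the second group is paired with 2, which resembles the occasional output reported in the question.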