RxPy:在(慢速)扫描执行之间对热可观察对象进行排序
RxPy: Sort hot observable between (slow) scan executions
TL;DR 我正在寻求帮助来实现下面的弹珠图。目的是在扫描执行之间没有等待时间的情况下尽可能对未排序的值进行排序。
我不是要求完整的实施。欢迎任何指导。
我有一个无限热可观察对象的异步慢速(出于测试目的而强制)扫描。这是相关代码:
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
这是当前输出(值是随机生成的):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
我想对扫描执行之间的未决消息进行排序。因此,第一个发出的消息将始终是第一个被消费的消息,但下一个消费的消息将是在那一点之前发出的和未消费的消息的最小值(所有这些都在当前版本中,因为即时发出)。等等……我觉得弹珠图比我解释的好。
请注意,扫描不是在等待完成事件,它在发出最后一条消息后没有开始的唯一原因是因为睡眠。 Here you have another version 其中睡眠已从扫描中删除并放入 ExternalDummyService。您可以看到值在发出时就被消耗掉了。这在弹珠图中也有体现。
我尝试使用 to_sorted_list,这是我在 RxPy 中找到的唯一排序方法,但我无法使其工作。
我要找的是这样的:
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
.buffered_sort(lambda msg: msg.timestamp) \
############
.scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
谢谢
如果您想使用 to_sorted_list
,您需要重新映射您在单个元素中获得的列表。将 main
函数更改为:
def main():
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.to_sorted_list(key_selector=lambda msg: msg.timestamp) \
.flat_map(lambda msglist: Observable.from_iterable(msglist)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
给出:
>emitting Msg(count=0, timestamp=18.924474716186523)
>emitting Msg(count=1, timestamp=4.669189453125)
>emitting Msg(count=2, timestamp=18.633127212524414)
>emitting Msg(count=3, timestamp=15.151262283325195)
>emitting Msg(count=4, timestamp=14.705896377563477)
>SLOW st.count=0 last_msg.counter=1 ts=4.67
>SLOW st.count=1 last_msg.counter=4 ts=14.71
>SLOW st.count=2 last_msg.counter=3 ts=15.15
>SLOW st.count=3 last_msg.counter=2 ts=18.63
>SLOW st.count=4 last_msg.counter=0 ts=18.92
>SLOW FINISHED
请注意,to_sorted_list
方法将等待主题流结束后开始扫描,因此您不能使用它来实现问题中显示的弹珠图。
为了正确地实现它,我认为你需要像 onBackpressureBuffer
这样的东西,它在 RxJava 中实现但在 RxPy 中没有实现。
这不会完全解决问题,因为缓冲区是 FIFO(先进先出)并且您需要一种自定义方式来选择先发出的消息。这可能需要调整对缓冲区的请求的处理方式。
您可能会找到一种更好的方法来获得一个名为 rxbackpressure, particularly with its class dequeuablebuffer.py 的 RxPy 扩展解决方案,您可以根据自己的需要进行调整。
TL;DR 我正在寻求帮助来实现下面的弹珠图。目的是在扫描执行之间没有等待时间的情况下尽可能对未排序的值进行排序。
我不是要求完整的实施。欢迎任何指导。
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
这是完整版:https://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
这是当前输出(值是随机生成的):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
我想对扫描执行之间的未决消息进行排序。因此,第一个发出的消息将始终是第一个被消费的消息,但下一个消费的消息将是在那一点之前发出的和未消费的消息的最小值(所有这些都在当前版本中,因为即时发出)。等等……我觉得弹珠图比我解释的好。
请注意,扫描不是在等待完成事件,它在发出最后一条消息后没有开始的唯一原因是因为睡眠。 Here you have another version 其中睡眠已从扫描中删除并放入 ExternalDummyService。您可以看到值在发出时就被消耗掉了。这在弹珠图中也有体现。
我尝试使用 to_sorted_list,这是我在 RxPy 中找到的唯一排序方法,但我无法使其工作。
我要找的是这样的:
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
.buffered_sort(lambda msg: msg.timestamp) \
############
.scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
谢谢
如果您想使用 to_sorted_list
,您需要重新映射您在单个元素中获得的列表。将 main
函数更改为:
def main():
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.to_sorted_list(key_selector=lambda msg: msg.timestamp) \
.flat_map(lambda msglist: Observable.from_iterable(msglist)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
给出:
>emitting Msg(count=0, timestamp=18.924474716186523)
>emitting Msg(count=1, timestamp=4.669189453125)
>emitting Msg(count=2, timestamp=18.633127212524414)
>emitting Msg(count=3, timestamp=15.151262283325195)
>emitting Msg(count=4, timestamp=14.705896377563477)
>SLOW st.count=0 last_msg.counter=1 ts=4.67
>SLOW st.count=1 last_msg.counter=4 ts=14.71
>SLOW st.count=2 last_msg.counter=3 ts=15.15
>SLOW st.count=3 last_msg.counter=2 ts=18.63
>SLOW st.count=4 last_msg.counter=0 ts=18.92
>SLOW FINISHED
请注意,to_sorted_list
方法将等待主题流结束后开始扫描,因此您不能使用它来实现问题中显示的弹珠图。
为了正确地实现它,我认为你需要像 onBackpressureBuffer
这样的东西,它在 RxJava 中实现但在 RxPy 中没有实现。
这不会完全解决问题,因为缓冲区是 FIFO(先进先出)并且您需要一种自定义方式来选择先发出的消息。这可能需要调整对缓冲区的请求的处理方式。
您可能会找到一种更好的方法来获得一个名为 rxbackpressure, particularly with its class dequeuablebuffer.py 的 RxPy 扩展解决方案,您可以根据自己的需要进行调整。