如何等待 RxPy 并行线程完成
How to wait for RxPy parallel threads to complete
基于该示例,我可以在 RxPy 中并行执行多个任务。我的问题是:如何等待它们全部完成?使用线程时,我知道可以调用 `.join()`,
但 Rx 的调度器(Scheduler)似乎没有这样的选项。`.to_blocking()`
也无济于事:MainThread 在所有通知触发、on_completed 处理程序被调用之前就已结束。示例如下:
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
def printthread(val):
    """Write *val* to stdout, suffixed with the name of the calling thread."""
    name = current_thread().name
    line = "{}, thread: {}".format(val, name)
    print(line)
def intense_calculation(value):
    """Simulate a slow job: log *value*, block for 0.5-2.0 s, then return it."""
    printthread("calc {}".format(value))
    # randint is inclusive on both ends, so the pause is 0.5 .. 2.0 seconds.
    pause = .1 * random.randint(5, 20)
    time.sleep(pause)
    return value
if __name__ == "__main__":
    def start_calculation(i):
        # Run intense_calculation(i) on the timeout scheduler; emits its result.
        return Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)

    calculations = Observable.range(1, 3).select_many(start_calculation)
    calculations.observe_on(Scheduler.event_loop).subscribe(
        on_next=lambda x: printthread("on_next: {}".format(x)),
        on_completed=lambda: printthread("on_completed"),
        on_error=lambda err: printthread("on_error: {}".format(err)),
    )
    # Nothing here waits for the subscription above, so this line can run
    # (and the main thread can exit) before on_next/on_completed fire.
    printthread("\nAll done")
    # time.sleep(2)
预期输出
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread
实际输出
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
如果取消注释 `time.sleep(2)` 调用,实际输出为:
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
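对于 ThreadPoolScheduler,您可以:
1. 创建调度器:`scheduler = ThreadPoolScheduler(pool_size)`
2. 用该调度器并行调用各个任务。
3. 调用 `scheduler.executor.shutdown()`
4. 然后,全部完成后即可得到所有结果。
注意:`shutdown()` 只等待线程池中的任务,不会等待 `observe_on(Scheduler.event_loop)` 在另一个线程上投递的 on_next/on_completed 回调;如需确保回调全部执行,可在 on_completed/on_error 中设置一个 `threading.Event`,并在主线程中等待它。
完整的解决方案如下: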
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler
def printthread(val):
    """Write *val* to stdout, suffixed with the name of the calling thread."""
    name = current_thread().name
    line = "{}, thread: {}".format(val, name)
    print(line)
def intense_calculation(value):
    """Simulate a slow job: log *value*, block for 0.5-2.0 s, then return it."""
    printthread("calc {}".format(value))
    # randint is inclusive on both ends, so the pause is 0.5 .. 2.0 seconds.
    pause = .1 * random.randint(5, 20)
    time.sleep(pause)
    return value
if __name__ == "__main__":
    from threading import Event

    # Set by the terminal handlers. scheduler.executor.shutdown() only waits
    # for the pool's own tasks; on_next/on_completed are delivered on a
    # different scheduler (Scheduler.event_loop via observe_on), so without
    # this signal the main thread can exit before those callbacks run.
    finished = Event()

    def handle_completed():
        printthread("on_completed")
        finished.set()

    def handle_error(err):
        printthread("on_error: {}".format(err))
        finished.set()

    scheduler = ThreadPoolScheduler(4)
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=handle_completed,
            on_error=handle_error)

    # Block until the stream has terminated, then release the pool's workers.
    finished.wait()
    scheduler.executor.shutdown()
    printthread("\nAll done")
使用 `run()` 等待 RxPy 并行线程完成。
BlockingObservable 已从 RxPY v3 中删除。
from threading import current_thread
import rx, random, multiprocessing, time
from rx import operators as ops
def intense_calculation(value, delay=None):
    """Simulate slow work on a pair and return it tagged as processed.

    Args:
        value: a 2-item sequence; ``value[0]`` is passed through unchanged and
            ``value[1]`` must be a string (``" processed"`` is appended to it).
            Presumably a ``(key, url)`` item from ``my_dict.items()`` — verify
            against callers.
        delay: seconds to sleep before returning. ``None`` (the default) keeps
            the original behaviour of a random 1.0-4.0 s pause; pass ``0`` for
            an instant, deterministic run (e.g. in tests).

    Returns:
        The tuple ``(value[0], value[1] + " processed")``.
    """
    if delay is None:
        # randint is inclusive on both ends: 5*0.2 .. 20*0.2 = 1.0 .. 4.0 s.
        delay = random.randint(5, 20) * 0.2
    time.sleep(delay)
    print("From adding_delay: {0} Value : {1} {2}".format(current_thread(), value, delay))
    return (value[0], value[1] + " processed")
# A bounded pool sized to the CPU count, which is what the variable name (and
# the otherwise-unused multiprocessing import) intend; NewThreadScheduler would
# start a fresh thread for every scheduled item instead.
thread_pool_scheduler = rx.scheduler.ThreadPoolScheduler(multiprocessing.cpu_count())
my_dict = {'A': 'url1', 'B': 'url2', 'C': 'url3'}
# Each (key, url) item gets its own inner observable subscribed on the pool, so
# intense_calculation runs concurrently for different items. to_dict gathers
# the results, and run() blocks the main thread until the pipeline completes,
# returning the collected dict (RxPY v3 has no BlockingObservable).
new_dict = rx.from_iterable(my_dict.items()).pipe(
    ops.flat_map(lambda item: rx.of(item).pipe(
        ops.map(intense_calculation),
        ops.subscribe_on(thread_pool_scheduler),
    )),
    ops.to_dict(lambda x: x[0], lambda x: x[1]),
).run()
print("From main: {0}".format(current_thread()))
print(new_dict)
基于该示例,我可以在 RxPy 中并行执行多个任务。我的问题是:如何等待它们全部完成?使用线程时,我知道可以调用 `.join()`,
但 Rx 的调度器(Scheduler)似乎没有这样的选项。`.to_blocking()`
也无济于事:MainThread 在所有通知触发、on_completed 处理程序被调用之前就已结束。示例如下:
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
def printthread(val):
    """Write *val* to stdout, suffixed with the name of the calling thread."""
    name = current_thread().name
    line = "{}, thread: {}".format(val, name)
    print(line)
def intense_calculation(value):
    """Simulate a slow job: log *value*, block for 0.5-2.0 s, then return it."""
    printthread("calc {}".format(value))
    # randint is inclusive on both ends, so the pause is 0.5 .. 2.0 seconds.
    pause = .1 * random.randint(5, 20)
    time.sleep(pause)
    return value
if __name__ == "__main__":
    def start_calculation(i):
        # Run intense_calculation(i) on the timeout scheduler; emits its result.
        return Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)

    calculations = Observable.range(1, 3).select_many(start_calculation)
    calculations.observe_on(Scheduler.event_loop).subscribe(
        on_next=lambda x: printthread("on_next: {}".format(x)),
        on_completed=lambda: printthread("on_completed"),
        on_error=lambda err: printthread("on_error: {}".format(err)),
    )
    # Nothing here waits for the subscription above, so this line can run
    # (and the main thread can exit) before on_next/on_completed fire.
    printthread("\nAll done")
    # time.sleep(2)
预期输出
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread
实际输出
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
如果取消注释 `time.sleep(2)` 调用,实际输出为:
calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3
All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
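对于 ThreadPoolScheduler,您可以:
1. 创建调度器:`scheduler = ThreadPoolScheduler(pool_size)`
2. 用该调度器并行调用各个任务。
3. 调用 `scheduler.executor.shutdown()`
4. 然后,全部完成后即可得到所有结果。
注意:`shutdown()` 只等待线程池中的任务,不会等待 `observe_on(Scheduler.event_loop)` 在另一个线程上投递的 on_next/on_completed 回调;如需确保回调全部执行,可在 on_completed/on_error 中设置一个 `threading.Event`,并在主线程中等待它。
完整的解决方案如下: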
from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread
from rx.concurrency import ThreadPoolScheduler
def printthread(val):
    """Write *val* to stdout, suffixed with the name of the calling thread."""
    name = current_thread().name
    line = "{}, thread: {}".format(val, name)
    print(line)
def intense_calculation(value):
    """Simulate a slow job: log *value*, block for 0.5-2.0 s, then return it."""
    printthread("calc {}".format(value))
    # randint is inclusive on both ends, so the pause is 0.5 .. 2.0 seconds.
    pause = .1 * random.randint(5, 20)
    time.sleep(pause)
    return value
if __name__ == "__main__":
    from threading import Event

    # Set by the terminal handlers. scheduler.executor.shutdown() only waits
    # for the pool's own tasks; on_next/on_completed are delivered on a
    # different scheduler (Scheduler.event_loop via observe_on), so without
    # this signal the main thread can exit before those callbacks run.
    finished = Event()

    def handle_completed():
        printthread("on_completed")
        finished.set()

    def handle_error(err):
        printthread("on_error: {}".format(err))
        finished.set()

    scheduler = ThreadPoolScheduler(4)
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=scheduler)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=handle_completed,
            on_error=handle_error)

    # Block until the stream has terminated, then release the pool's workers.
    finished.wait()
    scheduler.executor.shutdown()
    printthread("\nAll done")
使用 `run()` 等待 RxPy 并行线程完成。
BlockingObservable 已从 RxPY v3 中删除。
from threading import current_thread
import rx, random, multiprocessing, time
from rx import operators as ops
def intense_calculation(value, delay=None):
    """Simulate slow work on a pair and return it tagged as processed.

    Args:
        value: a 2-item sequence; ``value[0]`` is passed through unchanged and
            ``value[1]`` must be a string (``" processed"`` is appended to it).
            Presumably a ``(key, url)`` item from ``my_dict.items()`` — verify
            against callers.
        delay: seconds to sleep before returning. ``None`` (the default) keeps
            the original behaviour of a random 1.0-4.0 s pause; pass ``0`` for
            an instant, deterministic run (e.g. in tests).

    Returns:
        The tuple ``(value[0], value[1] + " processed")``.
    """
    if delay is None:
        # randint is inclusive on both ends: 5*0.2 .. 20*0.2 = 1.0 .. 4.0 s.
        delay = random.randint(5, 20) * 0.2
    time.sleep(delay)
    print("From adding_delay: {0} Value : {1} {2}".format(current_thread(), value, delay))
    return (value[0], value[1] + " processed")
# A bounded pool sized to the CPU count, which is what the variable name (and
# the otherwise-unused multiprocessing import) intend; NewThreadScheduler would
# start a fresh thread for every scheduled item instead.
thread_pool_scheduler = rx.scheduler.ThreadPoolScheduler(multiprocessing.cpu_count())
my_dict = {'A': 'url1', 'B': 'url2', 'C': 'url3'}
# Each (key, url) item gets its own inner observable subscribed on the pool, so
# intense_calculation runs concurrently for different items. to_dict gathers
# the results, and run() blocks the main thread until the pipeline completes,
# returning the collected dict (RxPY v3 has no BlockingObservable).
new_dict = rx.from_iterable(my_dict.items()).pipe(
    ops.flat_map(lambda item: rx.of(item).pipe(
        ops.map(intense_calculation),
        ops.subscribe_on(thread_pool_scheduler),
    )),
    ops.to_dict(lambda x: x[0], lambda x: x[1]),
).run()
print("From main: {0}".format(current_thread()))
print(new_dict)