subscribe_on 与 from_iterable/range 在 RxPY
subscribe_on with from_iterable/range in RxPY
我正在尝试着手安排 python 的响应式扩展。我想使用 subscribe_on
并行处理多个可观察对象。如果使用 just
创建 observable,这会很好地工作,但如果使用 range
或 from_
则不行。
just
默认为 Scheduler.immediate
,而其他生成器默认为 Scheduler.current_thread
。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
它与 observe_on
一起工作,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现这样的事情:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
def factory_single():
return rx.Observable.just(1).do_action(work)
def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)
def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)
# Creates a new thread (I like)
process(factory_single)
# Runs on MainThread (I don't like)
process(factory_multiple)
我是不是误会了subscribe_on
?我的方法错了吗?
您的示例中有三个操作可以独立安排:
数据馈送操作。 just
和 range
默认使用不同的调度器,但它们之间没有太大区别。两者都在当前线程上输入初始值。您可以通过将其作为参数传递给这些方法来覆盖它们的默认调度程序。
订阅操作。默认使用 Scheduler.current_thread
。 IE。它与数据馈送操作在同一线程上执行。可以被subscribe_on
方法覆盖。
观察(on_next
、on_error
、on_completed
)动作。默认使用 Scheduler.current_thread
。 IE。它与订阅操作在同一线程上执行。可以被observe_on
方法覆盖。
如果您仅针对这些操作中的一个覆盖调度程序,其他操作应如上所述进行。
关于调度程序
Scheduler.immediate
并没有真正安排任何事情。它立即在计划的同一线程上调用操作。
Scheduler.current_thread
通过排队操作避免递归,但仍会在计划的同一线程上调用操作。
Scheduler.new_thread
启动单个后台线程来一个接一个地执行动作。
Scheduler.timeout
为需要执行的每个操作启动新的后台线程。
尝试并行处理
在不同线程中安排工作最合适的方法似乎是observe_on
。
问题是目前 RxPy 中没有 thread_pool
调度器。 new_thread
调度程序只启动一个线程,因此对您帮助不大。
timeout
调度程序可用于并行,但它无法控制并发线程的数量,因此并发任务数量的爆炸式增长会导致内存溢出并导致系统崩溃。
不是 observe_on
中的错误
我用 observe_on(Scheduler.timeout)
尝试了 运行 你的例子,但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这有效地 禁用了并行处理 。我的第一反应是在 observe_on
实施中报告 bug。
但经过进一步调查,我发现串行执行不是错误,而是 intended behavior。
并行执行任务的正确方法
这是有效的代码(基于 this answer):
Observable.range(1, 3) \
.select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
.observe_on(Scheduler.event_loop) \
.subscribe(finish)
Observable.start
创建 异步 可观察对象,通过 Scheduler.timeout
.
安排在单独的线程上
observe_on(Scheduler.event_loop)
是可选的。它强制 finish
在同一线程上调用所有项目的方法。
请注意,无法保证 finish
方法会按初始 range
顺序调用。
我正在尝试着手安排 python 的响应式扩展。我想使用 subscribe_on
并行处理多个可观察对象。如果使用 just
创建 observable,这会很好地工作,但如果使用 range
或 from_
则不行。
just
默认为 Scheduler.immediate
,而其他生成器默认为 Scheduler.current_thread
。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
# Creates new thread (I like)
rx.Observable.just(3)\
.do_action(work)\
.subscribe_on(Scheduler.new_thread)\
.subscribe(finish)
# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
.do_action(work) \
.subscribe_on(Scheduler.new_thread) \
.subscribe(finish)
它与 observe_on
一起工作,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现这样的事情:
import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading
def work(x):
print "processing %s on thread %s" % (x, threading.currentThread().name)
time.sleep(1)
def finish(x):
print "finished %s on thread %s" % (x, threading.currentThread().name)
def factory_single():
return rx.Observable.just(1).do_action(work)
def factory_multiple():
return rx.Observable.range(2, 4).do_action(work)
def process(factory):
factory().subscribe_on(Scheduler.new_thread).subscribe(finish)
# Creates a new thread (I like)
process(factory_single)
# Runs on MainThread (I don't like)
process(factory_multiple)
我是不是误会了subscribe_on
?我的方法错了吗?
您的示例中有三个操作可以独立安排:
数据馈送操作。
just
和range
默认使用不同的调度器,但它们之间没有太大区别。两者都在当前线程上输入初始值。您可以通过将其作为参数传递给这些方法来覆盖它们的默认调度程序。订阅操作。默认使用
Scheduler.current_thread
。 IE。它与数据馈送操作在同一线程上执行。可以被subscribe_on
方法覆盖。观察(
on_next
、on_error
、on_completed
)动作。默认使用Scheduler.current_thread
。 IE。它与订阅操作在同一线程上执行。可以被observe_on
方法覆盖。
如果您仅针对这些操作中的一个覆盖调度程序,其他操作应如上所述进行。
关于调度程序
Scheduler.immediate
并没有真正安排任何事情。它立即在计划的同一线程上调用操作。
Scheduler.current_thread
通过排队操作避免递归,但仍会在计划的同一线程上调用操作。
Scheduler.new_thread
启动单个后台线程来一个接一个地执行动作。
Scheduler.timeout
为需要执行的每个操作启动新的后台线程。
尝试并行处理
在不同线程中安排工作最合适的方法似乎是observe_on
。
问题是目前 RxPy 中没有 thread_pool
调度器。 new_thread
调度程序只启动一个线程,因此对您帮助不大。
timeout
调度程序可用于并行,但它无法控制并发线程的数量,因此并发任务数量的爆炸式增长会导致内存溢出并导致系统崩溃。
不是 observe_on
中的错误我用 observe_on(Scheduler.timeout)
尝试了 运行 你的例子,但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这有效地 禁用了并行处理 。我的第一反应是在 observe_on
实施中报告 bug。
但经过进一步调查,我发现串行执行不是错误,而是 intended behavior。
并行执行任务的正确方法
这是有效的代码(基于 this answer):
Observable.range(1, 3) \
.select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
.observe_on(Scheduler.event_loop) \
.subscribe(finish)
Observable.start
创建 异步 可观察对象,通过 Scheduler.timeout
.
observe_on(Scheduler.event_loop)
是可选的。它强制 finish
在同一线程上调用所有项目的方法。
请注意,无法保证 finish
方法会按初始 range
顺序调用。