subscribe_on 与 from_iterable/range 在 RxPY

subscribe_on with from_iterable/range in RxPY

我正在尝试着手安排 python 的响应式扩展。我想使用 subscribe_on 并行处理多个可观察对象。如果使用 just 创建 observable,这会很好地工作,但如果使用 rangefrom_ 则不行。

just 默认为 Scheduler.immediate,而其他生成器默认为 Scheduler.current_thread。这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

它与 observe_on 一起工作,或者如果调度程序直接传递给生成器,但我想将可观察的创建与处理分离并实现这样的事情:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print "processing %s on thread %s" % (x, threading.currentThread().name)
    time.sleep(1)


def finish(x):
    print "finished %s on thread %s" % (x, threading.currentThread().name)


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

我是不是误会了subscribe_on?我的方法错了吗?

您的示例中有三个操作可以独立安排:

  1. 数据馈送操作。 justrange 默认使用不同的调度器,但它们之间没有太大区别。两者都在当前线程上输入初始值。您可以通过将其作为参数传递给这些方法来覆盖它们的默认调度程序。

  2. 订阅操作。默认使用 Scheduler.current_thread。 IE。它与数据馈送操作在同一线程上执行。可以被subscribe_on方法覆盖。

  3. 观察(on_nexton_erroron_completed)动作。默认使用 Scheduler.current_thread。 IE。它与订阅操作在同一线程上执行。可以被observe_on方法覆盖。

如果您仅针对这些操作中的一个覆盖调度程序,其他操作应如上所述进行。

关于调度程序

Scheduler.immediate 并没有真正安排任何事情。它立即在计划的同一线程上调用操作。

Scheduler.current_thread 通过排队操作避免递归,但仍会在计划的同一线程上调用操作。

Scheduler.new_thread启动单个后台线程来一个接一个地执行动作。

Scheduler.timeout 为需要执行的每个操作启动新的后台线程。

尝试并行处理

在不同线程中安排工作最合适的方法似乎是observe_on

问题是目前 RxPy 中没有 thread_pool 调度器。 new_thread 调度程序只启动一个线程,因此对您帮助不大。

timeout 调度程序可用于并行,但它无法控制并发线程的数量,因此并发任务数量的爆炸式增长会导致内存溢出并导致系统崩溃。

不是 observe_on

中的错误

我用 observe_on(Scheduler.timeout) 尝试了 运行 你的例子,但任务仍然没有并行进行。在查看 RxPy 源代码后,我发现它仅在当前事件完成后才安排下一个事件,这有效地 禁用了并行处理 。我的第一反应是在 observe_on 实施中报告 bug

但经过进一步调查,我发现串行执行不是错误,而是 intended behavior

并行执行任务的正确方法

这是有效的代码(基于 this answer):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)

Observable.start 创建 异步 可观察对象,通过 Scheduler.timeout.

安排在单独的线程上

observe_on(Scheduler.event_loop) 是可选的。它强制 finish 在同一线程上调用所有项目的方法。

请注意,无法保证 finish 方法会按初始 range 顺序调用。