定期调用 deferToThread

Periodically call deferToThread

我有一个字符串列表,我想定期处理这些字符串。

开始处理一个新字符串的周期是1秒,处理一个字符串需要3秒。

我期望观察到的是,从第 3 秒开始,我将每秒看到一个新结果,直到处理完所有字符串。

然而,我实际看到的是,当所有的结果都生成时,所有的结果都一起出现了。 那么问题来了,如何修改代码才能达到我期望看到的效果呢?

from twisted.internet import reactor, threads
import json
import time


def process(string):
    print "Processing " + string + "\n"
    time.sleep(3)  # simulate computation time

    # write result to file; result is mocked by string*3
    file_name = string + ".txt"
    with open(file_name, "w") as fp:
        json.dump(string*3, fp)

    print string + " processed\n"

string_list = ["AAAA", "BBBB", "CCCC", "XXXX", "YYYY", "ZZZZ"]

for s in string_list:
    # start a new thread every second
    time.sleep(1)
    threads.deferToThread(process, s)

reactor.run()

同时,生成结果的顺序似乎与处理字符串的顺序不一致。我猜它只是乱序打印,但它们实际上是按顺序处理的。如何验证我的猜测?

我注意到的另一件小事是 Processing YYYY 没有打印在正确的位置。这是为什么? (它和之前的结果之间应该有一个空行。)

Processing AAAA

Processing BBBB

Processing CCCC

Processing XXXX
Processing YYYY


Processing ZZZZ

YYYY processed

CCCC processed

AAAA processed

BBBB processed

XXXX processed

ZZZZ processed

这部分代码的作用:

for s in string_list:
    # start a new thread every second
    time.sleep(1)
    threads.deferToThread(process, s)

reactor.run()

是在每个调度操作之间延迟一秒来调度每个工作块。然后,最后,它启动允许处理开始的反应器。直到 reactor.run().

才开始处理

time.sleep(1)的使用也意味着你的延迟是阻塞的,一旦你解决了上述问题,这将是一个问题。

一个解决方案是用 LoopingCall.

替换 for 循环和 time.sleep(1)
from twisted.internet.task import LoopingCall, react

string_list = [...]
def process(string):
    ...

def process_strings(the_strings, f):
    def dispatch(s):
        d = deferToThread(f, s)
        # Add callback / errback to d here to process the
        # result or report any problems.
        # Do _not_ return `d` though.  LoopingCall will
        # wait on it before running the next iteration if
        # we do.

    string_iter = iter(the_strings)
    c = LoopingCall(lambda: dispatch(next(string_iter)))
    d = c.start(1)
    d.addErrback(lambda err: err.trap(StopIteration))
    return d

def main(reactor):
    return process_strings(string_list, process)

react(main, [])

此代码使用 react 来启动和停止反应器(它会在 main 返回的 Deferred 触发时停止)。它在线程池中使用 LoopingCall 开始,周期为 1 到 运行 f(next(string_iter)),直到遇到 StopIteration(或其他一些错误)。

(LoopingCalldeferToThread 都采用 *args**kwargs 传递给它们的可调用对象,所以如果您愿意(这是风格问题),您可以也将该表达式写为 LoopingCall(lambda: deferToThread(f, next(string_iter)))。您不能 "unwrap" 剩余的 lambda,因为这会导致 LoopingCall(deferToThread, f, next(string_iter)),它在调用 LoopingCall 时仅计算 next(string_iter) 一次所以你最终会永远处理第一个字符串。)

还有其他可能的调度方法。例如,您可以一次使用 cooperate 到 运行 恰好 3 个处理线程 - 一旦旧线程完成就启动一个新线程。

from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate

def process_strings(the_strings, f):
    # Define a generator of all of the jobs to be accomplished.
    work_iter = (
        deferToThread(lambda: f(a_string))
        for a_string
        in the_strings
    )
    # Consume jobs from the generator in parallel until done.
    tasks = list(cooperate(work_iter) for i in range(3))

    # Return a Deferred that fires when all three tasks have
    # finished consuming all available jobs.
    return gatherResults(list(
        t.whenDone()
        for t
        in tasks
    ))

在这两种情况下,请注意没有使用 time.sleep