定期调用 deferToThread
Periodically call deferToThread
我有一个字符串列表,我想定期处理这些字符串。
开始处理一个新字符串的周期是1秒,处理一个字符串需要3秒。
我期望观察到的是,从第 3 秒开始,我将每秒看到一个新结果,直到处理完所有字符串。
然而,我实际看到的是,当所有的结果都生成时,所有的结果都一起出现了。
那么问题来了,如何修改代码才能达到我期望看到的效果呢?
from twisted.internet import reactor, threads
import json
import time
def process(string):
print "Processing " + string + "\n"
time.sleep(3) # simulate computation time
# write result to file; result is mocked by string*3
file_name = string + ".txt"
with open(file_name, "w") as fp:
json.dump(string*3, fp)
print string + " processed\n"
string_list = ["AAAA", "BBBB", "CCCC", "XXXX", "YYYY", "ZZZZ"]
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
同时,生成结果的顺序似乎与处理字符串的顺序不一致。我猜它只是乱序打印,但它们实际上是按顺序处理的。如何验证我的猜测?
我注意到的另一件小事是 Processing YYYY
没有打印在正确的位置。这是为什么? (它和之前的结果之间应该有一个空行。)
Processing AAAA
Processing BBBB
Processing CCCC
Processing XXXX
Processing YYYY
Processing ZZZZ
YYYY processed
CCCC processed
AAAA processed
BBBB processed
XXXX processed
ZZZZ processed
这部分代码的作用:
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
是在每个调度操作之间延迟一秒来调度每个工作块。然后,最后,它启动允许处理开始的反应器。直到 reactor.run()
.
才开始处理
time.sleep(1)
的使用也意味着你的延迟是阻塞的,一旦你解决了上述问题,这将是一个问题。
一个解决方案是用 LoopingCall
.
替换 for
循环和 time.sleep(1)
from twisted.internet.task import LoopingCall, react
string_list = [...]
def process(string):
...
def process_strings(the_strings, f):
def dispatch(s):
d = deferToThread(f, s)
# Add callback / errback to d here to process the
# result or report any problems.
# Do _not_ return `d` though. LoopingCall will
# wait on it before running the next iteration if
# we do.
string_iter = iter(the_strings)
c = LoopingCall(lambda: dispatch(next(string_iter)))
d = c.start(1)
d.addErrback(lambda err: err.trap(StopIteration))
return d
def main(reactor):
return process_strings(string_list, process)
react(main, [])
此代码使用 react
来启动和停止反应器(它会在 main
返回的 Deferred
触发时停止)。它在线程池中使用 LoopingCall
开始,周期为 1 到 运行 f(next(string_iter))
,直到遇到 StopIteration
(或其他一些错误)。
(LoopingCall
和 deferToThread
都采用 *args
和 **kwargs
传递给它们的可调用对象,所以如果您愿意(这是风格问题),您可以也将该表达式写为 LoopingCall(lambda: deferToThread(f, next(string_iter)))
。您不能 "unwrap" 剩余的 lambda,因为这会导致 LoopingCall(deferToThread, f, next(string_iter))
,它在调用 LoopingCall
时仅计算 next(string_iter)
一次所以你最终会永远处理第一个字符串。)
还有其他可能的调度方法。例如,您可以一次使用 cooperate
到 运行 恰好 3 个处理线程 - 一旦旧线程完成就启动一个新线程。
from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate
def process_strings(the_strings, f):
# Define a generator of all of the jobs to be accomplished.
work_iter = (
deferToThread(lambda: f(a_string))
for a_string
in the_strings
)
# Consume jobs from the generator in parallel until done.
tasks = list(cooperate(work_iter) for i in range(3))
# Return a Deferred that fires when all three tasks have
# finished consuming all available jobs.
return gatherResults(list(
t.whenDone()
for t
in tasks
))
在这两种情况下,请注意没有使用 time.sleep
。
我有一个字符串列表,我想定期处理这些字符串。
开始处理一个新字符串的周期是1秒,处理一个字符串需要3秒。
我期望观察到的是,从第 3 秒开始,我将每秒看到一个新结果,直到处理完所有字符串。
然而,我实际看到的是,当所有的结果都生成时,所有的结果都一起出现了。 那么问题来了,如何修改代码才能达到我期望看到的效果呢?
from twisted.internet import reactor, threads
import json
import time
def process(string):
print "Processing " + string + "\n"
time.sleep(3) # simulate computation time
# write result to file; result is mocked by string*3
file_name = string + ".txt"
with open(file_name, "w") as fp:
json.dump(string*3, fp)
print string + " processed\n"
string_list = ["AAAA", "BBBB", "CCCC", "XXXX", "YYYY", "ZZZZ"]
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
同时,生成结果的顺序似乎与处理字符串的顺序不一致。我猜它只是乱序打印,但它们实际上是按顺序处理的。如何验证我的猜测?
我注意到的另一件小事是 Processing YYYY
没有打印在正确的位置。这是为什么? (它和之前的结果之间应该有一个空行。)
Processing AAAA
Processing BBBB
Processing CCCC
Processing XXXX
Processing YYYY
Processing ZZZZ
YYYY processed
CCCC processed
AAAA processed
BBBB processed
XXXX processed
ZZZZ processed
这部分代码的作用:
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
是在每个调度操作之间延迟一秒来调度每个工作块。然后,最后,它启动允许处理开始的反应器。直到 reactor.run()
.
time.sleep(1)
的使用也意味着你的延迟是阻塞的,一旦你解决了上述问题,这将是一个问题。
一个解决方案是用 LoopingCall
.
for
循环和 time.sleep(1)
from twisted.internet.task import LoopingCall, react
string_list = [...]
def process(string):
...
def process_strings(the_strings, f):
def dispatch(s):
d = deferToThread(f, s)
# Add callback / errback to d here to process the
# result or report any problems.
# Do _not_ return `d` though. LoopingCall will
# wait on it before running the next iteration if
# we do.
string_iter = iter(the_strings)
c = LoopingCall(lambda: dispatch(next(string_iter)))
d = c.start(1)
d.addErrback(lambda err: err.trap(StopIteration))
return d
def main(reactor):
return process_strings(string_list, process)
react(main, [])
此代码使用 react
来启动和停止反应器(它会在 main
返回的 Deferred
触发时停止)。它在线程池中使用 LoopingCall
开始,周期为 1 到 运行 f(next(string_iter))
,直到遇到 StopIteration
(或其他一些错误)。
(LoopingCall
和 deferToThread
都采用 *args
和 **kwargs
传递给它们的可调用对象,所以如果您愿意(这是风格问题),您可以也将该表达式写为 LoopingCall(lambda: deferToThread(f, next(string_iter)))
。您不能 "unwrap" 剩余的 lambda,因为这会导致 LoopingCall(deferToThread, f, next(string_iter))
,它在调用 LoopingCall
时仅计算 next(string_iter)
一次所以你最终会永远处理第一个字符串。)
还有其他可能的调度方法。例如,您可以一次使用 cooperate
到 运行 恰好 3 个处理线程 - 一旦旧线程完成就启动一个新线程。
from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate
def process_strings(the_strings, f):
# Define a generator of all of the jobs to be accomplished.
work_iter = (
deferToThread(lambda: f(a_string))
for a_string
in the_strings
)
# Consume jobs from the generator in parallel until done.
tasks = list(cooperate(work_iter) for i in range(3))
# Return a Deferred that fires when all three tasks have
# finished consuming all available jobs.
return gatherResults(list(
t.whenDone()
for t
in tasks
))
在这两种情况下,请注意没有使用 time.sleep
。