多次回调后触发 DeferredList
Fire DeferredList after multiple callbacks
A twisted.internet.defer.DeferredList
这样做:
I combine a group of deferreds into one callback.
I track a list of Deferreds for their callbacks, and make a single
callback when they have all completed, a list of (success, result)
tuples, 'success' being a boolean.
Note that you can still use a Deferred after putting it in a
DeferredList. For example, you can suppress 'Unhandled error in
Deferred' messages by adding errbacks to the Deferreds after putting
them in the DeferredList, as a DeferredList won't swallow the errors.
(Although a more convenient way to do this is simply to set the
consumeErrors flag)
def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0): (source)
overrides twisted.internet.defer.Deferred.__init__
Initialize a DeferredList.
Parameters deferredList The list of deferreds to track. (type: list of Deferreds )
fireOnOneCallback (keyword param) a flag indicating that only one callback needs to be fired for me to call my callback
fireOnOneErrback (keyword param) a flag indicating that only one errback needs to be fired for me to call my errback
consumeErrors (keyword param) a flag indicating that any errors raised in the original deferreds should be consumed by this DeferredList. This is useful to prevent spurious warnings being logged.
具体来说:
fireOnOneCallback
(keyword param) a flag indicating that only one
callback needs to be fired for me to call my callback
我正在寻找类似 fireOnOneCallback=True
的行为,而是在 n
回调上触发。我试过这样做,但它已经变得一团糟了。我相信有更好的方法。
def _get_fired_index(deferred_list):
for index, (success, value) in enumerate(deferred_list):
if success:
return index
raise ValueError('No deferreds were fired.')
def _fire_on_other_callback(already_fired_index, deferred_list, callback, ):
dlist_except_first_fired = (
deferred_list[:already_fired_index]
+ deferred_list[already_fired_index + 1:]
)
dlist2 = DeferredList(dlist_except_first_fired, fireOnOneCallback=True)
dlist2.addCallback(callback, deferred_list)
def _fire_on_two_callbacks(deferreds, callback, errback):
dlist1 = DeferredList(deferreds, fireOnOneCallback=True)
dlist1.addCallback(_get_fired_index)
dlist1.addCallback(_fire_on_other_callback, deferreds, callback, errback)
这是一种可能的方法。
from __future__ import print_function
import attr
from twisted.internet.defer import Deferred
def fireOnN(n, ds):
acc = _Accumulator(n)
for index, d in enumerate(ds):
d.addCallback(acc.one_result, index)
return acc.n_results
@attr.s
class _Accumulator(object):
n = attr.ib()
so_far = attr.ib(default=attr.Factory(dict))
done = attr.ib(default=False)
n_results = attr.ib(default=attr.Factory(Deferred))
def one_result(self, result, index):
if self.done:
return result
self.so_far[index] = result
if len(self.so_far) == self.n:
self.done = True
so_far = self.so_far
self.so_far = None
self.n_results.callback(so_far)
dx = list(Deferred().addCallback(print, i) for i in range(3))
done = fireOnN(2, dx)
done.addCallback(print, "done")
for i, d in enumerate(dx):
d.callback("result {}".format(i))
请注意,此实现不处理错误提示,并且可能还有其他缺点(例如保留 n_results
参考)。然而,基本思想是合理的:从回调中积累状态,直到达到所需的条件,然后触发另一个 Deferred。
DeferredList
其不相关的功能和界面并非为解决此问题而设计,只会给这个问题带来不必要的复杂性。
这是使用 DeferredSemaphore
处理潜在竞争条件的另一种方法。这将在 n
的 deferreds 被触发后立即触发并取消其余的。
from twisted.internet import defer
def fireAfterNthCallback(deferreds, n):
if not n or n > len(deferreds):
raise ValueError
results = {}
finished_deferred = defer.Deferred()
sem = defer.DeferredSemaphore(1)
def wrap_sem(result, index):
return sem.run(callback_result, result, index)
def cancel_remaining():
finished = [deferreds[index] for index in results.keys()]
for d in finished:
deferreds.remove(d)
for d in deferreds:
d.addErrback(lambda err: err.trap(defer.CancelledError))
d.cancel()
def callback_result(result, index):
results[index] = result
if len(results) >= n:
cancel_remaining()
finished_deferred.callback(results.values())
return result
for deferred_index, deferred in enumerate(deferreds):
deferred.addCallback(wrap_sem, deferred_index)
return finished_deferred
A twisted.internet.defer.DeferredList
这样做:
I combine a group of deferreds into one callback.
I track a list of Deferreds for their callbacks, and make a single callback when they have all completed, a list of (success, result) tuples, 'success' being a boolean.
Note that you can still use a Deferred after putting it in a DeferredList. For example, you can suppress 'Unhandled error in Deferred' messages by adding errbacks to the Deferreds after putting them in the DeferredList, as a DeferredList won't swallow the errors. (Although a more convenient way to do this is simply to set the consumeErrors flag)
def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0): (source)
overrides twisted.internet.defer.Deferred.__init__
Initialize a DeferredList.
Parameters deferredList The list of deferreds to track. (type: list of Deferreds )
fireOnOneCallback (keyword param) a flag indicating that only one callback needs to be fired for me to call my callback
fireOnOneErrback (keyword param) a flag indicating that only one errback needs to be fired for me to call my errback
consumeErrors (keyword param) a flag indicating that any errors raised in the original deferreds should be consumed by this DeferredList. This is useful to prevent spurious warnings being logged.
具体来说:
fireOnOneCallback
(keyword param) a flag indicating that only one callback needs to be fired for me to call my callback
我正在寻找类似 fireOnOneCallback=True
的行为,而是在 n
回调上触发。我试过这样做,但它已经变得一团糟了。我相信有更好的方法。
def _get_fired_index(deferred_list):
for index, (success, value) in enumerate(deferred_list):
if success:
return index
raise ValueError('No deferreds were fired.')
def _fire_on_other_callback(already_fired_index, deferred_list, callback, ):
dlist_except_first_fired = (
deferred_list[:already_fired_index]
+ deferred_list[already_fired_index + 1:]
)
dlist2 = DeferredList(dlist_except_first_fired, fireOnOneCallback=True)
dlist2.addCallback(callback, deferred_list)
def _fire_on_two_callbacks(deferreds, callback, errback):
dlist1 = DeferredList(deferreds, fireOnOneCallback=True)
dlist1.addCallback(_get_fired_index)
dlist1.addCallback(_fire_on_other_callback, deferreds, callback, errback)
这是一种可能的方法。
from __future__ import print_function
import attr
from twisted.internet.defer import Deferred
def fireOnN(n, ds):
acc = _Accumulator(n)
for index, d in enumerate(ds):
d.addCallback(acc.one_result, index)
return acc.n_results
@attr.s
class _Accumulator(object):
n = attr.ib()
so_far = attr.ib(default=attr.Factory(dict))
done = attr.ib(default=False)
n_results = attr.ib(default=attr.Factory(Deferred))
def one_result(self, result, index):
if self.done:
return result
self.so_far[index] = result
if len(self.so_far) == self.n:
self.done = True
so_far = self.so_far
self.so_far = None
self.n_results.callback(so_far)
dx = list(Deferred().addCallback(print, i) for i in range(3))
done = fireOnN(2, dx)
done.addCallback(print, "done")
for i, d in enumerate(dx):
d.callback("result {}".format(i))
请注意,此实现不处理错误提示,并且可能还有其他缺点(例如保留 n_results
参考)。然而,基本思想是合理的:从回调中积累状态,直到达到所需的条件,然后触发另一个 Deferred。
DeferredList
其不相关的功能和界面并非为解决此问题而设计,只会给这个问题带来不必要的复杂性。
这是使用 DeferredSemaphore
处理潜在竞争条件的另一种方法。这将在 n
的 deferreds 被触发后立即触发并取消其余的。
from twisted.internet import defer
def fireAfterNthCallback(deferreds, n):
if not n or n > len(deferreds):
raise ValueError
results = {}
finished_deferred = defer.Deferred()
sem = defer.DeferredSemaphore(1)
def wrap_sem(result, index):
return sem.run(callback_result, result, index)
def cancel_remaining():
finished = [deferreds[index] for index in results.keys()]
for d in finished:
deferreds.remove(d)
for d in deferreds:
d.addErrback(lambda err: err.trap(defer.CancelledError))
d.cancel()
def callback_result(result, index):
results[index] = result
if len(results) >= n:
cancel_remaining()
finished_deferred.callback(results.values())
return result
for deferred_index, deferred in enumerate(deferreds):
deferred.addCallback(wrap_sem, deferred_index)
return finished_deferred