Cancel pending asyncio tasks if one completes with desired result
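The example below is purely hypothetical; it is only meant to convey what I'm trying to achieve here.
I have a few names: Jack, Alice and Bob.
One of those people has the surname Murphy.
I can make a network call to retrieve each person's full name. As soon as I find the person with the surname "Murphy", I'm happy.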
```python
import asyncio

async def get_persons():
    # get_person() is the network call that retrieves a full name
    persons = await asyncio.gather(
        get_person("Jack"),
        get_person("Alice"),
        get_person("Bob"))
    for person in persons:
        if person.surname == "Murphy":
            return person

def main():
    person = asyncio.run(get_persons())
    print(f"{person.first_name} has surname {person.surname}")
```
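The problem, of course, is that we have to wait for all 3 requests to finish, so the minimum wait time is the longest of the 3 request times.
There are 3 network requests. Suppose the first takes 3 seconds, the second 1 second and the third 6 seconds. Running this code then takes 6 seconds.
But the second request (Alice) is the one with the surname Murphy, and it finishes after just 1 second.
Can we ignore the other two network requests and simply return at that point?
That way the whole process would take 1 second instead of 6.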
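To make the timing argument concrete, here is a minimal sketch. It assumes a hypothetical `fake_request` coroutine that stands in for the network call, with the 3/1/6 second durations scaled down to 0.3/0.1/0.6 seconds. Because `gather()` only returns once every awaitable has finished, the total is roughly the slowest request, even though the match (Alice) is ready first.

```python
import asyncio
import time


async def fake_request(name, surname, delay):
    # Hypothetical stand-in for the network call: it only sleeps.
    await asyncio.sleep(delay)
    return {"firstname": name, "surname": surname}


async def find_murphy_with_gather():
    start = time.perf_counter()
    persons = await asyncio.gather(
        fake_request("Jack", "Connell", 0.3),
        fake_request("Alice", "Murphy", 0.1),
        fake_request("Bob", "Smith", 0.6),
    )
    elapsed = time.perf_counter() - start
    # This loop only runs after *all* three requests are done.
    for person in persons:
        if person["surname"] == "Murphy":
            return person, elapsed
    return None, elapsed


person, elapsed = asyncio.run(find_murphy_with_gather())
print(person, f"{elapsed:.2f}s")  # roughly 0.6s, not 0.1s
```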
Edit:
(Code updated to reflect Ajax1234's solution.)
```python
import asyncio
import time

class Persons:
    def __init__(self):
        self.p = []

    def get_person_request(self, name):
        # simulated network request (synchronous)
        if name == "Alice":
            print("Searching Alice")
            time.sleep(6)
            print("Returning Alice")
            return {'firstname': "Alice", 'surname': "Donnelly"}
        if name == "Bob":
            print("Searching Bob")
            time.sleep(3)
            print("Returning Bob")
            return {'firstname': "Bob", 'surname': "Murphy"}
        if name == "Jack":
            print("Searching Jack")
            time.sleep(8)
            print("Returning Jack")
            return {'firstname': "Jack", 'surname': "Connell"}
        return None

    async def get_person(self, n, _id):
        # the process for checking if the request response returns a person with the target surname
        if (person := self.get_person_request(n))["surname"] == "Murphy":
            for i, a in self.p:
                if i != _id:
                    a.cancel()
        return person

    async def get_persons(self, names):
        print("Setting tasks...")
        self.p = [(i, asyncio.create_task(self.get_person(a, i)))
                  for i, a in enumerate(names)]
        print("Gathering async results...")
        persons = await asyncio.gather(*[a for _, a in self.p])
        return [person for person in persons if isinstance(person, dict)][0]

def test():
    val = asyncio.run(Persons().get_persons(['Bob', 'Alice', 'Jack']))
    print(val)
```
The output of the script looks like this:
```
Setting tasks...
Gathering async results...
Searching Bob
Returning Bob
asyncio.exceptions.CancelledError
```
I would like the output to look like this:
```
Setting tasks...
Gathering async results...
Searching Bob
Searching Alice
Searching Jack
Returning Bob
{'firstname': 'Bob', 'surname': 'Murphy'}
```
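There are 2 questions here:
- Why isn't each `get_person` task running asynchronously?
- How do I handle the exception that `gather()` raises when one of the tasks is cancelled?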
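You can use `asyncio.create_task` to spawn cancellable tasks that run concurrently. Store the tasks in a list; then, as soon as `get_person` records a surname of "Murphy", the remaining tasks can be cancelled.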
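A minimal sketch of that pattern, assuming a hypothetical `lookup` coroutine in place of the real request. It also bears on the second question: awaiting a cancelled task raises `asyncio.CancelledError`, so a plain `gather()` propagates that error, whereas `return_exceptions=True` makes `gather()` put the exception object in that task's slot of the result list instead.

```python
import asyncio


async def lookup(name, surname, delay):
    # Hypothetical stand-in for a network request.
    await asyncio.sleep(delay)
    return {"firstname": name, "surname": surname}


async def main():
    fast = asyncio.create_task(lookup("Bob", "Murphy", 0.01))
    slow = asyncio.create_task(lookup("Jack", "Connell", 10))
    person = await fast
    if person["surname"] == "Murphy":
        slow.cancel()  # requests cancellation of the slow lookup
    # Without return_exceptions=True this gather() would raise CancelledError.
    return await asyncio.gather(fast, slow, return_exceptions=True)


results = asyncio.run(main())
# Keep only real results; the cancelled slot holds a CancelledError instance.
matches = [r for r in results if isinstance(r, dict)]
print(matches)
```

The program finishes almost immediately rather than after 10 seconds, because the slow task is cancelled instead of awaited to completion.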
Solution edited based on your full example:
```python
import asyncio, time

class Persons:
    def __init__(self):
        self.p = []

    async def get_person_request(self, name):
        # asyncio.sleep() yields to the event loop, so the requests overlap
        if name == "Alice":
            print("Searching Alice")
            await asyncio.sleep(6)
            print("Returning Alice")
            return {'firstname': "Alice", 'surname': "Donnelly"}
        if name == "Bob":
            print("Searching Bob")
            await asyncio.sleep(1)
            print("Returning Bob")
            return {'firstname': "Bob", 'surname': "Murphy"}
        if name == "Jack":
            print("Searching Jack")
            await asyncio.sleep(3)
            print("Returning Jack")
            return {'firstname': "Jack", 'surname': "Connell"}

    async def get_person(self, n, _id):
        # the process for checking if the request response returns a person with the target surname
        if (person := await self.get_person_request(n))["surname"] == "Murphy":
            for i, a in self.p:
                if i != _id:
                    a.cancel()
        return person

    async def get_persons(self, names):
        self.p = [(i, asyncio.create_task(self.get_person(a, i))) for i, a in enumerate(names)]
        # return_exceptions=True: cancelled tasks appear as CancelledError objects
        # in the result list instead of being raised by gather()
        return await asyncio.gather(*[a for _, a in self.p], return_exceptions=True)

t = time.time()
asyncio.run(Persons().get_persons(['Jack', 'Alice', 'Bob']))
print(time.time() - t)
```
Output:
```
1.0074191093444824  # taking ~1 second to produce the desired result, as expected
```
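This also explains the first question. In the edited question code, `get_person_request` calls `time.sleep()`, which blocks the entire event loop: Bob's task runs to completion before any other task starts, and then cancels the others before they ever print "Searching...". A minimal timing sketch with hypothetical helper names and a scaled-down delay:

```python
import asyncio
import time

DELAY = 0.05  # scaled-down stand-in for one request's duration


async def blocking_request():
    time.sleep(DELAY)  # blocks the event loop: no other task can run


async def non_blocking_request():
    await asyncio.sleep(DELAY)  # suspends this task; the loop runs others


async def run_three(request):
    start = time.perf_counter()
    await asyncio.gather(*(request() for _ in range(3)))
    return time.perf_counter() - start


blocking = asyncio.run(run_three(blocking_request))
non_blocking = asyncio.run(run_three(non_blocking_request))
# The blocking version takes about 3 * DELAY; the other about 1 * DELAY.
print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
```

If the real request function is synchronous (for example, a blocking HTTP client), `asyncio.to_thread()` (Python 3.9+) can run it in a worker thread so the loop stays free. Cancelling such a task, however, does not stop the thread itself.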
Here is a simple example of what you could do. In this case there are no actual service requests, of course, only sleeps, but `.cancel()` should work either way.
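Why `.cancel()` also works with real requests: `Task.cancel()` does not kill a coroutine on the spot. It arranges for `CancelledError` to be raised inside the task at the `await` where the task is currently suspended, so an in-flight async call is interrupted there and any `finally` cleanup still runs. A small sketch, assuming a hypothetical `request()` coroutine that logs its lifecycle:

```python
import asyncio

events = []


async def request(name, delay):
    # Hypothetical request coroutine that records what happens to it.
    events.append(f"start {name}")
    try:
        await asyncio.sleep(delay)  # cancellation is delivered at this await
        events.append(f"done {name}")
    except asyncio.CancelledError:
        events.append(f"cancelled {name}")
        raise  # re-raise so the task really ends up cancelled
    finally:
        events.append(f"cleanup {name}")


async def main():
    task = asyncio.create_task(request("Jack", 10))
    await asyncio.sleep(0.01)  # let the task start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, events)
```

Swallowing `CancelledError` inside the request (not re-raising it) would leave the task looking as if it had completed normally, which is why the sketch re-raises.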
```python
from random import choice, randint
from datetime import datetime
import asyncio


async def retrieve_person():
    # this just generates a random first and last name combo
    first = choice(['Alice', 'Bob', 'Charlie', 'Dave'])
    # giving 'Murphy' a decent chance of showing up
    last = choice(['Baker', 'Murphy', 'Smith', 'Murphy'])
    # anywhere between 3 and 8 seconds for each 'request'
    duration = randint(3, 8)
    print(f'Taking {duration} seconds to get {first} {last}')
    await asyncio.sleep(duration)
    return {'first': first, 'last': last}


async def main():
    # kick off all the asynchronous tasks, without knowing which will finish
    # first and whether any of them will get us a result we actually need.
    aws = [
        asyncio.create_task(retrieve_person()),
        asyncio.create_task(retrieve_person()),
        asyncio.create_task(retrieve_person())
    ]
    print(f'Starting {datetime.now()}')
    person = None
    for coro in asyncio.as_completed(aws):
        person = await coro
        if person['last'] == 'Murphy':
            # cancel the rest and stop looping
            for other_coro in aws:
                other_coro.cancel()
            break
        else:
            person = None
    print(f'Done {datetime.now()}: {person}')


asyncio.run(main())
```
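If you're unlucky and don't get a clear example with "Murphy" in it, you may need to run it a few times; I made no effort to always include one. The code does show, though, that if there are no matches, you simply end up with `None` after the last request.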
Example output:
```
Starting 2021-10-10 14:23:26.764063
Taking 8 seconds to get Alice Murphy
Taking 6 seconds to get Bob Murphy
Taking 3 seconds to get Alice Baker
Done 2021-10-10 14:23:32.762239: {'first': 'Bob', 'last': 'Murphy'}
```
Note that in this example there are two possible matches, but Bob wins after about 6 seconds, the program finishes, and Alice is never found.
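A related alternative to `as_completed()` is `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`, which hands back the actual `Task` objects, so the pending ones can be cancelled directly and then awaited so their cancellation has finished before the function returns. This sketch uses a hypothetical deterministic `retrieve_person(first, last, delay)` with scaled-down delays mirroring the example output above, rather than random values:

```python
import asyncio


async def retrieve_person(first, last, delay):
    # Hypothetical deterministic variant of the random request above.
    await asyncio.sleep(delay)
    return {"first": first, "last": last}


async def find_first_murphy(specs):
    pending = {asyncio.create_task(retrieve_person(*s)) for s in specs}
    found = None
    while pending and found is None:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            person = task.result()
            if person["last"] == "Murphy":
                found = person
                break
    # Cancel whatever is still running, then wait until it has unwound.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    return found  # None if no request returned a Murphy


specs = [("Alice", "Murphy", 0.3), ("Bob", "Murphy", 0.2), ("Alice", "Baker", 0.05)]
result = asyncio.run(find_first_murphy(specs))
print(result)  # {'first': 'Bob', 'last': 'Murphy'}
```

As in the `as_completed()` version, the first non-matching result (Alice Baker) is skipped and the first Murphy to arrive wins.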