在不等待完成的情况下在 proxychecker 中取消 coroutines/tasks
Canceling coroutines/tasks in proxychecker without waiting for completion
我创建了一个 proxychecker,它在单独检查所有代理时运行良好。但我想实现功能,以便在键盘中断时它取消所有挂起的代理检查协程并优雅地退出。在当前状态下,在键盘中断时程序不会正常退出,我收到错误消息 - "Task was destroyed but it is pending!"
经过一些研究,我意识到这是因为我在协程完成取消之前关闭了事件循环。我决定尝试并尝试自己实现在这个 Whosebug post 中找到的解决方案:
但是我的实现不起作用;似乎执行卡在 loop.run_forever() 中,因为在键盘中断时,我的终端卡在处理中。
如果可能的话,我将非常感谢一个不涉及等待待处理任务完成的解决方案。目标功能是在键盘中断时,程序丢弃所有内容,发出报告并退出。
另外,我是 asyncio 的新手,所以对我如何构建我的程序的任何建设性批评也非常感激。
async def check_proxy(self, id, session):
proxy = self.plist[0]
del self.plist[0]
try:
self.stats["tries"] += 1
async with session.head(self.test_url, proxy=proxy, timeout=self.timeout) as response:
if response and response.status == 200:
self.stats["alive"] += 1
self.alive.append(proxy)
print(f"{id} has found a live proxy : " + proxy)
except Exception:
pass
async def main(self):
tasks = []
connector = ProxyConnector()
async with aiohttp.ClientSession(connector=connector, request_class=ProxyClientRequest) as session:
while len(self.plist) > 0:
if len(self.plist) >= self.threads:
for i in range(self.threads):
tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
else:
for i in range(len(self.plist)):
tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
await asyncio.gather(*tasks)
def start(self):
self.load_proxies()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self.main())
except KeyboardInterrupt:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.run_forever()
asyncio.Task.all_tasks().exception()
finally:
loop.close()
self.report_stats()
理想情况下,输出应如下所示:
...
55 已找到一个实时代理:socks5://81.10.222.118:1080
83 已找到实时代理:socks5://173.245.239.223:16938
111 已找到实时代理:socks5://138.68.41.90:1080
^C
尝试次数:160
存活人数:32
我花了一些时间,因为我在错误的轨道上分支,但这是解决方案,以防其他人 运行 遇到同样的问题。
except KeyboardInterrupt:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop() #-- rather unintuitive in my opinion, works though.
loop.run_forever()
finally:
loop.close()
非常简单的修复;不幸的是我没有早点意识到。希望这可以防止其他人陷入类似的疯狂状态。干杯!
我创建了一个 proxychecker,它在单独检查所有代理时运行良好。但我想实现功能,以便在键盘中断时它取消所有挂起的代理检查协程并优雅地退出。在当前状态下,在键盘中断时程序不会正常退出,我收到错误消息 - "Task was destroyed but it is pending!"
经过一些研究,我意识到这是因为我在协程完成取消之前关闭了事件循环。我决定尝试并尝试自己实现在这个 Whosebug post 中找到的解决方案:
但是我的实现不起作用;似乎执行卡在 loop.run_forever() 中,因为在键盘中断时,我的终端卡在处理中。
如果可能的话,我将非常感谢一个不涉及等待待处理任务完成的解决方案。目标功能是在键盘中断时,程序丢弃所有内容,发出报告并退出。
另外,我是 asyncio 的新手,所以对我如何构建我的程序的任何建设性批评也非常感激。
async def check_proxy(self, id, session):
proxy = self.plist[0]
del self.plist[0]
try:
self.stats["tries"] += 1
async with session.head(self.test_url, proxy=proxy, timeout=self.timeout) as response:
if response and response.status == 200:
self.stats["alive"] += 1
self.alive.append(proxy)
print(f"{id} has found a live proxy : " + proxy)
except Exception:
pass
async def main(self):
tasks = []
connector = ProxyConnector()
async with aiohttp.ClientSession(connector=connector, request_class=ProxyClientRequest) as session:
while len(self.plist) > 0:
if len(self.plist) >= self.threads:
for i in range(self.threads):
tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
else:
for i in range(len(self.plist)):
tasks.append(asyncio.ensure_future(self.check_proxy(i+1, session)))
await asyncio.gather(*tasks)
def start(self):
self.load_proxies()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(self.main())
except KeyboardInterrupt:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.run_forever()
asyncio.Task.all_tasks().exception()
finally:
loop.close()
self.report_stats()
理想情况下,输出应如下所示:
...
55 已找到一个实时代理:socks5://81.10.222.118:1080
83 已找到实时代理:socks5://173.245.239.223:16938
111 已找到实时代理:socks5://138.68.41.90:1080
^C
尝试次数:160
存活人数:32
我花了一些时间,因为我在错误的轨道上分支,但这是解决方案,以防其他人 运行 遇到同样的问题。
except KeyboardInterrupt:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop() #-- rather unintuitive in my opinion, works though.
loop.run_forever()
finally:
loop.close()
非常简单的修复;不幸的是我没有早点意识到。希望这可以防止其他人陷入类似的疯狂状态。干杯!