与 python Flask 异步
Async with python Flask
我有这个函数来呈现 API
页面:
@app.route('/api', methods=['GET', 'POST'])
async def index():
response = {}
if request.method == 'GET' and request.args.get('sku'):
u = "https://....."
r = async_store_check(u)
response['status'] = 200
response['data'] = await r
return response
..... # other code
和这个发出请求的函数:
def async_store_check(url):
"""
performs asynchronous get requests
"""
async def get_all(url):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
await parseResponse(response)
return await asyncio.gather(url)
return get_all(url)
并解析响应:
def parseResponse(res):
"""
parses response generated by the endpoint
"""
if res.status_code == 200:
# A successful response, parse the response
response = res.json()
return response
然后我 运行 使用女服务员的应用程序:
if __name__ == '__main__':
print("Server is running on port 8000, Url: http://localhost:8000")
print("Press Ctrl+C to quit")
serve(app, host="0.0.0.0", port=8000)
但我得到:
TypeError: An asyncio.Future, a coroutine or an awaitable is required
请注意,我对 asyncio 和一般的 flask 还很陌生,因此非常感谢任何有关解决方案的建议。
回溯
ERROR:main:Exception on /api [GET] Traceback (most recent call last):
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 2070, in wsgi_app
response = self.full_dispatch_request()
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1515, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1513, in full_dispatch_request
rv = self.dispatch_request()
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1499, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
File "/Library/Python/3.8/site-packages/asgiref/sync.py", line 223, in __call__
return call_result.result()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/Library/Python/3.8/site-packages/asgiref/sync.py", line 292, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/Users/s/Library/Mobile Documents/com~apple~CloudDocs/s/s/main.py", line 22, in index
**response['data'] = await r
File "/Users/s/Library/Mobile Documents/com~apple~CloudDocs/s/s/main.py", line 58, in get_all
return await asyncio.gather(url)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 824, in gather
fut = ensure_future(arg, loop=loop)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 684, in ensure_future
raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' TypeError: An asyncio.Future, a coroutine or an awaitable is required
Flas 是同步库。您最好使用请求重写您的异步函数。 https://docs.python-requests.org/en/latest/
示例:
import requests
def store_check(url):
response = requests.get(url)
return response.text # return response.json()
或者你可以使用 FastApi。它是用于 Web 开发的异步库。
https://fastapi.tiangolo.com/
我有这个函数来呈现 API
页面:
@app.route('/api', methods=['GET', 'POST'])
async def index():
response = {}
if request.method == 'GET' and request.args.get('sku'):
u = "https://....."
r = async_store_check(u)
response['status'] = 200
response['data'] = await r
return response
..... # other code
和这个发出请求的函数:
def async_store_check(url):
"""
performs asynchronous get requests
"""
async def get_all(url):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
await parseResponse(response)
return await asyncio.gather(url)
return get_all(url)
并解析响应:
def parseResponse(res):
"""
parses response generated by the endpoint
"""
if res.status_code == 200:
# A successful response, parse the response
response = res.json()
return response
然后我 运行 使用女服务员的应用程序:
if __name__ == '__main__':
print("Server is running on port 8000, Url: http://localhost:8000")
print("Press Ctrl+C to quit")
serve(app, host="0.0.0.0", port=8000)
但我得到:
TypeError: An asyncio.Future, a coroutine or an awaitable is required
请注意,我对 asyncio 和一般的 flask 还很陌生,因此非常感谢任何有关解决方案的建议。
回溯
ERROR:main:Exception on /api [GET] Traceback (most recent call last):
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 2070, in wsgi_app
response = self.full_dispatch_request()
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1515, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1513, in full_dispatch_request
rv = self.dispatch_request()
File "/Users/s/Library/Python/3.8/lib/python/site-packages/flask/app.py", line 1499, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
File "/Library/Python/3.8/site-packages/asgiref/sync.py", line 223, in __call__
return call_result.result()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/Library/Python/3.8/site-packages/asgiref/sync.py", line 292, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/Users/s/Library/Mobile Documents/com~apple~CloudDocs/s/s/main.py", line 22, in index
**response['data'] = await r
File "/Users/s/Library/Mobile Documents/com~apple~CloudDocs/s/s/main.py", line 58, in get_all
return await asyncio.gather(url)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 824, in gather
fut = ensure_future(arg, loop=loop)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 684, in ensure_future
raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' TypeError: An asyncio.Future, a coroutine or an awaitable is required
Flas 是同步库。您最好使用请求重写您的异步函数。 https://docs.python-requests.org/en/latest/ 示例:
import requests
def store_check(url):
response = requests.get(url)
return response.text # return response.json()
或者你可以使用 FastApi。它是用于 Web 开发的异步库。 https://fastapi.tiangolo.com/