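Uvicorn async workers are still working synchronously
Question in short
I have migrated my project from Django 2.2 to Django 3.2, and now I want to start using asynchronous views. I have created an async view, set up the ASGI configuration, and run gunicorn with a Uvicorn worker. When I swarm this server with 10 concurrent users, they are served synchronously. What do I need to configure in order to serve an async view to 10 concurrent users?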
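Question in detail
This is what I have done so far in my local environment:
- I am working with Django 3.2.10 and Python 3.9.
- I have installed gunicorn and uvicorn through pip.
- I have created an asgi.py file with the following contents:

    import os

    from django.core.asgi import get_asgi_application

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyService.settings.local')
    application = get_asgi_application()

- I have created a view with the following implementation, and connected it in urlpatterns: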

    import asyncio
    import json

    from django.http import HttpResponse


    async def async_sleep(request):
        await asyncio.sleep(1)
        return HttpResponse(json.dumps({'mode': 'async', 'time': 1}).encode())

- I run a gunicorn server with a Uvicorn worker locally:

    gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker
    [2022-01-26 14:37:14 +0100] [8732] [INFO] Starting gunicorn 20.1.0
    [2022-01-26 14:37:14 +0100] [8732] [INFO] Listening at: http://127.0.0.1:8000 (8732)
    [2022-01-26 14:37:14 +0100] [8732] [INFO] Using worker: uvicorn.workers.UvicornWorker
    [2022-01-26 14:37:14 +0100] [8732] [INFO] Booting worker with pid: 8733
    [2022-01-26 13:37:15 +0000] [8733] [INFO] Started server process [8733]
    [2022-01-26 13:37:15 +0000] [8733] [INFO] Waiting for application startup.
    [2022-01-26 13:37:15 +0000] [8733] [INFO] ASGI 'lifespan' protocol appears unsupported.
    [2022-01-26 13:37:15 +0000] [8733] [INFO] Application startup complete.

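- I hit the API once from a local client. After 1 second, I get a 200 OK, as expected.
- I set up a Locust server to spawn concurrent users. When I let it make requests with 1 concurrent user, one API call completes every second.
- When I let it make requests with 10 concurrent users, still only one API call completes every second. All other requests are waiting.
This last result is not what I expect. I expect the worker to pick up the next request while the current one is sleeping asynchronously. Am I missing some configuration?
I also tried using Daphne instead of Uvicorn, with the same result.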
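
The expectation described above can be checked outside Django with a minimal sketch. It assumes nothing about the project: fake_view and serve_concurrently are hypothetical names, and the sleep is shortened to 0.2 seconds so the script finishes quickly. If ten handlers really overlap on one event loop, the total time should stay close to a single sleep rather than ten of them.

```python
import asyncio
import time


async def fake_view(request_id: int) -> int:
    # Hypothetical stand-in for the async view: the await yields control
    # to the event loop, so other handlers can run during the sleep.
    await asyncio.sleep(0.2)
    return request_id


async def serve_concurrently(n: int) -> float:
    # Schedule n handlers on one event loop and wait for all of them.
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_view(i) for i in range(n)))
    assert results == list(range(n))
    return time.perf_counter() - start


elapsed = asyncio.run(serve_concurrently(10))
print(f"10 overlapping requests took {elapsed:.2f}s")
```

A fully asynchronous request path should behave like this. A total close to ten sleeps, as seen with Locust, suggests something on the path is not yielding to the event loop.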
Locust
This is how I have set up Locust.
- Start a new virtual environment and run pip install locust
- Create a locustfile.py with the following content:

    from locust import HttpUser, task


    class SleepUser(HttpUser):
        @task
        def async_sleep(self):
            self.client.get('/api/async_sleep/')

- Run the locust executable from the shell
- Visit http://0.0.0.0:8089 in the browser
- Set the number of users to 10, the spawn rate to 1, and the host to http://127.0.0.1:8000
Middleware
These are my middleware settings:

    MIDDLEWARE = [
        'django_prometheus.middleware.PrometheusBeforeMiddleware',
        'corsheaders.middleware.CorsMiddleware',
        'django.middleware.gzip.GZipMiddleware',
        'django.contrib.sessions.middleware.SessionMiddleware',
        'django.middleware.common.CommonMiddleware',
        'django.middleware.csrf.CsrfViewMiddleware',
        'django.contrib.auth.middleware.AuthenticationMiddleware',
        'django.contrib.messages.middleware.MessageMiddleware',
        'django.middleware.clickjacking.XFrameOptionsMiddleware',
        'django.middleware.security.SecurityMiddleware',
        'shared.common.middleware.ApiLoggerMiddleware',
        'django_prometheus.middleware.PrometheusAfterMiddleware',
    ]

The ApiLoggerMiddleware from shared is our own code, so I will look into this one first. This is its implementation:

    import logging
    import os
    from typing import List

    from django.http import HttpRequest, HttpResponse
    from django.utils import timezone

    from shared.common.authentication_service import BaseAuthenticationService


    class ApiLoggerMiddleware:
        TOO_BIG_FOR_LOG_BYTES = 2 * 1024

        def __init__(self, get_response):
            # The get_response callable is provided by Django, it is a function
            # that takes a request and returns a response. Plainly put, once we're
            # done with the incoming request, we need to pass it along to get the
            # response which we need to ultimately return.
            self._get_response = get_response
            self.logger = logging.getLogger('api')
            self.pid = os.getpid()
            self.request_time = None
            self.response_time = None

        def __call__(self, request: HttpRequest) -> HttpResponse:
            common_data = self.on_request(request)
            response = self._get_response(request)
            self.on_response(response, common_data)
            return response

        def truncate_body(self, request: HttpRequest) -> str:
            return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"

        def on_request(self, request: HttpRequest) -> List[str]:
            self.request_time = timezone.now()
            remote_address = self.get_remote_address(request)
            user_agent = request.headers.get('User-Agent') or ''
            customer_uuid = self.get_customer_from_request_auth(request)
            method = request.method
            uri = request.get_raw_uri()
            common = [
                remote_address,
                user_agent,
                customer_uuid,
                method,
                uri
            ]
            in_line = [
                "IN",
                str(self.pid),
                str(self.request_time),
            ] + common + [
                self.truncate_body(request)
            ]
            self.logger.info(', '.join(in_line))
            return common

        def on_response(self, response: HttpResponse, common: List[str]) -> None:
            self.response_time = timezone.now()
            out_line = [
                "OUT",
                str(self.pid),
                str(self.response_time)
            ] + common + [
                str(self.response_time - self.request_time),
                str(response.status_code),
            ]
            self.logger.info(", ".join(out_line))

        @classmethod
        def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
            token = request.headers.get('Authorization')
            if not token:
                return 'no token'
            try:
                payload = BaseAuthenticationService.validate_access_token(token)
                return payload.get('amsOrganizationId', '')
            except Exception:
                return 'unknown'

        @classmethod
        def get_remote_address(cls, request: HttpRequest) -> str:
            if 'X-Forwarded-For' in request.headers:
                # in case the request comes in through a proxy, the remote address
                # will be just the last proxy that passed it along, that's why we
                # have to get the remote from X-Forwarded-For
                # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
                addresses = request.headers['X-Forwarded-For'].split(',')
                client = addresses[0]
                return client
            else:
                return request.META.get('REMOTE_ADDR', '')

Sources
Sources I used:
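When running the gunicorn command, you can try adding the workers parameter with the option -w or --workers.
It defaults to 1, as stated in the gunicorn documentation. You may want to try increasing that value.
Example usage:

    gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker -w 10

Also, you may want to check the other worker settings in the documentation, such as worker_class and threads.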
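
The same options can also be kept in a configuration file instead of on the command line. The following is a minimal sketch, assuming a gunicorn.conf.py in the directory gunicorn is started from (or passed with -c). The bind address is taken from the log output in the question and the worker count from the example command. Each worker is a separate process, so this adds process-level parallelism and does not by itself make a single worker handle requests concurrently.

```python
# gunicorn.conf.py (assumed file name and location for this sketch)

# Address taken from the "Listening at" line in the question's log output.
bind = "127.0.0.1:8000"

# Same worker class as the -k option in the question.
worker_class = "uvicorn.workers.UvicornWorker"

# Same value as "-w 10" in the example command; tune it for your machine.
workers = 10
```

With this file in place, the command reduces to gunicorn MyService.asgi:application.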
Your ApiLoggerMiddleware is a synchronous middleware.
From https://docs.djangoproject.com/en/4.0/topics/async/#async-views, emphasis mine:
You will only get the benefits of a fully-asynchronous request stack if you have no synchronous middleware loaded into your site. If there is a piece of synchronous middleware, then Django must use a thread per request to safely emulate a synchronous environment for it.
Middleware can be built to support both sync and async contexts. Some of Django’s middleware is built like this, but not all. To see what middleware Django has to adapt, you can turn on debug logging for the django.request logger and look for log messages about “Synchronous middleware … adapted”.
(The log message currently says "Asynchronous middleware ... adapted"; this bug is reported in #33495.)
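
To see how synchronous code on the request path can produce the one-request-per-second pattern from the question, here is a simplified model in plain asyncio. It is an analogy and not Django's actual implementation: it assumes that all synchronous work shares one thread, and the names sync_stack, async_stack and compare are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple

DELAY = 0.1  # seconds each simulated request spends waiting


def sync_stack(request_id: int) -> int:
    # Stand-in for a request that passes through synchronous code:
    # it occupies its thread for the whole wait.
    time.sleep(DELAY)
    return request_id


async def async_stack(request_id: int) -> int:
    # Stand-in for a fully asynchronous stack: the wait yields to the loop.
    await asyncio.sleep(DELAY)
    return request_id


async def timed(awaitables) -> float:
    # Run all awaitables together and return the wall-clock time taken.
    start = time.perf_counter()
    await asyncio.gather(*awaitables)
    return time.perf_counter() - start


async def compare(n: int = 5) -> Tuple[float, float]:
    loop = asyncio.get_running_loop()
    # Assumption of this sketch: every synchronous request shares one thread.
    with ThreadPoolExecutor(max_workers=1) as shared_thread:
        sync_time = await timed(
            loop.run_in_executor(shared_thread, sync_stack, i) for i in range(n)
        )
    async_time = await timed(async_stack(i) for i in range(n))
    return sync_time, async_time


sync_time, async_time = asyncio.run(compare())
print(f"shared sync thread: {sync_time:.2f}s, fully async: {async_time:.2f}s")
```

Under that assumption the synchronous variant takes roughly n times the delay, while the asynchronous variant takes roughly one delay.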
Turn on debug logging for the django.request logger by adding this to your LOGGING setting:

    'django.request': {
        'handlers': ['console'],
        'level': 'DEBUG',
    },

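
For context, the fragment above belongs under the loggers key of the LOGGING dictionary. A minimal complete sketch follows, assuming a console handler that is not defined elsewhere in your settings; the handler name console comes from the fragment, everything else is illustrative. By default Django passes LOGGING to logging.config.dictConfig, so the dictionary can be checked with the standard library alone.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        # Assumed handler definition; reuse your own if one already exists.
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        "django.request": {
            "handlers": ["console"],
            "level": "DEBUG",
        },
    },
}

# Applying the dictionary directly only verifies that it is well formed;
# in a Django project the settings module just defines LOGGING.
logging.config.dictConfig(LOGGING)
print(logging.getLogger("django.request").level == logging.DEBUG)
```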
Solution
To make ApiLoggerMiddleware asynchronous:
- Inherit from django.utils.deprecation.MiddlewareMixin.
  - Call super().__init__(get_response) in __init__.
  - Remove __call__; MiddlewareMixin.__call__ makes your middleware asynchronous.
- Refactor on_request to process_request.
  - return None instead of common.
  - Attach common to request instead: request.common = common. Remember to update references to request.common.
  - Attach request_time to request instead of self to make it (and the middleware) thread-safe. Remember to update references to request.request_time.
- Refactor on_response(self, response, common) to process_response(self, request, response).
  - return response.
  - Don't attach response_time to self; keep it as a local variable, since it is not used in other functions.
Result:

    from django.utils.deprecation import MiddlewareMixin  # +


    class ApiLoggerMiddleware(MiddlewareMixin):
        TOO_BIG_FOR_LOG_BYTES = 2 * 1024

        def __init__(self, get_response):
            # The get_response callable is provided by Django, it is a function
            # that takes a request and returns a response. Plainly put, once we're
            # done with the incoming request, we need to pass it along to get the
            # response which we need to ultimately return.
            super().__init__(get_response)  # +
            self._get_response = get_response
            self.logger = logging.getLogger('api')
            self.pid = os.getpid()
            # self.request_time = None   # -
            # self.response_time = None  # -

        # def __call__(self, request: HttpRequest) -> HttpResponse:  # -
        #     common_data = self.on_request(request)                 # -
        #     response = self._get_response(request)                 # -
        #     self.on_response(response, common_data)                # -
        #     return response                                        # -

        def truncate_body(self, request: HttpRequest) -> str:
            return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"

        # def on_request(self, request: HttpRequest) -> List[str]:  # -
        def process_request(self, request: HttpRequest) -> None:    # +
            # self.request_time = timezone.now()  # -
            request.request_time = timezone.now()  # +
            remote_address = self.get_remote_address(request)
            user_agent = request.headers.get('User-Agent') or ''
            customer_uuid = self.get_customer_from_request_auth(request)
            method = request.method
            uri = request.get_raw_uri()
            common = [
                remote_address,
                user_agent,
                customer_uuid,
                method,
                uri
            ]
            in_line = [
                "IN",
                str(self.pid),
                # str(self.request_time),  # -
                str(request.request_time),  # +
            ] + common + [
                self.truncate_body(request)
            ]
            self.logger.info(', '.join(in_line))
            # return common          # -
            request.common = common  # +
            return None              # +

        # def on_response(self, response: HttpResponse, common: List[str]) -> None:               # -
        def process_response(self, request: HttpRequest, response: HttpResponse) -> HttpResponse:  # +
            # self.response_time = timezone.now()  # -
            response_time = timezone.now()  # +
            # Fall back to response_time so the subtraction below stays a
            # datetime difference even if process_request never ran.
            request_time = getattr(request, 'request_time', response_time)  # +
            out_line = [
                "OUT",
                str(self.pid),
                # str(self.response_time)  # -
                str(response_time)  # +
            # ] + common + [  # -
            ] + getattr(request, 'common', []) + [  # +
                # str(self.response_time - self.request_time),  # -
                str(response_time - request_time),  # +
                str(response.status_code),
            ]
            self.logger.info(", ".join(out_line))
            return response  # +

        @classmethod
        def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
            token = request.headers.get('Authorization')
            if not token:
                return 'no token'
            try:
                payload = BaseAuthenticationService.validate_access_token(token)
                return payload.get('amsOrganizationId', '')
            except Exception:
                return 'unknown'

        @classmethod
        def get_remote_address(cls, request: HttpRequest) -> str:
            if 'X-Forwarded-For' in request.headers:
                # in case the request comes in through a proxy, the remote address
                # will be just the last proxy that passed it along, that's why we
                # have to get the remote from X-Forwarded-For
                # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
                addresses = request.headers['X-Forwarded-For'].split(',')
                client = addresses[0]
                return client
            else:
                return request.META.get('REMOTE_ADDR', '')

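
Why the timestamps move from self to request can be shown with a small standalone sketch. It does not use Django. SharedStateTimer and PerRequestTimer are hypothetical stand-ins for the original and the rewritten middleware, and the overlap is produced with asyncio tasks instead of threads, which leads to the same kind of interference.

```python
import asyncio
import time
from types import SimpleNamespace


class SharedStateTimer:
    """Keeps the start time on the instance, like the original middleware."""

    def on_request(self, request) -> None:
        self.request_time = time.perf_counter()

    def on_response(self, request) -> float:
        return time.perf_counter() - self.request_time


class PerRequestTimer:
    """Keeps the start time on the request, like the rewritten middleware."""

    def on_request(self, request) -> None:
        request.request_time = time.perf_counter()

    def on_response(self, request) -> float:
        return time.perf_counter() - request.request_time


async def handle(timer, arrival: float, work: float) -> float:
    await asyncio.sleep(arrival)  # stagger the arrival of the request
    request = SimpleNamespace()   # minimal stand-in for HttpRequest
    timer.on_request(request)
    await asyncio.sleep(work)     # the "view"
    return timer.on_response(request)


async def measure(timer):
    # Request A arrives at t=0 and works for 0.3 s. Request B arrives at
    # t=0.2 and works for 0.05 s, so it overwrites any shared start time
    # while A is still in flight.
    return await asyncio.gather(
        handle(timer, 0.0, 0.3),
        handle(timer, 0.2, 0.05),
    )


shared = asyncio.run(measure(SharedStateTimer()))
per_request = asyncio.run(measure(PerRequestTimer()))
print(f"request A, state on self:    {shared[0]:.2f}s")
print(f"request A, state on request: {per_request[0]:.2f}s")
```

With the start time on the instance, request A reports a duration well below its real 0.3 seconds because request B reset the shared value. With the start time on the request object, each request measures itself.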