Uvicorn 异步工作者仍在同步工作

Uvicorn async workers are still working synchronously

简答题

我已将我的项目从 Django 2.2 迁移到 Django 3.2,现在我想开始使用异步视图的可能性。我创建了一个异步视图,设置了 asgi 配置,并使用 Uvicorn worker 运行 gunicorn。当 10 个用户同时聚集在这个服务器上时,他们会被同步服务。我需要配置什么才能为 10 个并发用户提供异步视图?

问题详细

这是我目前在本地环境中所做的:

    import os
    from django.core.asgi import get_asgi_application
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyService.settings.local')
    application = get_asgi_application()
    import asyncio
    import json
    from django.http import HttpResponse
    
    async def async_sleep(request):
        await asyncio.sleep(1)
        return HttpResponse(json.dumps({'mode': 'async', 'time': 1).encode())
gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8732] [INFO] Starting gunicorn 20.1.0
[2022-01-26 14:37:14 +0100] [8732] [INFO] Listening at: http://127.0.0.1:8000 (8732)
[2022-01-26 14:37:14 +0100] [8732] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8733] [INFO] Booting worker with pid: 8733
[2022-01-26 13:37:15 +0000] [8733] [INFO] Started server process [8733]
[2022-01-26 13:37:15 +0000] [8733] [INFO] Waiting for application startup.
[2022-01-26 13:37:15 +0000] [8733] [INFO] ASGI 'lifespan' protocol appears unsupported.
[2022-01-26 13:37:15 +0000] [8733] [INFO] Application startup complete.

这最后一件事不是我所期望的。我希望工作人员在异步睡眠时已经接收到下一个请求。我是否缺少某些配置?

我也试过用Daphne代替Uvicorn,结果一样。

蝗虫

这就是我设置蝗虫的方式。

from locust import HttpUser, task
class SleepUser(HttpUser):
    @task
    def async_sleep(self):
        self.client.get('/api/async_sleep/')

中间件

这些是我的中间件设置

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.gzip.GZipMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'shared.common.middleware.ApiLoggerMiddleware',
    'django_prometheus.middleware.PrometheusAfterMiddleware',
]

shared的ApiLoggerMiddleware是我们自己的代码,我先研究这个。这是它的实现。

import logging
import os
from typing import List

from django.http import HttpRequest, HttpResponse
from django.utils import timezone

from shared.common.authentication_service import BaseAuthenticationService


class ApiLoggerMiddleware:
    TOO_BIG_FOR_LOG_BYTES = 2 * 1024

    def __init__(self, get_response):
        # The get_response callable is provided by Django, it is a function
        # that takes a request and returns a response. Plainly put, once we're
        # done with the incoming request, we need to pass it along to get the
        # response which we need to ultimately return.
        self._get_response = get_response
        self.logger = logging.getLogger('api')
        self.pid = os.getpid()
        self.request_time = None
        self.response_time = None

    def __call__(self, request: HttpRequest) -> HttpResponse:
        common_data = self.on_request(request)
        response = self._get_response(request)
        self.on_response(response, common_data)
        return response

    def truncate_body(self, request: HttpRequest) -> str:
        return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"

    def on_request(self, request: HttpRequest) -> List[str]:
        self.request_time = timezone.now()

        remote_address = self.get_remote_address(request)
        user_agent = request.headers.get('User-Agent') or ''
        customer_uuid = self.get_customer_from_request_auth(request)
        method = request.method
        uri = request.get_raw_uri()

        common = [
            remote_address,
            user_agent,
            customer_uuid,
            method,
            uri
        ]

        in_line = [
                      "IN",
                      str(self.pid),
                      str(self.request_time),
                  ] + common + [
                      self.truncate_body(request)
                  ]

        self.logger.info(', '.join(in_line))
        return common

    def on_response(self, response: HttpResponse, common: List[str]) -> None:
        self.response_time = timezone.now()

        out_line = [
                       "OUT",
                       str(self.pid),
                       str(self.response_time)
                   ] + common + [
                       str(self.response_time - self.request_time),
                       str(response.status_code),
                   ]
        self.logger.info(", ".join(out_line))

    @classmethod
    def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
        token = request.headers.get('Authorization')
        if not token:
            return 'no token'
        try:
            payload = BaseAuthenticationService.validate_access_token(token)
            return payload.get('amsOrganizationId', '')
        except Exception:
            return 'unknown'

    @classmethod
    def get_remote_address(cls, request: HttpRequest) -> str:
        if 'X-Forwarded-For' in request.headers:
            # in case the request comes in through a proxy, the remote address
            # will be just the last proxy that passed it along, that's why we
            # have to get the remote from X-Forwarded-For
            # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
            addresses = request.headers['X-Forwarded-For'].split(',')
            client = addresses[0]
            return client
        else:
            return request.META.get('REMOTE_ADDR', '')

来源

我使用的来源:

当运行 gunicorn 命令时,您可以尝试使用选项 -w--workers.

添加 workers 参数

它默认为 1,如 gunicorn documentation 中所述。您可能想尝试增加该值。

用法示例:

gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker -w 10

此外,您可能需要检查文档中的其他工作人员属性,例如 worker_classthreads

你的ApiLoggerMiddleware是一个同步中间件。

来自https://docs.djangoproject.com/en/4.0/topics/async/#async-views强调我的:

You will only get the benefits of a fully-asynchronous request stack if you have no synchronous middleware loaded into your site. If there is a piece of synchronous middleware, then Django must use a thread per request to safely emulate a synchronous environment for it.

Middleware can be built to support both sync and async contexts. Some of Django’s middleware is built like this, but not all. To see what middleware Django has to adapt, you can turn on debug logging for the django.request logger and look for log messages about “Synchronous middleware … adapted”.

(日志消息当前显示“异步中间件...已适配”,错误报告在 #33495。)

通过将此添加到您的 LOGGING 设置来为 django.request 记录器打开调试日志记录:

'django.request': {
    'handlers': ['console'],
    'level': 'DEBUG',
},

解决方案

要使 ApiLoggerMiddleware 异步:

  1. 继承django.utils.deprecation.MiddlewareMixin
    • __init__ 中调用 super().__init__(get_response)
    • 删除__call__MiddlewareMixin.__call__ 使您的中间件异步。
  2. on_request 重构为 process_request
    • return None 而不是 common.
    • 改为将 common 附加到 requestrequest.common = common.
      记得更新对 request.common.
    • 的引用
    • request_time 附加到 request 而不是 self 以使其(和中间件)成为 thread-safe.
      记得更新对 request.request_time.
    • 的引用
  3. on_response(self, response, common) 重构为 process_response(self, request, response)
    • return response.
    • 不要将 response_time 附加到 self;将其保留为变量,因为它未在其他函数中使用。

结果:

class ApiLoggerMiddleware(MiddlewareMixin):
    TOO_BIG_FOR_LOG_BYTES = 2 * 1024

    def __init__(self, get_response):
        # The get_response callable is provided by Django, it is a function
        # that takes a request and returns a response. Plainly put, once we're
        # done with the incoming request, we need to pass it along to get the
        # response which we need to ultimately return.
        super().__init__(get_response)  # +
        self._get_response = get_response
        self.logger = logging.getLogger('api')
        self.pid = os.getpid()
        # self.request_time = None   # -
        # self.response_time = None  # -

    # def __call__(self, request: HttpRequest) -> HttpResponse:  # -
    #     common_data = self.on_request(request)                 # -
    #     response = self._get_response(request)                 # -
    #     self.on_response(response, common_data)                # -
    #     return response                                        # -

    def truncate_body(self, request: HttpRequest) -> str:
        return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"

    # def on_request(self, request: HttpRequest) -> List[str]:  # -
    def process_request(self, request: HttpRequest) -> None:    # +
        # self.request_time = timezone.now()   # -
        request.request_time = timezone.now()  # +

        remote_address = self.get_remote_address(request)
        user_agent = request.headers.get('User-Agent') or ''
        customer_uuid = self.get_customer_from_request_auth(request)
        method = request.method
        uri = request.get_raw_uri()

        common = [
            remote_address,
            user_agent,
            customer_uuid,
            method,
            uri
        ]

        in_line = [
            "IN",
            str(self.pid),
            # str(self.request_time),   # -
            str(request.request_time),  # +
        ] + common + [
            self.truncate_body(request)
        ]

        self.logger.info(', '.join(in_line))
        # return common          # -
        request.common = common  # +
        return None              # +

    # def on_response(self, response: HttpResponse, common: List[str]) -> None:                # -
    def process_response(self, request: HttpRequest, response: HttpResponse) -> HttpResponse:  # +
        # self.response_time = timezone.now()  # -
        response_time = timezone.now()         # +

        out_line = [
            "OUT",
            str(self.pid),
            # str(self.response_time)  # -
            str(response_time)         # +
            # ] + common + [                    # -
        ] + getattr(request, 'common', []) + [  # +
            # str(self.response_time - self.request_time),             # -
            str(response_time - getattr(request, 'request_time', 0)),  # +
            str(response.status_code),
        ]
        self.logger.info(", ".join(out_line))
        return response  # +

    @classmethod
    def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
        token = request.headers.get('Authorization')
        if not token:
            return 'no token'
        try:
            payload = BaseAuthenticationService.validate_access_token(token)
            return payload.get('amsOrganizationId', '')
        except Exception:
            return 'unknown'

    @classmethod
    def get_remote_address(cls, request: HttpRequest) -> str:
        if 'X-Forwarded-For' in request.headers:
            # in case the request comes in through a proxy, the remote address
            # will be just the last proxy that passed it along, that's why we
            # have to get the remote from X-Forwarded-For
            # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
            addresses = request.headers['X-Forwarded-For'].split(',')
            client = addresses[0]
            return client
        else:
            return request.META.get('REMOTE_ADDR', '')