用于通道 websocket 身份验证的 Django jwt 中间件

Django jwt middleware for channels websocket authentication

我正在尝试为 Django 通道设置身份验证中间件。我希望这个中间件只对 websocket 请求有效。

似乎在这种情况下我没有获得完整的中间件功能。例如,我无法让 response = self.get_response(scope) 工作:

'TokenAuthMiddleware' object has no attribute 'get_response'

现在这个中间件一切正常(它仅针对 websocket 请求激活,未在 settings.py 中注册),除了我需要一种方法来修改响应状态代码(阻止匿名用户并设置ExpiredSignatureError 的错误代码)。任何帮助表示赞赏。我使用 Django 2.0.6 和渠道 2.1.1djangorestframework-jwt

jwt 认证

中间件:

import jwt, re
import traceback
import logging

from channels.auth import AuthMiddlewareStack
from django.contrib.auth.models import AnonymousUser
from django.conf import LazySettings
from jwt import InvalidSignatureError, ExpiredSignatureError, DecodeError

from project.models import MyUser

settings = LazySettings()
logger = logging.getLogger(__name__)


class TokenAuthMiddleware:
    """
    Token authorization middleware for Django Channels 2

    """

    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        headers = dict(scope['headers'])
        auth_header = None
        if b'authorization' in headers:
            auth_header = headers[b'authorization'].decode()
        else:
            try:
                auth_header = _str_to_dict(headers[b'cookie'].decode())['X-Authorization']
            except:
                pass

        logger.info(auth_header)

        if auth_header:
            try:
                user_jwt = jwt.decode(
                    auth_header,
                    settings.SECRET_KEY,
                )
                scope['user'] = MyUser.objects.get(
                    id=user_jwt['user_id']
                )
            except (InvalidSignatureError, KeyError, ExpiredSignatureError, DecodeError):
                traceback.print_exc()
                pass
            except Exception as e:  # NoQA
                logger.error(scope)
                traceback.print_exc()

        return self.inner(scope)


TokenAuthMiddlewareStack = lambda inner: TokenAuthMiddleware(AuthMiddlewareStack(inner))


def _str_to_dict(str):
    return {k: v.strip('"') for k, v in re.findall(r'(\S+)=(".*?"|\S+)', str)}

routing.py

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': TokenAuthMiddlewareStack(
        URLRouter(
            cmonitorserv.routing.websocket_urlpatterns
        )
    ),
})

无法找到使用中间件的解决方案。 现在通过在 consumers.py

中处理 auth 权限来解决
def _is_authenticated(self):
    if hasattr(self.scope, 'auth_error'):
        return False
    if not self.scope['user'] or self.scope['user'] is AnonymousUser:
        return False
    return True

另一件重要的事情似乎没有在任何地方记录 - 要拒绝与自定义错误代码的连接,我们需要先接受它。

class WebConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
        if self._is_authenticated():
              ....
        else:
            logger.error("ws client auth error")
            self.close(code=4003)