Apollo 客户端订阅通过 Django Channels 中间件处理的 JWT 令牌

Apollo client subscription pass JWT token handled by Django Channels middleware

我在 Vue3 应用程序上使用 Graphql 订阅和 Apollo 客户端,在我的后端应用程序中使用 Django graphQL Channels 和 DjangoGraphqlJWT 包。

我正在尝试通过 connectionParams 在 Apollo 订阅上传递 JWT 令牌。

以下this解决方案。我实现了一个中间件。但是,Apollo 将 connectionParams 作为有效负载传递。我找不到在中间件级别访问有效负载的方法,只能在消费者上访问。

我可以从中间件的范围参数访问查询字符串 属性。但是,我找不到在启动订阅后传递查询参数的方法。

客户端:

import { setContext } from "apollo-link-context";
import { Storage } from "@capacitor/storage";

import {
  ApolloClient,
  createHttpLink,
  InMemoryCache,
  split,
} from "@apollo/client/core";
import { getMainDefinition } from "@apollo/client/utilities";
import { WebSocketLink } from "@apollo/client/link/ws";

const authLink = setContext(async (_: any, { headers }: any) => {
  const { value: authStr } = await Storage.get({ key: "auth" });

  let token;
  if (authStr) {
    const auth = JSON.parse(authStr);
    token = auth.token;
  }

  // return the headers to the context so HTTP link can read them
  return {
    headers: {
      ...headers,
      authorization: token ? `JWT ${token}` : null,
    },
  };
});

const httpLink = createHttpLink({
  uri: process.env.VUE_APP_GRAPHQL_URL || "http://0.0.0.0:8000/graphql",
});

const wsLink = new WebSocketLink({
  uri: process.env.VUE_APP_WS_GRAPHQL_URL || "ws://0.0.0.0:8000/ws/graphql/",
  options: {
    reconnect: true,
    connectionParams: async () => {
      const { value: authStr } = await Storage.get({ key: "auth" });
      let token;
      if (authStr) {
        const auth = JSON.parse(authStr);
        token = auth.token;
        console.log(token); // So far so good the token is logged.
        return {
          token: token,
        };
      }

      return {};
    },
  },
});

const link = split(
  // split based on operation type
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === "OperationDefinition" &&
      definition.operation === "subscription"
    );
  },
  wsLink,
  httpLink
);

const cache = new InMemoryCache();
export default new ApolloClient({
  // @ts-ignore
  link: authLink.concat(link),
  cache,
});

后端:

asgy.py

from tinga.routing import MyGraphqlWsConsumer
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from tinga.channels_middleware import JwtAuthMiddlewareStack
import os

from django.core.asgi import get_asgi_application
from django.conf.urls import url

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tinga.settings')

application = get_asgi_application()

# import websockets.routing

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": JwtAuthMiddlewareStack(
        URLRouter([
            url(r"^ws/graphql/$", MyGraphqlWsConsumer.as_asgi()),
        ])
    ),
})

channels_middleware.py

@database_sync_to_async
def get_user(email):
    try:
        user = User.objects.get(email=email)
        return user

    except User.DoesNotExist:
        return AnonymousUser()


class JwtAuthMiddleware(BaseMiddleware):
    def __init__(self, inner):
        self.inner = inner

    async def __call__(self, scope, receive, send):
        # Close old database connections to prevent usage of timed out connections
        close_old_connections()

        # Either find a way to get the payload from Apollo in order to get the token.
        # OR
        # Pass pass the token in query string in apollo when subscription is initiated.
        # print(scope) # query_string, headers, etc.

        # Get the token
        # decoded_data = jwt_decode(payload['token'])

        # scope["user"] = await get_user(email=decoded_data['email'])
        return await super().__call__(scope, receive, send)


def JwtAuthMiddlewareStack(inner):
    return JwtAuthMiddleware(AuthMiddlewareStack(inner))

据我了解,我只能访问中间件中的查询字符串/URL 参数,而不能访问 Apollo 负载。现在可以在查询字符串中传递令牌吗?但是,由于在提供 Apollo 客户端时令牌可能不存在,因此需要像 connectionParams 一样重新评估。

任何解决方法?

我设法在消费者负载中获取令牌并将用户注入到上下文中。

from tinga.schema import schema
import channels_graphql_ws
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from graphql_jwt.utils import jwt_decode
from core.models import User
from channels_graphql_ws.scope_as_context import ScopeAsContext


@database_sync_to_async
def get_user(email):
    try:
        user = User.objects.get(email=email)
        return user

    except User.DoesNotExist:
        return AnonymousUser()


class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
    """Channels WebSocket consumer which provides GraphQL API."""
    schema = schema

    # Uncomment to send keepalive message every 42 seconds.
    # send_keepalive_every = 42

    # Uncomment to process requests sequentially (useful for tests).
    # strict_ordering = True

    async def on_connect(self, payload):
        """New client connection handler."""
        # You can `raise` from here to reject the connection.
        print("New client connected!")

        # Create object-like context (like in `Query` or `Mutation`)
        # from the dict-like one provided by the Channels.
        context = ScopeAsContext(self.scope)

        if 'token' in payload:
            # Decode the token
            decoded_data = jwt_decode(payload['token'])

            # Inject the user
            context.user = await get_user(email=decoded_data['email'])

        else:
            context.user = AnonymousUser

然后在connectionParams中传递token

const wsLink = new WebSocketLink({
  uri: process.env.VUE_APP_WS_GRAPHQL_URL || "ws://0.0.0.0:8000/ws/graphql/",
  options: {
    reconnect: true,
    connectionParams: async () => {
      const { value: authStr } = await Storage.get({ key: "auth" });
      let token;
      if (authStr) {
        const auth = JSON.parse(authStr);
        token = auth.token;
        console.log(token); // So far so good the token is logged.
        return {
          token: token,
        };
      }

      return {};
    },
  },
});