How to call an AppSync mutation with Cognito authentication using Python?

Is it possible to call an AppSync mutation with Cognito authentication using Python? If so, how?

I am trying to use boto3, but I have not found a way to execute GraphQL operations.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appsync.html

You can set your API's authentication mode to API_KEY and call the AppSync mutation over HTTP.

For example:

import requests
import json

APPSYNC_API_KEY = 'da2-xxxxxxxxxxxxx'
APPSYNC_API_ENDPOINT_URL = 'https://xxxxxxxxxxxxx.appsync-api.us-west-2.amazonaws.com/graphql'

headers = {
    'Content-Type': "application/graphql",
    'x-api-key': APPSYNC_API_KEY,
    'cache-control': "no-cache",
}

def execute_gql(query):
    """POST a GraphQL document (query or mutation) to the AppSync endpoint."""
    payload = json.dumps({"query": query})
    return requests.post(APPSYNC_API_ENDPOINT_URL, data=payload, headers=headers)

Suppose you have a model called Items; you can then easily run a query like this:

if __name__ == '__main__':
    print(execute_gql("query { listItems { items { id name } } }").json())

Simply replace the query string with a mutation operation.
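Since the question asks about Cognito rather than an API key, here is a minimal sketch of the same HTTP approach with a Cognito User Pools JWT. It assumes the API's authentication mode is Cognito User Pools and that you already hold a token for a signed-in user; in that mode AppSync reads the JWT from the `Authorization` header. The `createItem` mutation, its input shape, and the helper names `build_request` and `execute` are hypothetical. The sketch uses `urllib.request` from the standard library instead of `requests` so it has no dependencies; the same headers and payload should work with `requests.post`.

```python
import json
import urllib.request

# Hypothetical mutation: createItem and its input shape depend on your schema.
CREATE_ITEM = """
mutation CreateItem($name: String!) {
  createItem(input: {name: $name}) {
    id
    name
  }
}
"""


def build_request(url, jwt, query, variables=None):
    """Build (but do not send) a POST request authorized with a Cognito JWT."""
    body = {"query": query}
    if variables is not None:
        # Passing arguments as GraphQL variables avoids string formatting.
        body["variables"] = variables
    headers = {
        "Content-Type": "application/json",
        # With Cognito User Pools auth, AppSync reads the JWT from this header.
        "Authorization": jwt,
    }
    return urllib.request.Request(
        url, data=json.dumps(body).encode("utf-8"), headers=headers, method="POST"
    )


def execute(request):
    """Send the request and decode the JSON response."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


# Placeholder values; substitute your endpoint and a token issued by Cognito.
request = build_request(
    "https://xxxxxxxxxxxxx.appsync-api.us-west-2.amazonaws.com/graphql",
    "eyJ-placeholder-token",
    CREATE_ITEM,
    {"name": "example"},
)
# result = execute(request)  # uncomment once real values are in place
```

GraphQL errors usually come back in an `errors` key of the response body rather than as an HTTP error status, so it is worth checking for that key before using `data`.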

graphql-python/gql supports AWS AppSync since version 3.0.0rc0.

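It supports queries, mutations, and even subscriptions on the realtime endpoint.

It supports IAM, API key, and JWT (for example Cognito) authentication methods.

See the gql documentation for details.

Here is an example of a mutation using API key authentication: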

import asyncio
import os
import sys
from urllib.parse import urlparse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication

# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)


async def main():

    # Should look like:
    # https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
    url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
    api_key = os.environ.get("AWS_GRAPHQL_API_KEY")

    if url is None or api_key is None:
        print("Missing environment variables", file=sys.stderr)
        sys.exit(1)

    # Extract host from url
    host = str(urlparse(url).netloc)

    auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)

    transport = AIOHTTPTransport(url=url, auth=auth)

    async with Client(
        transport=transport, fetch_schema_from_transport=False,
    ) as session:

        query = gql(
            """
mutation createMessage($message: String!) {
  createMessage(input: {message: $message}) {
    id
    message
    createdAt
  }
}"""
        )

        variable_values = {"message": "Hello world!"}

        result = await session.execute(query, variable_values=variable_values)
        print(result)


asyncio.run(main())
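To use Cognito instead of an API key, the example above can be adapted by swapping the authentication object for `AppSyncJWTAuthentication`, which gql provides in the same module. The sketch below also shows one possible way to obtain the JWT with boto3's `cognito-idp` client. It assumes the app client allows the `USER_PASSWORD_AUTH` flow and has no client secret; other flows need different parameters. The helper names and the `COGNITO_*` environment variable names are hypothetical, and the third-party imports are placed inside the functions only so the helpers load without those packages installed.

```python
import asyncio
import os
from urllib.parse import urlparse


def host_from_url(url):
    """Return the host part of the AppSync endpoint URL."""
    return str(urlparse(url).netloc)


def fetch_id_token(region, client_id, username, password):
    """Sign in to a Cognito user pool and return the ID token (a JWT).

    Assumes the app client allows USER_PASSWORD_AUTH and has no client secret.
    """
    import boto3  # third-party

    cognito = boto3.client("cognito-idp", region_name=region)
    response = cognito.initiate_auth(
        AuthFlow="USER_PASSWORD_AUTH",
        AuthParameters={"USERNAME": username, "PASSWORD": password},
        ClientId=client_id,
    )
    return response["AuthenticationResult"]["IdToken"]


async def create_message(url, jwt, message):
    """Run the createMessage mutation using JWT (Cognito) authentication."""
    from gql import Client, gql  # third-party
    from gql.transport.aiohttp import AIOHTTPTransport
    from gql.transport.appsync_auth import AppSyncJWTAuthentication

    auth = AppSyncJWTAuthentication(host=host_from_url(url), jwt=jwt)
    transport = AIOHTTPTransport(url=url, auth=auth)

    async with Client(
        transport=transport, fetch_schema_from_transport=False
    ) as session:
        query = gql(
            """
            mutation createMessage($message: String!) {
              createMessage(input: {message: $message}) {
                id
                message
                createdAt
              }
            }
            """
        )
        return await session.execute(query, variable_values={"message": message})


def main():
    # Hypothetical environment variable names for the Cognito settings.
    token = fetch_id_token(
        os.environ["AWS_REGION"],
        os.environ["COGNITO_APP_CLIENT_ID"],
        os.environ["COGNITO_USERNAME"],
        os.environ["COGNITO_PASSWORD"],
    )
    url = os.environ["AWS_GRAPHQL_API_ENDPOINT"]
    print(asyncio.run(create_message(url, token, "Hello world!")))


# Call main() to run this against a real API.
```

Cognito tokens expire, so a long-running program would need to fetch a fresh token and rebuild the transport from time to time.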