如何使用 python 通过 Cognito 身份验证调用 AppSync 突变?
How to call an AppSync mutation with Cognito authentication using python?
是否可以使用 Python 通过 Cognito 身份验证调用 AppSync 变更?怎么样?
我正在尝试使用 boto3,但我没有找到执行 graphql 操作的方法。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appsync.html
您可以将 API 身份验证模式设为 "API KEY" 并使用 http.
调用 AppSync 变更
例如
import requests
import json
APPSYNC_API_KEY = 'da2-xxxxxxxxxxxxx'
APPSYNC_API_ENDPOINT_URL = 'https://xxxxxxxxxxxxx.appsync-api.us-west-2.amazonaws.com/graphql'
headers = {
'Content-Type': "application/graphql",
'x-api-key': APPSYNC_API_KEY,
'cache-control': "no-cache",
}
def execute_gql(query):
payload_obj = {"query": query}
payload = json.dumps(payload_obj)
response = requests.request("POST", APPSYNC_API_ENDPOINT_URL, data=payload, headers=headers)
return response
假设您有一个名为 Items
的模型,您可以轻松地进行如下查询:
if __name__ == '__main__':
print(execute_gql("query { listItems { items { id name } } }").json())
简单地用变异操作替换字符串。
graphql-python/gql supports AWS AppSync since version 3.0.0rc0.
它支持实时端点上的查询、变更甚至订阅。
它支持 IAM、api 密钥和 JWT(例如 Cognito)身份验证方法。
文档可用here
这是使用 API 密钥认证的变异示例:
import asyncio
import os
import sys
from urllib.parse import urlparse
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication
# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)
async def main():
# Should look like:
# https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
api_key = os.environ.get("AWS_GRAPHQL_API_KEY")
if url is None or api_key is None:
print("Missing environment variables")
sys.exit()
# Extract host from url
host = str(urlparse(url).netloc)
auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)
transport = AIOHTTPTransport(url=url, auth=auth)
async with Client(
transport=transport, fetch_schema_from_transport=False,
) as session:
query = gql(
"""
mutation createMessage($message: String!) {
createMessage(input: {message: $message}) {
id
message
createdAt
}
}"""
)
variable_values = {"message": "Hello world!"}
result = await session.execute(query, variable_values=variable_values)
print(result)
asyncio.run(main())
是否可以使用 Python 通过 Cognito 身份验证调用 AppSync 变更?怎么样?
我正在尝试使用 boto3,但我没有找到执行 graphql 操作的方法。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appsync.html
您可以将 API 身份验证模式设为 "API KEY" 并使用 http.
调用 AppSync 变更例如
import requests
import json
APPSYNC_API_KEY = 'da2-xxxxxxxxxxxxx'
APPSYNC_API_ENDPOINT_URL = 'https://xxxxxxxxxxxxx.appsync-api.us-west-2.amazonaws.com/graphql'
headers = {
'Content-Type': "application/graphql",
'x-api-key': APPSYNC_API_KEY,
'cache-control': "no-cache",
}
def execute_gql(query):
payload_obj = {"query": query}
payload = json.dumps(payload_obj)
response = requests.request("POST", APPSYNC_API_ENDPOINT_URL, data=payload, headers=headers)
return response
假设您有一个名为 Items
的模型,您可以轻松地进行如下查询:
if __name__ == '__main__':
print(execute_gql("query { listItems { items { id name } } }").json())
简单地用变异操作替换字符串。
graphql-python/gql supports AWS AppSync since version 3.0.0rc0.
它支持实时端点上的查询、变更甚至订阅。
它支持 IAM、api 密钥和 JWT(例如 Cognito)身份验证方法。
文档可用here
这是使用 API 密钥认证的变异示例:
import asyncio
import os
import sys
from urllib.parse import urlparse
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication
# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)
async def main():
# Should look like:
# https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
api_key = os.environ.get("AWS_GRAPHQL_API_KEY")
if url is None or api_key is None:
print("Missing environment variables")
sys.exit()
# Extract host from url
host = str(urlparse(url).netloc)
auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)
transport = AIOHTTPTransport(url=url, auth=auth)
async with Client(
transport=transport, fetch_schema_from_transport=False,
) as session:
query = gql(
"""
mutation createMessage($message: String!) {
createMessage(input: {message: $message}) {
id
message
createdAt
}
}"""
)
variable_values = {"message": "Hello world!"}
result = await session.execute(query, variable_values=variable_values)
print(result)
asyncio.run(main())