在 Python Graphene 中使用 graphene.test 测试订阅

Testing subscriptions using graphene.test in Python Graphene

graphene-python 中测试订阅的惯用方法是什么? graphene.test中的client.execute选项似乎只适用于Query测试

P.S。文档中有一个订阅执行示例,但它似乎不是测试库的一部分 (https://docs.graphene-python.org/en/latest/execution/subscriptions/)。

预发布版graphene(3)支持订阅方式:

import asyncio
from datetime import datetime
from graphene import ObjectType, String, Schema, Field

class Query(ObjectType):
    hello = String()

    def resolve_hello(root, info):
        return 'Hello, world!'

class Subscription(ObjectType):
    time_of_day = Field(String)

    async def resolve_time_of_day(root, info):
        while True:
            yield datetime.now().isoformat()
            await asyncio.sleep(1)

schema = Schema(query=Query, subscription=Subscription)

async def main():
    subscription = 'subscription { timeOfDay }'
    result = await schema.execute_async(subscription)
    async for item in result:
        print(item)

asyncio.run(main())

来源: https://github.com/graphql-python/graphene/issues/1099.