How can I access the return value from twisted deferred callbacks using AsyncioSelectorReactor?

Using Python 3.7.7, Twisted 20.3.0 (and Scrapy 2.1.0), when I try...

    doc_link = await self.upload_reseller_document(doc_request, self.create_id(contract))

I get a Deferred instead of a string. My callbacks are not awaited either.

Expected: https://s3.amazonaws.com/some-bucket/some_file.csv

Received: <Deferred at 0x11ae61dd0 current result: None>

    async def conditional_upload(request):
        docs_bucket = 'some-bucket'
        key = f'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'
        async def cb(obj):
            print('found key, returning url')
            return defer.success(url)

        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Error happened, return item.
                print('could not download reseller csv')
                return defer.error(None)
            print('uploading to', docs_bucket, key)
            return threads.deferToThread(
                self.s3client.put_object,
                Bucket=docs_bucket,
                Key=key,
                Body=response.body)

        async def eb(failure):
            print('did not find key')
            if failure.type != ClientError:
                raise failure.value
            return upload_doc()

        return ensureDeferred(threads.deferToThread(
                self.s3client.head_object,
                Bucket=docs_bucket,
                Key=key).addCallbacks(cb, eb))

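Internally, Twisted only deals with Deferreds and with functions that return them. You cannot pass an async function as a callback to a Deferred: when called, an async function returns a coroutine object instead of running its body, so the callback has no effect, and when the reactor stops you will get the warning "coroutine x was never awaited".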
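A minimal sketch of the problem using only the standard library: MiniDeferred below is a hypothetical, heavily simplified stand-in for a Deferred's callback chain (it is not Twisted's class or API). Like a real Deferred, it calls each callback synchronously and passes the return value on, so an async callback only hands back an un-awaited coroutine object.

```python
import inspect


class MiniDeferred:
    """Hypothetical, simplified stand-in for a Deferred's callback chain."""

    def __init__(self):
        self.callbacks = []
        self.result = None

    def add_callback(self, fn):
        self.callbacks.append(fn)
        return self

    def fire(self, value):
        # Each callback is called synchronously and its return value is
        # passed to the next one. Nothing here ever awaits a coroutine.
        self.result = value
        for fn in self.callbacks:
            self.result = fn(self.result)
        return self.result


def sync_cb(key):
    return f"https://s3.amazonaws.com/some-bucket/{key}"


async def async_cb(key):
    return f"https://s3.amazonaws.com/some-bucket/{key}"


ok = MiniDeferred().add_callback(sync_cb).fire("some_file.csv")
print(ok)  # https://s3.amazonaws.com/some-bucket/some_file.csv

broken = MiniDeferred().add_callback(async_cb).fire("some_file.csv")
print(inspect.iscoroutine(broken))  # True: the URL was never computed
broken.close()  # close it so Python does not warn "never awaited"
```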
When using async functions, you should await the Deferreds until they finish and handle their results, instead of attaching callbacks and returning the Deferreds. The point of async functions is to avoid callback hell.
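The same pitfall most likely explains the symptom in the question: an async function that returns a Deferred (instead of awaiting it) gives its caller the Deferred object, not the Deferred's result. The sketch below reproduces this with an asyncio Future standing in for the Deferred; start_lookup is a hypothetical helper, and this is an asyncio analogy rather than Twisted code. Only the version that awaits the inner awaitable gets the URL.

```python
import asyncio

URL = "https://s3.amazonaws.com/some-bucket/some-prefix/some_file.csv"


def start_lookup(loop):
    # Hypothetical stand-in for a call that returns a Deferred: it returns
    # a Future whose result is filled in later by the event loop.
    fut = loop.create_future()
    loop.call_soon(fut.set_result, URL)
    return fut


async def returns_awaitable():
    loop = asyncio.get_running_loop()
    # Returning the Future hands the caller the Future object itself.
    return start_lookup(loop)


async def awaits_awaitable():
    loop = asyncio.get_running_loop()
    # Awaiting suspends until the value is ready and returns the value.
    return await start_lookup(loop)


async def main():
    wrong = await returns_awaitable()
    right = await awaits_awaitable()
    return wrong, right


wrong, right = asyncio.run(main())
print(type(wrong).__name__)  # Future
print(right)
```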

defer.ensureDeferred wraps a coroutine in a Deferred so that Twisted can schedule it to run. Use it when you need to call an async function from a non-async function.
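To keep it runnable with only the standard library, here is the same pattern sketched with asyncio (a swapped-in analogue, not Twisted's API): asyncio.ensure_future wraps a coroutine in a Task much as defer.ensureDeferred wraps it in a Deferred, and add_done_callback plays the role of addCallback. build_url and schedule_from_sync_code are hypothetical names used only for this illustration.

```python
import asyncio

results = []


async def build_url(bucket, key):
    await asyncio.sleep(0)  # stand-in for some asynchronous work
    return f"https://s3.amazonaws.com/{bucket}/{key}"


def schedule_from_sync_code():
    # A plain (non-async) function cannot use `await`, so it wraps the
    # coroutine in a Task and attaches a callback, similar in spirit to
    # defer.ensureDeferred(coro).addCallback(...) in Twisted.
    task = asyncio.ensure_future(build_url("some-bucket", "some_file.csv"))
    task.add_done_callback(lambda t: results.append(t.result()))
    return task


async def main():
    task = schedule_from_sync_code()
    await task  # keep the loop running until the scheduled work finishes


asyncio.run(main())
print(results)
```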

Use try/except to handle exceptions (the equivalent of an errback, except that the exception is not wrapped in Twisted's Failure):

    from botocore.exceptions import ClientError
    from twisted.internet import threads


    async def conditional_upload(self, request):
        docs_bucket = 'some-bucket'
        key = 'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'

        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Download failed: raise so the caller can handle it.
                print('could not download reseller csv')
                raise Exception('could not download reseller csv')
            print('uploading to', docs_bucket, key)
            # Run the blocking boto3 call in a thread and wait for it.
            return await threads.deferToThread(
                self.s3client.put_object,
                Bucket=docs_bucket, Key=key, Body=response.body,
            )

        # First check whether the object already exists.
        try:
            await threads.deferToThread(
                self.s3client.head_object, Bucket=docs_bucket, Key=key)
            print('found key, returning url')
            return url
        except ClientError:
            print('did not find key, going to upload_doc ...')

        # If it does not exist, upload it, retrying a bounded number of times.
        retry_attempts = 10  # avoid an infinite loop
        for _ in range(retry_attempts):
            try:
                await upload_doc()
                print('Uploaded the key, returning url')
                return url
            except ClientError:
                print('Failed to upload the key, retrying...')

        print('Failed to upload the key, max attempts tried.')
        return None
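The answer's flow (check with head_object, then upload with a bounded number of retries) can be tried without S3 or Scrapy. In this sketch, FakeS3 and ClientError are hypothetical in-memory stand-ins for the boto3 client and botocore's exception, and loop.run_in_executor takes the place of threads.deferToThread. It is an asyncio illustration of the same control flow, not Twisted code.

```python
import asyncio


class ClientError(Exception):
    """Hypothetical stand-in for botocore.exceptions.ClientError."""


class FakeS3:
    """Hypothetical in-memory client whose first `failures` uploads fail."""

    def __init__(self, existing=(), failures=0):
        self.keys = set(existing)
        self.failures = failures
        self.put_calls = 0

    def head_object(self, Bucket, Key):
        if Key not in self.keys:
            raise ClientError("404 Not Found")
        return {}

    def put_object(self, Bucket, Key, Body):
        self.put_calls += 1
        if self.put_calls <= self.failures:
            raise ClientError(f"upload attempt {self.put_calls} failed")
        self.keys.add(Key)
        return {}


async def in_thread(func, **kwargs):
    # asyncio analogue of threads.deferToThread(func, **kwargs): run the
    # blocking call in the default thread pool and await its result.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(**kwargs))


async def conditional_upload(s3, bucket, key, body, retry_attempts=10):
    url = f"https://s3.amazonaws.com/{bucket}/{key}"
    try:
        await in_thread(s3.head_object, Bucket=bucket, Key=key)
        return url  # the object already exists
    except ClientError:
        pass  # not found: fall through to the upload
    for _ in range(retry_attempts):
        try:
            await in_thread(s3.put_object, Bucket=bucket, Key=key, Body=body)
            return url
        except ClientError:
            continue  # failed attempt: try again
    return None  # every attempt failed, as in the answer


key = "some-prefix/some_file.csv"
s3 = FakeS3(failures=2)
print(asyncio.run(conditional_upload(s3, "some-bucket", key, b"a,b\n")))
print(s3.put_calls)  # 3: two failed attempts, then a successful one
```

Returning None after the last attempt mirrors the answer; raising an exception instead would make a failed upload harder for the caller to miss.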