使用 asyncio 在 Python 中进行 a/b 测试

Using asyncio for doing a/b testing in Python

假设有一些 API 已经在生产中 运行,而您创建了另一个 API,您有点想 A/B 使用传入的请求进行测试生产-api。现在我想知道,是否可以做这样的事情,(我知道人们通过为 A/B 测试等保留两个不同的 API 版本来进行流量拆分)

As soon as you get the incoming request for your production-api, you make an async request to your new API and then carry on with the rest of the code for the production-api and then, just before returning the final response to the caller back, you check whether you have the results computed for that async task that you had created before. If it's available, then you return that instead of the current API.

我想知道,做这样的事情最好的方法是什么?我们是否尝试为此或其他东西编写一个装饰器?我有点担心如果我们在这里使用异步会发生很多边缘情况。有人对改进代码或整个方法有任何指示吗?

感谢您的宝贵时间!


上述方法的一些伪代码,

import asyncio

def call_old_api():
    pass

async def call_new_api():
    pass

async def main():
    task = asyncio.Task(call_new_api())

    oldResp = call_old_api()
    resp = await task

    if task.done():
        return resp
    else:
        task.cancel() # maybe
        return oldResp

asyncio.run(main())

你不能只在 asyncio 的协同程序中执行 call_old_api(). Please, ensure you understand it, because depending on how your server works you may not be able to do what you want (to run async API on a sync server preserving the point of writing an async code, for example).

有详细解释

如果您了解自己在做什么,并且您有一个异步服务器,您可以在线程中调用旧的同步 API 并使用一个任务来 运行 新的 API:

task = asyncio.Task(call_new_api())
oldResp = await in_thread(call_old_api())

if task.done():
    return task.result()  # here you should keep in mind that task.result() may raise exception if the new api request failed, but that's probably ok for you
else:
    task.cancel() # yes, but you should take care of the cancelling, see - 
    return oldResp

我认为你可以走得更远,而不是总是等待旧的 API 完成,你可以 运行 同时 API 和 return第一个完成(如果新 api 比旧的工作得更快)。通过上面的所有检查和建议,它应该看起来像这样:

import asyncio
import random
import time
from contextlib import suppress


def call_old_api():
    time.sleep(random.randint(0, 2))
    return "OLD"


async def call_new_api():
    await asyncio.sleep(random.randint(0, 2))
    return "NEW"


async def in_thread(func):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func)


async def ensure_cancelled(task):
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


async def main():
    old_api_task = asyncio.Task(in_thread(call_old_api))
    new_api_task = asyncio.Task(call_new_api())

    done, pending = await asyncio.wait(
        [old_api_task, new_api_task], return_when=asyncio.FIRST_COMPLETED
    )

    if pending:
        for task in pending:
            await ensure_cancelled(task)

    finished_task = done.pop()
    res = finished_task.result()
    print(res)


asyncio.run(main())