Using asyncio for doing a/b testing in Python

假设有一些 API 已经在生产中 运行,而您创建了另一个 API,您有点想 A/B 使用传入的请求进行测试生产-api。现在我想知道,是否可以做这样的事情,(我知道人们通过为 A/B 测试等保留两个不同的 API 版本来进行流量拆分)

As soon as you get the incoming request for your production-api, you make an async request to your new API and then carry on with the rest of the code for the production-api and then, just before returning the final response to the caller back, you check whether you have the results computed for that async task that you had created before. If it's available, then you return that instead of the current API.




import asyncio

def call_old_api():

async def call_new_api():

async def main():
    task = asyncio.Task(call_new_api())

    oldResp = call_old_api()
    resp = await task

    if task.done():
        return resp
        task.cancel() # maybe
        return oldResp


你不能只在 asyncio 的协同程序中执行 call_old_api(). Please, ensure you understand it, because depending on how your server works you may not be able to do what you want (to run async API on a sync server preserving the point of writing an async code, for example).


如果您了解自己在做什么,并且您有一个异步服务器,您可以在线程中调用旧的同步 API 并使用一个任务来 运行 新的 API:

task = asyncio.Task(call_new_api())
oldResp = await in_thread(call_old_api())

if task.done():
    return task.result()  # here you should keep in mind that task.result() may raise exception if the new api request failed, but that's probably ok for you
    task.cancel() # yes, but you should take care of the cancelling, see - 
    return oldResp

我认为你可以走得更远,而不是总是等待旧的 API 完成,你可以 运行 同时 API 和 return第一个完成(如果新 api 比旧的工作得更快)。通过上面的所有检查和建议,它应该看起来像这样:

import asyncio
import random
import time
from contextlib import suppress

def call_old_api():
    time.sleep(random.randint(0, 2))
    return "OLD"

async def call_new_api():
    await asyncio.sleep(random.randint(0, 2))
    return "NEW"

async def in_thread(func):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func)

async def ensure_cancelled(task):
    with suppress(asyncio.CancelledError):
        await task

async def main():
    old_api_task = asyncio.Task(in_thread(call_old_api))
    new_api_task = asyncio.Task(call_new_api())

    done, pending = await asyncio.wait(
        [old_api_task, new_api_task], return_when=asyncio.FIRST_COMPLETED

    if pending:
        for task in pending:
            await ensure_cancelled(task)

    finished_task = done.pop()
    res = finished_task.result()
