How to ensure that the result was given by async, not by pool

The functions that need to be tested (their code is not visible; they can only be imported):

File async_data.py

import asyncio
import socket
import aiohttp


async def get_json(client, uid):
    json_url = 'https://jsonplaceholder.typicode.com/todos/{uid}'.format(uid=uid)
    resp = await client.request('GET', json_url)
    data = await resp.json()
    return data


async def main_async(range_max):
    # verify_ssl is deprecated in aiohttp 3.x; certificate verification is on by default
    conn = aiohttp.TCPConnector(family=socket.AF_INET)
    async with aiohttp.ClientSession(trust_env=True, connector=conn) as client:
        tasks = [get_json(client, x) for x in range(range_max)]
        data = await asyncio.gather(*tasks, return_exceptions=True)
        return data

Second file (the same task in synchronous mode or using a pool), sync_data.py

import json
import urllib.error  # imported explicitly because HTTPError is referenced below
import urllib.request
from multiprocessing import Pool


def get_json_url(uid):
    json_url = 'https://jsonplaceholder.typicode.com/todos/{uid}'.format(uid=uid)
    jsondata = {}
    try:
        with urllib.request.urlopen(json_url) as url:
            jsondata = json.loads(url.read().decode())
    except urllib.error.HTTPError:
        pass
    return jsondata


def main_sync(range_max):
    return [get_json_url(uid) for uid in range(range_max)]


def main_pool(range_max):
    with Pool() as pool:
        result = pool.map(get_json_url, range(range_max))
    return result

The main block, where the functions main_async, main_sync, and main_pool are treated as black boxes, runs the test:

import time
import asyncio
from async_data import main_async
from sync_data import main_sync, main_pool

def main():
    total_cnt = 200
    # async block
    # time.clock() was removed in Python 3.8; perf_counter() measures elapsed wall-clock time
    async_start = time.perf_counter()
    async_data = asyncio.run(main_async(total_cnt))
    async_time = time.perf_counter() - async_start
    # pool block
    pool_start = time.perf_counter()
    pool_data = main_pool(total_cnt)
    pool_time = time.perf_counter() - pool_start
    # sync block
    sync_start = time.perf_counter()
    sync_data = main_sync(total_cnt)
    sync_time = time.perf_counter() - sync_start
    # assert data
    sorted_async = sorted([x.get('id', -1) for x in async_data])
    sorted_pool = sorted([x.get('id', -1) for x in pool_data])
    sorted_sync = sorted([x.get('id', -1) for x in sync_data])
    assert sorted_async == sorted_pool
    assert sorted_async == sorted_sync
    assert sync_time > async_time
    assert sync_time > pool_time
    # And here I want to ensure that the result was given by async, not by pool

if __name__ == '__main__':
    main()

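A simple way to test whether the data was received by an async or a sync method is to check the execution time. But how can you test whether the code is using pool or async?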

You can try some mocking in your tests:

import multiprocessing.pool
from unittest.mock import patch

...

with patch(
    'multiprocessing.pool.ApplyResult.get',
    autospec=True,
    wraps=multiprocessing.pool.ApplyResult.get
) as patched:
    # time.clock() was removed in Python 3.8; use perf_counter() instead
    async_start = time.perf_counter()
    async_data = asyncio.run(main_async(total_cnt))
    async_time = time.perf_counter() - async_start
    # the async path must not touch the pool result machinery
    patched.assert_not_called()

    ...

    pool_start = time.perf_counter()
    pool_data = main_pool(total_cnt)
    pool_time = time.perf_counter() - pool_start
    patched.assert_called()

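pool.ApplyResult.get is the method that is called before a value is returned from pool.map (and likewise from apply and starmap), so if you are not sure exactly which multiprocessing method the second tested module uses, you can stick with pool.ApplyResult.get.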
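
The check can be tried out in a self-contained form. What follows is only a sketch under stated assumptions: square, square_async, run_async, and run_pool are hypothetical stand-ins for the black-box functions; ThreadPool (a subclass of Pool that inherits its map) is used so the sketch runs without spawning worker processes; and calls are forwarded to the original get through side_effect rather than wraps, a common spy idiom that is swapped in here.

```python
import asyncio
from multiprocessing.pool import ApplyResult, ThreadPool
from unittest.mock import patch


def square(x):
    """Hypothetical worker function standing in for get_json_url."""
    return x * x


async def square_async(x):
    """Hypothetical coroutine standing in for get_json."""
    await asyncio.sleep(0)  # yield to the event loop once
    return x * x


async def run_async(n):
    """Stand-in for main_async: gathers coroutines, no pool involved."""
    return await asyncio.gather(*(square_async(i) for i in range(n)))


def run_pool(n):
    """Stand-in for main_pool: Pool.map waits on ApplyResult.get internally."""
    with ThreadPool(2) as pool:
        return pool.map(square, range(n))


original_get = ApplyResult.get  # keep a reference before patching

with patch.object(
    ApplyResult, "get", autospec=True, side_effect=original_get
) as spy:
    async_result = asyncio.run(run_async(5))
    spy.assert_not_called()          # the async path never asked a pool for results
    calls_after_async = spy.call_count

    pool_result = run_pool(5)
    spy.assert_called()              # the pool path went through ApplyResult.get
    calls_after_pool = spy.call_count
```

Because side_effect forwards to the real get, run_pool still returns real results while the spy records the call.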

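Now, about the unittest.mock.patch object: it is a testing tool whose purpose is to replace some method or object in the standard library or in third-party libraries. Usually it prevents the patched method from being called and simply returns some predefined value that imitates the work of the original method.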
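
A minimal sketch of this default behavior, assuming a hypothetical helper describe_cwd and a made-up directory name: while the patch is active, the real os.getcwd is never called and the predefined value is returned in its place.

```python
import os
from unittest.mock import patch


def describe_cwd():
    """Hypothetical helper whose result depends on os.getcwd()."""
    return "cwd is " + os.getcwd()


# Replace os.getcwd with a mock that returns a canned (made-up) value.
with patch("os.getcwd", return_value="/fake/dir") as fake_getcwd:
    message = describe_cwd()

# The mock recorded the call; the real function did not run inside the block.
fake_getcwd.assert_called_once_with()
```

Once the with block exits, os.getcwd is restored to the real function.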

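But you can use it in a different way with the wraps argument. If you pass the original method to this argument, the original method will still be called in the process. pool.ApplyResult.get will hold the patched object rather than the original get method, but the original get is called whenever the patched object handles a call. So you get both the result of the method and some extra statistics provided by the unittest library, such as assert_called.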
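
The effect of wraps can be seen in isolation with a module-level function. This is a small illustration only; json.loads and the sample payload are chosen here for convenience and are not part of the original test.

```python
import json
from unittest.mock import patch

# wraps is evaluated before the patch starts, so it refers to the real json.loads.
with patch("json.loads", wraps=json.loads) as spy:
    parsed = json.loads('{"id": 1}')  # goes through the mock, then the real function

# The real result came back, and the mock recorded how it was called.
spy.assert_called_once_with('{"id": 1}')
```

Here parsed holds the dictionary produced by the real json.loads, while spy carries the call statistics.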