使用异步 Python 的并发 HTTP 和 SQL 请求 3

Concurrent HTTP and SQL requests using async Python 3

第一次尝试 asyncioaiohttp。 我有以下代码从 MySQL 数据库获取 urls 以获取 GET 请求。获取响应并将它们推送到 MySQL 数据库。

if __name__ == "__main__":
    database_name = 'db_name'
    company_name = 'company_name'

    my_db = Db(database=database_name) # wrapper class for mysql.connector
    urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
    update_id = my_db.get_updateid()
    my_db.get_connection(dictionary=True)

    for url in urls_dict:
        url_id = url['id']
        url = url['url']
        table_name = my_db.make_sql_table_name_by_url(url)
        insert_query = my_db.get_sql_for_insert(table_name)
        r = requests.get(url=url).json() # make the request
        args = [json.dumps(r), update_id, url_id]
        my_db.db_execute_one(insert_query, args, close_conn=False)

    my_db.close_conn()

这工作正常,但要加快速度我怎样才能 运行 它 asynchronously

我看过 here, and here 但似乎无法理解它。

这是我根据@Raphael Medaer 的回答尝试过的方法。

async def fetch(url):
    async with ClientSession() as session:
        async with session.request(method='GET', url=url) as response:
            json = await response.json()
            return json


async def process(url, update_id):
    table_name = await db.make_sql_table_name_by_url(url)
    result = await fetch(url)
    print(url, result)

if __name__ == "__main__":
    """Get urls from DB"""
    db = Db(database="fuse_src")
    urls = db.get_rest_api_urls()  # This returns list of dictionary
    update_id = db.get_updateid()
    url_list = []
    for url in urls:
        url_list.append(url['url'])
    print(update_id)
    asyncio.get_event_loop().run_until_complete(
        asyncio.gather(*[process(url, update_id) for url in url_list]))

我在 process 方法中遇到错误:

TypeError: object str can't be used in 'await' expression

不确定是什么问题?

任何特定于此的代码示例将不胜感激。

使这段代码异步根本不会加速。除非您考虑 运行 您在 "parallel" 中的一部分代码。例如,您可以 运行 在 "same time" 中进行多个(SQL 或 HTTP)查询。通过进行异步编程,您将不会执行 "same time" 中的代码。尽管在等待 IOs.

时,您将受益于执行代码的其他部分的长 IO 任务

首先,您必须使用 异步 库(而不是 同步 库)。

  • mysql.connector 可以替换为来自 aio-libs 的 aiomysql
  • requests 可以替换为 aiohttp

要在 "parallel" 中执行多个异步任务(例如替换循环 for url in urls_dict:),您必须仔细阅读 asyncio tasks and function gather

我不会(重新)以异步方式编写您的代码,但是这里有几行伪代码可以帮助您:

async def process(url):
    result = await fetch(url)
    await db.commit(result)

if __name__ == "__main__":
    db = MyDbConnection()
    urls = await db.fetch_all_urls()

    asyncio.get_event_loop().run_until_complete(
        asyncio.gather(*[process(url) for url in urls]))