Python API: 同步多线程数据库查询
Python API: synchronous multithreaded DB queries
我有一个 Python 烧瓶 API 可以对对象应用一些 SQL 过滤。
API 工作流程的步骤:
- 收到 POST 请求(带参数)
- 运行 多个 SQL 读取查询(针对 postgres 数据库)取决于一些发布的参数
- 对SQL结果应用一些简单的“纯python”规则以获得布尔结果
- 将布尔结果和相关的发布参数存储在 postgres 数据库中
- return 布尔结果
API的约束:
- API 需要 return 150 毫秒以下的布尔答案
- 我可以将布尔结果异步存储在数据库中,以避免在return输入布尔结果
之前等待写入查询完成
- 但是,如前所述,布尔值答案取决于 SQL 读取查询,因此我不能 运行 那些异步查询
进行的测试:
在进行一些测试时,我发现我可以并行进行读取查询。我做的测试是:
- 运行 下面的查询 2 次不使用多线程 => 代码 运行 大约 10 秒
from sqlalchemy import create_engine
import os
import time
engine = create_engine(
os.getenv("POSTGRES_URL")
)
def run_query():
with engine.connect() as conn:
rs = conn.execute(f"""
SELECT
*
, pg_sleep(5)
FROM users
""")
for row in rs:
print(row)
if __name__ == "__main__":
start = time.time()
for i in range(5):
run_query()
end = time.time() - start
- 运行 使用多线程的查询 => 代码 运行 大约需要 5 秒
from sqlalchemy import create_engine
import os
import threading
import time
engine = create_engine(
os.getenv("POSTGRES_URL")
)
def run_query():
with engine.connect() as conn:
rs = conn.execute(f"""
SELECT
*
, pg_sleep(5)
FROM users
""")
for row in rs:
print(row)
if __name__ == "__main__":
start = time.time()
threads = []
for i in range(5):
t = threading.Thread(target=run_query)
t.start()
threads.append(t)
for t in threads:
t.join()
end = time.time() - start
问题:
- 代码的瓶颈是什么?我确信在 1 个 API 调用中,我可以 运行 并行读取查询的数量必须达到上限。但是我想知道是什么决定了这些限制。
非常感谢您的帮助!
这远远超出了合理的范围。通过对内置连接池的 pool_size 进行一些调整,您可以轻松地同时进行 100 个 pg_sleep。但是一旦你改变它来做真正的工作而不仅仅是睡觉,它就会分崩离析。你只有那么多 CPU 和那么多磁盘驱动器,而且这个数字可能还不到 100。
您应该首先查看那些读取的查询,看看为什么它们很慢,以及是否不能通过索引或其他方式使它们更快。
我有一个 Python 烧瓶 API 可以对对象应用一些 SQL 过滤。
API 工作流程的步骤:
- 收到 POST 请求(带参数)
- 运行 多个 SQL 读取查询(针对 postgres 数据库)取决于一些发布的参数
- 对SQL结果应用一些简单的“纯python”规则以获得布尔结果
- 将布尔结果和相关的发布参数存储在 postgres 数据库中
- return 布尔结果
API的约束:
- API 需要 return 150 毫秒以下的布尔答案
- 我可以将布尔结果异步存储在数据库中,以避免在return输入布尔结果 之前等待写入查询完成
- 但是,如前所述,布尔值答案取决于 SQL 读取查询,因此我不能 运行 那些异步查询
进行的测试: 在进行一些测试时,我发现我可以并行进行读取查询。我做的测试是:
- 运行 下面的查询 2 次不使用多线程 => 代码 运行 大约 10 秒
from sqlalchemy import create_engine
import os
import time
engine = create_engine(
os.getenv("POSTGRES_URL")
)
def run_query():
with engine.connect() as conn:
rs = conn.execute(f"""
SELECT
*
, pg_sleep(5)
FROM users
""")
for row in rs:
print(row)
if __name__ == "__main__":
start = time.time()
for i in range(5):
run_query()
end = time.time() - start
- 运行 使用多线程的查询 => 代码 运行 大约需要 5 秒
from sqlalchemy import create_engine
import os
import threading
import time
engine = create_engine(
os.getenv("POSTGRES_URL")
)
def run_query():
with engine.connect() as conn:
rs = conn.execute(f"""
SELECT
*
, pg_sleep(5)
FROM users
""")
for row in rs:
print(row)
if __name__ == "__main__":
start = time.time()
threads = []
for i in range(5):
t = threading.Thread(target=run_query)
t.start()
threads.append(t)
for t in threads:
t.join()
end = time.time() - start
问题:
- 代码的瓶颈是什么?我确信在 1 个 API 调用中,我可以 运行 并行读取查询的数量必须达到上限。但是我想知道是什么决定了这些限制。
非常感谢您的帮助!
这远远超出了合理的范围。通过对内置连接池的 pool_size 进行一些调整,您可以轻松地同时进行 100 个 pg_sleep。但是一旦你改变它来做真正的工作而不仅仅是睡觉,它就会分崩离析。你只有那么多 CPU 和那么多磁盘驱动器,而且这个数字可能还不到 100。
您应该首先查看那些读取的查询,看看为什么它们很慢,以及是否不能通过索引或其他方式使它们更快。