如何使用 PyGreSQL 执行并行查询?
How to execute parallel queries with PyGreSQL?
我正在尝试 运行 多个查询与 PyGreSQL 和多处理并行,但下面的代码挂起而没有返回:
from pg import DB
from multiprocessing import Pool
from functools import partial
def create_query(table_name):
return f"""create table {table_name} (id integer);
CREATE INDEX ON {table_name} USING BTREE (id);"""
my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]
def execute_query(conn_string, query):
con = DB(conn_string)
con.query(query)
con.close()
rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)
有什么办法让它起作用吗?是否也可以在同一个 "transaction" 中进行 3 运行ning 查询,以防一个查询失败而另一个查询回滚?
一个明显的问题是您总是 运行 pool.map
,不仅在主进程中,而且在并行子进程中使用的解释器导入脚本时也是如此。你应该这样做:
def run_all():
with Pool(processes=len(my_queries)) as pool:
pool.map(partial(execute_query,rs_conn_string), my_queries)
if __name__ == '__main__':
run_all()
关于你的第二个问题,这是不可能的,因为事务是按连接进行的,如果你这样做的话,它们会存在于不同的进程中。
异步命令处理可能是你想要的,但它不是yet supported by PyGreSQL. Psygopg + aiopg可能更适合做这样的事情。
PyGreSql 添加了与 connection.poll() 方法的异步。就池而言,我喜欢重写 MySQL.connectors 池包装器来处理 pgdb 连接对象。有一些“可选”连接方法调用会失败,您必须注释掉它们(即检查连接状态等。如果需要,这些可以在 Pgdb 连接对象级别实现,但调用不匹配 MySQL.connectors api 界面)。可能存在一些 low-level 错误,因为库仅以类似方式抽象,但此解决方案已经 运行 投入生产几个月了,现在没有任何问题。
我正在尝试 运行 多个查询与 PyGreSQL 和多处理并行,但下面的代码挂起而没有返回:
from pg import DB
from multiprocessing import Pool
from functools import partial
def create_query(table_name):
return f"""create table {table_name} (id integer);
CREATE INDEX ON {table_name} USING BTREE (id);"""
my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]
def execute_query(conn_string, query):
con = DB(conn_string)
con.query(query)
con.close()
rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)
有什么办法让它起作用吗?是否也可以在同一个 "transaction" 中进行 3 运行ning 查询,以防一个查询失败而另一个查询回滚?
一个明显的问题是您总是 运行 pool.map
,不仅在主进程中,而且在并行子进程中使用的解释器导入脚本时也是如此。你应该这样做:
def run_all():
with Pool(processes=len(my_queries)) as pool:
pool.map(partial(execute_query,rs_conn_string), my_queries)
if __name__ == '__main__':
run_all()
关于你的第二个问题,这是不可能的,因为事务是按连接进行的,如果你这样做的话,它们会存在于不同的进程中。
异步命令处理可能是你想要的,但它不是yet supported by PyGreSQL. Psygopg + aiopg可能更适合做这样的事情。
PyGreSql 添加了与 connection.poll() 方法的异步。就池而言,我喜欢重写 MySQL.connectors 池包装器来处理 pgdb 连接对象。有一些“可选”连接方法调用会失败,您必须注释掉它们(即检查连接状态等。如果需要,这些可以在 Pgdb 连接对象级别实现,但调用不匹配 MySQL.connectors api 界面)。可能存在一些 low-level 错误,因为库仅以类似方式抽象,但此解决方案已经 运行 投入生产几个月了,现在没有任何问题。