具有 python 个子进程的 PostgreSQL 数据库并发
Concurrency on PostgreSQL Database with python subprocesses
我使用 python 多处理进程通过 psycopg 建立到 postgreSQL 数据库的多个连接。
每个进程都会建立一个连接,创建一个游标,从 mp.Queue
中获取一个对象并对数据库进行一些操作。如果一切正常,则提交更改并关闭连接。
如果其中一个进程发生错误(例如,添加列请求失败,因为列已经存在),所有进程似乎都停止工作。
import psycopg2
import multiprocessing as mp
import Queue
def connect():
C = psycopg2.connect(host = "myhost", user = "myuser", password = "supersafe", port = 62013, database = "db")
cur = C.cursor()
return C,cur
def commit_and_close(C,cur):
C.commit()
cur.close()
C.close()
def commit(C):
C.commit()
def sub(queue):
C,cur = connect()
while not queue.empty():
work_element = queue.get(timeout=1)
#do something with the work element, that might produce an SQL error
commit_and_close(C,cur)
return 0
if __name__ == '__main__':
job_queue = mp.Queue()
#Fill Job_queue
print 'Run'
for i in range(20):
p=mp.Process(target=sub, args=(job_queue))
p.start()
我可以看到,进程仍然存在(因为 job_queue 仍然是满的),但是没有网络流量/SQL 操作正在发生。 SQL 错误是否可能阻止与其他子进程的通信?我怎样才能防止这种情况发生?
碰巧,我今天也在做类似的事情。
一个连接的状态不应该影响另一个,所以我认为我们不应该从那里开始。
您的队列处理中显然存在竞争条件。您检查队列是否为空,然后尝试获取要执行的语句。对于多个读者,其中一个可以清空队列,让其他人都阻塞在他们的 queue.get
上。如果当他们都锁定时队列是空的,那么我会怀疑这一点。
您也永远不会 join
您的流程在完成后返回。我不确定在更大的画面中会产生什么影响,但清理可能是个好习惯。
可能发生的另一件事是您的出错进程没有正确回滚。这可能会让其他事务等待查看它是否完成或回滚。默认情况下,他们可以等待很长时间,但您可以 configure it.
要查看发生了什么,启动 psql 并查看两个有用的系统视图 pg_stat_activity
和 pg_locks
。这应该表明原因所在。
我使用 python 多处理进程通过 psycopg 建立到 postgreSQL 数据库的多个连接。
每个进程都会建立一个连接,创建一个游标,从 mp.Queue
中获取一个对象并对数据库进行一些操作。如果一切正常,则提交更改并关闭连接。
如果其中一个进程发生错误(例如,添加列请求失败,因为列已经存在),所有进程似乎都停止工作。
import psycopg2
import multiprocessing as mp
import Queue
def connect():
C = psycopg2.connect(host = "myhost", user = "myuser", password = "supersafe", port = 62013, database = "db")
cur = C.cursor()
return C,cur
def commit_and_close(C,cur):
C.commit()
cur.close()
C.close()
def commit(C):
C.commit()
def sub(queue):
C,cur = connect()
while not queue.empty():
work_element = queue.get(timeout=1)
#do something with the work element, that might produce an SQL error
commit_and_close(C,cur)
return 0
if __name__ == '__main__':
job_queue = mp.Queue()
#Fill Job_queue
print 'Run'
for i in range(20):
p=mp.Process(target=sub, args=(job_queue))
p.start()
我可以看到,进程仍然存在(因为 job_queue 仍然是满的),但是没有网络流量/SQL 操作正在发生。 SQL 错误是否可能阻止与其他子进程的通信?我怎样才能防止这种情况发生?
碰巧,我今天也在做类似的事情。
一个连接的状态不应该影响另一个,所以我认为我们不应该从那里开始。
您的队列处理中显然存在竞争条件。您检查队列是否为空,然后尝试获取要执行的语句。对于多个读者,其中一个可以清空队列,让其他人都阻塞在他们的 queue.get
上。如果当他们都锁定时队列是空的,那么我会怀疑这一点。
您也永远不会 join
您的流程在完成后返回。我不确定在更大的画面中会产生什么影响,但清理可能是个好习惯。
可能发生的另一件事是您的出错进程没有正确回滚。这可能会让其他事务等待查看它是否完成或回滚。默认情况下,他们可以等待很长时间,但您可以 configure it.
要查看发生了什么,启动 psql 并查看两个有用的系统视图 pg_stat_activity
和 pg_locks
。这应该表明原因所在。