Python 3.8 脚本在获取数据库连接时冻结(psycopg2 和多处理)- Windows 7
Python 3.8 script freezes on getting database connection (psycopg2 and multiprocessing) - Windows 7
我有一个脚本可以启动四个进程来对数据库进行插入。他们使用不同的参数共享相同的目标代码。
经过一些调试,我发现程序卡住的那一刻是在第2步:进程请求数据库连接的那一步。问题不会立即发生。该程序在冻结之前完成了一些插入。我试图将步骤2,3和4锁定在一起,但并没有解决问题。我不知道如何解决这个问题。
records 是一个对象数组,其中包含将要用于将 INSERT 插入数据库的数据。
计数器是验证进度的共享值。
total 用于计算进度百分比。
锁用于独占访问变量和数据库。
dataBaseAdapter 是一个 python 模块,它导入 psycopg2 以与数据库建立连接
数据库是 PostgreSQL
我不知道这是否有任何区别,但我在 Visual Studio 代码
上 运行
进程的创建:
len_records = len(records)
counter = Value('i', 0)
lock = Lock()
total = Value('i', len_records)
for index in range(0,len_records,4):
p1 = encapsulatedProcess(index,jogos,len_records,counter,lock)
p2 = encapsulatedProcess(index+1,jogos,len_records,counter,lock)
p3 = encapsulatedProcess(index+2,jogos,len_records,counter,lock)
p4 = encapsulatedProcess(index+3,jogos,len_records,counter,lock)
holdProcess(index+1,p1)
holdProcess(index+2,p2)
holdProcess(index+3,p3)
holdProcess(index+4,p4)
封装过程:
def encapsulatedProcess(ind,records,len_records,counter,lock):
if(ind > len_records): return None
record = records[ind]
process = Process(target=my_function, args=(ind,record,counter,lock,), daemon=True)
process.start()
return process
保持进程:
def holdProcess(number,process):
if(process == None): return
if(process.is_alive()):
process.join()
process.close()
print(number)
my_function:
def dramaAnalizerLocal(idt,record,counter,lock):
lock.acquire()
print(idt," 1 - load model")
auxiliar = Model(record) #connects with database and closes
lock.release()
lock.acquire()
print(idt," 2 - getting connection")
conn = dataBaseAdapter.getConnection()
lock.release()
lock.acquire()
print(idt, " 3 - saving data")
record.store(InsertIntoType1(model=auxiliar, ignored=1), conn)
record.store(IntertIntoType2(model=auxiliar, ignored=1),conn)
record.store(InsertIntoType3(model=auxiliar, ignored=1),conn)
lock.release()
lock.acquire()
print(idt," 4 - closing connection")
dataBaseAdapter.closeConnection(conn)
lock.release()
lock.acquire()
print(idt," 5 - increment counter")
counter.value += 1
lock.release()
lock.acquire()
print(idt," 6 - return")
lock.release()
在网上搜索后,问题似乎是程序打开和关闭连接太快了。因此,建议在所有操作后立即关闭。我能够通过单个进程传递连接,但是,我不知道该进程如何继续请求连接。所以,为了解决这个问题,我做了我不敢做的事情,因为我担心这会很难:安装 pgbouncer。结果很容易。我只需要更改程序使用的端口,为 pgbouncer 提供数据库别名并重新启动 pgbouncer。
TLDR:安装了 pgbouncer 并配置了程序和 pgbouncer
我有一个脚本可以启动四个进程来对数据库进行插入。他们使用不同的参数共享相同的目标代码。
经过一些调试,我发现程序卡住的那一刻是在第2步:进程请求数据库连接的那一步。问题不会立即发生。该程序在冻结之前完成了一些插入。我试图将步骤2,3和4锁定在一起,但并没有解决问题。我不知道如何解决这个问题。
records 是一个对象数组,其中包含将要用于将 INSERT 插入数据库的数据。
计数器是验证进度的共享值。
total 用于计算进度百分比。
锁用于独占访问变量和数据库。
dataBaseAdapter 是一个 python 模块,它导入 psycopg2 以与数据库建立连接
数据库是 PostgreSQL
我不知道这是否有任何区别,但我在 Visual Studio 代码
进程的创建:
len_records = len(records)
counter = Value('i', 0)
lock = Lock()
total = Value('i', len_records)
for index in range(0,len_records,4):
p1 = encapsulatedProcess(index,jogos,len_records,counter,lock)
p2 = encapsulatedProcess(index+1,jogos,len_records,counter,lock)
p3 = encapsulatedProcess(index+2,jogos,len_records,counter,lock)
p4 = encapsulatedProcess(index+3,jogos,len_records,counter,lock)
holdProcess(index+1,p1)
holdProcess(index+2,p2)
holdProcess(index+3,p3)
holdProcess(index+4,p4)
封装过程:
def encapsulatedProcess(ind,records,len_records,counter,lock):
if(ind > len_records): return None
record = records[ind]
process = Process(target=my_function, args=(ind,record,counter,lock,), daemon=True)
process.start()
return process
保持进程:
def holdProcess(number,process):
if(process == None): return
if(process.is_alive()):
process.join()
process.close()
print(number)
my_function:
def dramaAnalizerLocal(idt,record,counter,lock):
lock.acquire()
print(idt," 1 - load model")
auxiliar = Model(record) #connects with database and closes
lock.release()
lock.acquire()
print(idt," 2 - getting connection")
conn = dataBaseAdapter.getConnection()
lock.release()
lock.acquire()
print(idt, " 3 - saving data")
record.store(InsertIntoType1(model=auxiliar, ignored=1), conn)
record.store(IntertIntoType2(model=auxiliar, ignored=1),conn)
record.store(InsertIntoType3(model=auxiliar, ignored=1),conn)
lock.release()
lock.acquire()
print(idt," 4 - closing connection")
dataBaseAdapter.closeConnection(conn)
lock.release()
lock.acquire()
print(idt," 5 - increment counter")
counter.value += 1
lock.release()
lock.acquire()
print(idt," 6 - return")
lock.release()
在网上搜索后,问题似乎是程序打开和关闭连接太快了。因此,建议在所有操作后立即关闭。我能够通过单个进程传递连接,但是,我不知道该进程如何继续请求连接。所以,为了解决这个问题,我做了我不敢做的事情,因为我担心这会很难:安装 pgbouncer。结果很容易。我只需要更改程序使用的端口,为 pgbouncer 提供数据库别名并重新启动 pgbouncer。
TLDR:安装了 pgbouncer 并配置了程序和 pgbouncer