使用 pool.imap 时无法 pickle psycopg2.extensions.connection 对象,但可以在单个进程中完成
Can't pickle psycopg2.extensions.connection objects when using pool.imap, but can be done in individual processes
我正在尝试构建一个应用程序,它将 "check out" 一个单元格,这是一个覆盖地理数据库中一部分土地的正方形,并对该单元格内的要素进行分析。因为我有很多细胞要处理,所以我使用了多处理方法。
我让它在我的对象内部有点像这样工作:
class DistributedGeographicConstraintProcessor:
...
def _process_cell(self, conn_string):
conn = pg2.connect(conn_string)
try:
cur = conn.cursor()
cell_id = self._check_out_cell(cur)
conn.commit()
print(f"processing cell_id {cell_id}...")
for constraint in self.constraints:
# print(f"processing {constraint.name()}...")
query = constraint.prepare_distributed_query(self.job, self.grid)
cur.execute(query, {
"buffer": constraint.buffer(),
"cell_id": cell_id,
"name": constraint.name(),
"simplify_tolerance": constraint.simplify_tolerance()
})
# TODO: do a final race condition check to further suppress duplicates
self._check_in_cell(cur, cell_id)
conn.commit()
finally:
del cur
conn.close()
return None
def run(self):
while True:
if not self._job_finished():
params = [self.conn_string] * self.num_cores
processes = []
for param in params:
process = mp.Process(target=self._process_cell, args=(param,))
processes.append(process)
sleep(0.1) # Prevent multiple processes from checkout out the same grid square
process.start()
for process in processes:
process.join()
else:
self._finalize_job()
break
但问题是它只会启动四个进程,等到它们都完成后再启动四个新进程。
我想这样当一个进程完成它的工作时,它将立即开始处理下一个单元格,即使它的协同进程还没有完成。
我不确定如何实现它,我试过使用这样的池:
def run(self):
pool = mp.Pool(self.num_cores)
unprocessed_cells = self._unprocessed_cells()
for i in pool.imap(self._process_cell, unprocessed_cells):
print(i)
但这只是告诉我连接无法被 pickle:
TypeError: can't pickle psycopg2.extensions.connection objects
但我不明白为什么,因为它与我在 imap
函数中使用的函数与在 Process
目标中使用的函数完全相同。
我已经看过这些帖子,这就是他们不回答我的问题的原因:
- Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects - 这里的答案只是说明多个进程不能共享同一个连接。我知道这一点,并且正在初始化子进程中正在执行的函数内的进程。此外,正如我提到的,当我将函数映射到单个
Process
实例时,它会起作用,具有相同的函数和相同的输入。
- Multiprocessing result of a psycopg2 request. “Can't pickle psycopg2.extensions.connection objects” - 这个问题没有答案也没有任何评论,而且代码也不完整 - 作者引用了问题中未指定的函数,无论如何它很明显,他们公然试图在进程之间共享相同的游标。
我的猜测是您将一些连接对象附加到 self
;尝试仅使用函数重写您的解决方案(没有 classes/methods)。
这是我前段时间使用的 单个 producer/multiple worker 解决方案的简化版本:
def worker(param):
//connect to pg
//do work
def main():
pool = Pool(processes=NUM_PROC)
tasks = []
for param in params:
t = pool.apply_async(utils.process_month, args=(param, ))
tasks.append(t)
pool.close()
finished = false
while not finished:
finished = True
for t in tasks:
if not t.ready():
finished = False
break
time.sleep(1)
我正在尝试构建一个应用程序,它将 "check out" 一个单元格,这是一个覆盖地理数据库中一部分土地的正方形,并对该单元格内的要素进行分析。因为我有很多细胞要处理,所以我使用了多处理方法。
我让它在我的对象内部有点像这样工作:
class DistributedGeographicConstraintProcessor:
...
def _process_cell(self, conn_string):
conn = pg2.connect(conn_string)
try:
cur = conn.cursor()
cell_id = self._check_out_cell(cur)
conn.commit()
print(f"processing cell_id {cell_id}...")
for constraint in self.constraints:
# print(f"processing {constraint.name()}...")
query = constraint.prepare_distributed_query(self.job, self.grid)
cur.execute(query, {
"buffer": constraint.buffer(),
"cell_id": cell_id,
"name": constraint.name(),
"simplify_tolerance": constraint.simplify_tolerance()
})
# TODO: do a final race condition check to further suppress duplicates
self._check_in_cell(cur, cell_id)
conn.commit()
finally:
del cur
conn.close()
return None
def run(self):
while True:
if not self._job_finished():
params = [self.conn_string] * self.num_cores
processes = []
for param in params:
process = mp.Process(target=self._process_cell, args=(param,))
processes.append(process)
sleep(0.1) # Prevent multiple processes from checkout out the same grid square
process.start()
for process in processes:
process.join()
else:
self._finalize_job()
break
但问题是它只会启动四个进程,等到它们都完成后再启动四个新进程。
我想这样当一个进程完成它的工作时,它将立即开始处理下一个单元格,即使它的协同进程还没有完成。
我不确定如何实现它,我试过使用这样的池:
def run(self):
pool = mp.Pool(self.num_cores)
unprocessed_cells = self._unprocessed_cells()
for i in pool.imap(self._process_cell, unprocessed_cells):
print(i)
但这只是告诉我连接无法被 pickle:
TypeError: can't pickle psycopg2.extensions.connection objects
但我不明白为什么,因为它与我在 imap
函数中使用的函数与在 Process
目标中使用的函数完全相同。
我已经看过这些帖子,这就是他们不回答我的问题的原因:
- Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects - 这里的答案只是说明多个进程不能共享同一个连接。我知道这一点,并且正在初始化子进程中正在执行的函数内的进程。此外,正如我提到的,当我将函数映射到单个
Process
实例时,它会起作用,具有相同的函数和相同的输入。 - Multiprocessing result of a psycopg2 request. “Can't pickle psycopg2.extensions.connection objects” - 这个问题没有答案也没有任何评论,而且代码也不完整 - 作者引用了问题中未指定的函数,无论如何它很明显,他们公然试图在进程之间共享相同的游标。
我的猜测是您将一些连接对象附加到 self
;尝试仅使用函数重写您的解决方案(没有 classes/methods)。
这是我前段时间使用的 单个 producer/multiple worker 解决方案的简化版本:
def worker(param):
//connect to pg
//do work
def main():
pool = Pool(processes=NUM_PROC)
tasks = []
for param in params:
t = pool.apply_async(utils.process_month, args=(param, ))
tasks.append(t)
pool.close()
finished = false
while not finished:
finished = True
for t in tasks:
if not t.ready():
finished = False
break
time.sleep(1)