使用 UPDATE ... RETURNING 的多个数据库连接,似乎没有更新任务 table 中的行
Multiple Database connections using UPDATE ... RETURNING, seem to not update rows in tasks table
前言
我想并行处理数据库 table 中列出的任务。不寻找工作代码。
设置
- 1 PostgreSQL 数据库服务器 D
- 1 处理服务器 P
- 1 用户终端T
使用 Python 3.6、psycopg2.7.6、PostgreSQL 11
D 保存 tables 要处理的数据和 tasks
table。 T ssh
的用户进入 P,可以发出以下命令:
python -m core.utils.task
这个 task.py
脚本本质上是一个 while
循环,它从 D[ 上的 tasks
table 获取任务 t
=90=],状态为 'new',直到没有新任务为止。任务 t
基本上是另一个函数 do_something(t)
的一组参数。 do_something(t)
本身将与 D 建立许多连接以获取需要处理的数据,并在任务完成后将其设置为状态 'done' – while
循环从头开始并获得新任务。
为了运行python -m core.utils.task
多次我打开了多个ssh
连接。不太好,我知道; threading
或 multiprocessing
会更好。但他只是为了测试我是否可以 运行 提到的命令两次。
有一个名为 pgsql.py
的管理所有数据库交互的脚本需要它来获取任务,然后由 do_something(t)
执行。我从 this SE post 改编了一个单例模式。
伪代码(大部分)
task.py
import mymodule
import pgsql
def main():
while True:
r, c = pgsql.SQL.select_task() # rows and columns
task = dotdict(dict(zip(c, r[0])))
mymodule.do_something(task)
if __name__ == "__main__":
main()
mymodule.py
import pgsql
def do_something(t):
input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
some_other_function(input)
pgsql.SQL.task_status(t.task_id,'done')
pgsql.py
import psycopg2 as pg
class Postgres(object):
"""Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
try:
print('connecting to PostgreSQL database...')
connection = Postgres._instance.connection = pg.connect(**db_config)
connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
except Exception as error:
print('Error: connection not established {}'.format(error))
Postgres._instance = None
else:
print('connection established')
return cls._instance
def __init__(self):
self.connection = self._instance.connection
def query(self, query):
try:
with self.connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
except Exception as error:
print('error execting query "{}", error: {}'.format(query, error))
return None
else:
return rows, cols
def __del__(self):
self.connection.close()
db = Postgres()
class SQL():
def select_task():
s = """
UPDATE schema.tasks
SET status = 'ready'
WHERE task_id = ( SELECT task_id
FROM schema.tasks
WHERE tasks.status = 'new'
LIMIT 1)
RETURNING *
;
""".format(m=mode)
return Postgres.query(db, s)
def task_status(id,status):
s = """
UPDATE
schema.tasks
SET
status = '{s}'
WHERE
tasks.task_id = '{id}'
;
""".format(s=status,
id=id)
return Postgres.query(db, s)
问题
这适用于一个 ssh
连接。任务从数据库中检索并处理,完成后任务设置为 'done'。一旦我在第二个终端中打开第二个 ssh
到 运行 python -m core.utils.task
的连接(可以说是并行的),任务 table 的完全相同的行在两个终端中都会被处理- 忽略它们已更新。
问题
你有什么建议让它发挥作用?有数百万个任务,我需要 运行 并行处理它们。在实施 threading
或 multiprocessing
之前,我想先用多个 ssh
连接测试它,坏主意吗?我在 psycopg2
的 set_session()
中摆弄了 isolation levels
和 autocommit
设置,但没有成功。我检查了数据库服务器中的会话,可以看到 python -m core.utils.task
的每个进程都有自己的 PID,只连接一次,就像这种单例模式应该起作用一样。非常感谢任何想法或指示如何处理这个问题!
主要问题是执行一项任务不是原子操作。因此,在不同的 ssh 会话中,同一个任务可以被处理多次。
In this implementation, you can try to use an "INPROGRESS"
status for task so as not to retrieve tasks that are already being processed (with "INPROGRESS"
status). But be sure to use autocommit.
但我会使用线程和数据库连接池来实现它。并会使用 OFFSET
和 LIMIT
分批提取任务。 do_something
、select_task
和 task_status
函数将用于批量任务。
此外,没有必要将 Postgres
class 实现为单例。
已修改(见下方评论)
- 您可以将
FOR UPDATE SKIP LOCKED
添加到当前实现中的 SQL 查询(参见 url)。
- 如果你想使用批处理,然后用一些连续的列分隔数据(好吧,或者只是将数据排序在 table 中)。
- My implementation using batches.
- 这可以使用
ThreadPoolExecutor
和 PersistentConnectionPool
来实现。
前言
我想并行处理数据库 table 中列出的任务。不寻找工作代码。
设置
- 1 PostgreSQL 数据库服务器 D
- 1 处理服务器 P
- 1 用户终端T
使用 Python 3.6、psycopg2.7.6、PostgreSQL 11
D 保存 tables 要处理的数据和 tasks
table。 T ssh
的用户进入 P,可以发出以下命令:
python -m core.utils.task
这个 task.py
脚本本质上是一个 while
循环,它从 D[ 上的 tasks
table 获取任务 t
=90=],状态为 'new',直到没有新任务为止。任务 t
基本上是另一个函数 do_something(t)
的一组参数。 do_something(t)
本身将与 D 建立许多连接以获取需要处理的数据,并在任务完成后将其设置为状态 'done' – while
循环从头开始并获得新任务。
为了运行python -m core.utils.task
多次我打开了多个ssh
连接。不太好,我知道; threading
或 multiprocessing
会更好。但他只是为了测试我是否可以 运行 提到的命令两次。
有一个名为 pgsql.py
的管理所有数据库交互的脚本需要它来获取任务,然后由 do_something(t)
执行。我从 this SE post 改编了一个单例模式。
伪代码(大部分)
task.py
import mymodule
import pgsql
def main():
while True:
r, c = pgsql.SQL.select_task() # rows and columns
task = dotdict(dict(zip(c, r[0])))
mymodule.do_something(task)
if __name__ == "__main__":
main()
mymodule.py
import pgsql
def do_something(t):
input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
some_other_function(input)
pgsql.SQL.task_status(t.task_id,'done')
pgsql.py
import psycopg2 as pg
class Postgres(object):
"""Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
try:
print('connecting to PostgreSQL database...')
connection = Postgres._instance.connection = pg.connect(**db_config)
connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
except Exception as error:
print('Error: connection not established {}'.format(error))
Postgres._instance = None
else:
print('connection established')
return cls._instance
def __init__(self):
self.connection = self._instance.connection
def query(self, query):
try:
with self.connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
except Exception as error:
print('error execting query "{}", error: {}'.format(query, error))
return None
else:
return rows, cols
def __del__(self):
self.connection.close()
db = Postgres()
class SQL():
def select_task():
s = """
UPDATE schema.tasks
SET status = 'ready'
WHERE task_id = ( SELECT task_id
FROM schema.tasks
WHERE tasks.status = 'new'
LIMIT 1)
RETURNING *
;
""".format(m=mode)
return Postgres.query(db, s)
def task_status(id,status):
s = """
UPDATE
schema.tasks
SET
status = '{s}'
WHERE
tasks.task_id = '{id}'
;
""".format(s=status,
id=id)
return Postgres.query(db, s)
问题
这适用于一个 ssh
连接。任务从数据库中检索并处理,完成后任务设置为 'done'。一旦我在第二个终端中打开第二个 ssh
到 运行 python -m core.utils.task
的连接(可以说是并行的),任务 table 的完全相同的行在两个终端中都会被处理- 忽略它们已更新。
问题
你有什么建议让它发挥作用?有数百万个任务,我需要 运行 并行处理它们。在实施 threading
或 multiprocessing
之前,我想先用多个 ssh
连接测试它,坏主意吗?我在 psycopg2
的 set_session()
中摆弄了 isolation levels
和 autocommit
设置,但没有成功。我检查了数据库服务器中的会话,可以看到 python -m core.utils.task
的每个进程都有自己的 PID,只连接一次,就像这种单例模式应该起作用一样。非常感谢任何想法或指示如何处理这个问题!
主要问题是执行一项任务不是原子操作。因此,在不同的 ssh 会话中,同一个任务可以被处理多次。
In this implementation, you can try to use an
"INPROGRESS"
status for task so as not to retrieve tasks that are already being processed (with"INPROGRESS"
status). But be sure to use autocommit.
但我会使用线程和数据库连接池来实现它。并会使用 OFFSET
和 LIMIT
分批提取任务。 do_something
、select_task
和 task_status
函数将用于批量任务。
此外,没有必要将 Postgres
class 实现为单例。
已修改(见下方评论)
- 您可以将
FOR UPDATE SKIP LOCKED
添加到当前实现中的 SQL 查询(参见 url)。 - 如果你想使用批处理,然后用一些连续的列分隔数据(好吧,或者只是将数据排序在 table 中)。
- My implementation using batches.
- 这可以使用
ThreadPoolExecutor
和PersistentConnectionPool
来实现。