Multi Processing with sqlalchemy
I have a python script that handles data transactions through sqlalchemy using:
def save_update(args):
    session, engine = create_session(config["DATABASE"])
    try:
        instance = get_record(session)
        if instance is None:
            instance = create_record(session)
        else:
            instance = update_record(session, instance)
        sync_errors(session, instance)
        sync_expressions(session, instance)
        sync_part(session, instance)
        session.commit()
    except:
        session.rollback()
        write_error(config)
        raise
    finally:
        session.close()
On top of the data transactions I also have some processing that has nothing to do with the database: data preparation that happens before the transactions. Those prerequisite tasks take some time, so I want to execute multiple instances of this complete script (data preparation + data transactions with sqlalchemy) in parallel.
So I do it from a different script, simplified example here:
process1 = Thread(target=call_script, args=[["python", python_file_path,
                                             "-xml", xml_path,
                                             "-o", args.outputFolder,
                                             "-l", log_path]])
process2 = Thread(target=call_script, args=[["python", python_file_path,
                                             "-xml", xml_path,
                                             "-o", args.outputFolder,
                                             "-l", log_path]])
process1.start()
process2.start()
process1.join()
process2.join()
The target function "call_script" executes the script mentioned first above (data preparation + data transactions with sqlalchemy):
def call_script(args):
    status = subprocess.call(args, shell=True)
    print(status)
So to summarize, I will have, for example, 2 sub threads + the main thread running. Each of those sub threads executes the sqlalchemy code in a separate process.
My question is whether there is anything specific I should take care of regarding the multiprocessing aspect of the code that uses sqlalchemy. To me the answer is no, because this is multiprocessing rather than multithreading, since subprocess.call() is used to execute my code.
Now in reality, from time to time I have the feeling that I hit database locks during execution. I am not sure whether this is related to my code or whether someone else is accessing the database while I process it, but I would expect each subprocess to actually lock the database when it starts working, so that the other subprocesses wait until the current session is closed.
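For reference, what I have in mind by "lock" would be something like an explicit row lock taken when a record is read. A minimal sketch of that idea (the Record model is hypothetical and for illustration only, this is not my actual get_record):
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Record(Base):
    # Hypothetical model, for illustration only.
    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))

def get_record_locked(session: Session, record_id):
    # with_for_update() emits SELECT ... FOR UPDATE, so another transaction
    # requesting the same row with FOR UPDATE waits until this transaction
    # commits or rolls back.
    return (
        session.query(Record)
        .filter(Record.id == record_id)
        .with_for_update()
        .one_or_none()
    )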
EDIT
I have now tested using multiprocessing instead of multithreading:
processes = [subprocess.Popen(cmd[0], shell=True) for cmd in commands]
I still have the same issue, and I have some more details:
I see that the SQL server shows the status "AWAITING COMMAND", and this only disappears when I kill the related python process that executed the command.
I have the feeling that it appears when I heavily parallelize the subprocesses, but I am really not sure.
Thanks in advance for your support.
This is an interesting situation. It looks like maybe you could sidestep some of the manual process/thread handling and utilize something like multiprocessing's Pool. I made an example based on some other data initializing code I had. This delegates creating test data and inserting it for each of 10 "devices" to a pool of 3 processes. One caveat that seems necessary is to dispose of the engine before it is shared across fork(), i.e. before the Pool tasks are created; this is mentioned here: engine-disposal
from random import randint
from datetime import datetime
from multiprocessing import Pool

from sqlalchemy import (
    create_engine,
    Integer,
    DateTime,
    String,
)
from sqlalchemy.schema import (
    Column,
    MetaData,
    ForeignKey,
)
from sqlalchemy.orm import declarative_base, relationship, Session, backref


db_uri = 'postgresql+psycopg2://username:password@/database'
engine = create_engine(db_uri, echo=False)
metadata = MetaData()
Base = declarative_base(metadata=metadata)


class Event(Base):
    __tablename__ = "events"
    id = Column(Integer, primary_key=True, index=True)
    created_on = Column(DateTime, nullable=False, index=True)
    device_id = Column(Integer, ForeignKey('devices.id'), nullable=True)
    device = relationship('Device', backref=backref("events"))


class Device(Base):
    __tablename__ = "devices"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))


def get_test_data(device_num):
    """ Generate a test device and its test events for the given device number. """
    device_dict = dict(name=f'device-{device_num}')
    event_dicts = []
    for day in range(1, 5):
        for hour in range(0, 24):
            for _ in range(0, randint(0, 50)):
                event_dicts.append({
                    "created_on": datetime(day=day, month=1, year=2022, hour=hour),
                })
    return (device_dict, event_dicts)


def create_test_data(device_num):
    """ Actually write the test data to the database. """
    device_dict, event_dicts = get_test_data(device_num)
    print (f"creating test data for {device_dict['name']}")
    with Session(engine) as session:
        device = Device(**device_dict)
        session.add(device)
        session.flush()
        events = [Event(**event_dict) for event_dict in event_dicts]
        event_count = len(events)
        device.events.extend(events)
        session.add_all(events)
        session.commit()
    return event_count


if __name__ == '__main__':
    metadata.create_all(engine)

    # Throw this away before fork.
    engine.dispose()

    # I have a 4-core processor, so I chose 3.
    with Pool(3) as p:
        print (p.map(create_test_data, range(0, 10)))

    # Accessing engine here should still work
    # but a new connection will be created.
    with Session(engine) as session:
        print (session.query(Event).count())
Output
creating test data for device-1
creating test data for device-0
creating test data for device-2
creating test data for device-3
creating test data for device-4
creating test data for device-5
creating test data for device-6
creating test data for device-7
creating test data for device-8
creating test data for device-9
[2511, 2247, 2436, 2106, 2244, 2464, 2358, 2512, 2267, 2451]
23596
I am answering my own question, as it turned out to have nothing to do with SQLAlchemy at all.
When executing:
processes = [subprocess.Popen(cmd[0], shell=True) for cmd in commands]
on one specific batch, for no direct reason, one of the subprocesses was not exiting properly even though the script it called had reached its end.
I searched and found that this is an issue when using p.wait() with Popen having shell=True.
I set shell=False and used pipes for stdout and stderr, and also added a sys.exit(0) at the end of the python script executed by the subprocess, to make sure it terminates its execution properly.
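As a rough sketch of what the launch looks like now (the command lines below are placeholders, not my real ones):
import subprocess
import sys

# Placeholder command lines: shell=False with an argument list, plus pipes so
# each child's output can be collected.
commands = [
    [sys.executable, "my_script.py", "-xml", "input.xml"],
]

processes = [
    subprocess.Popen(cmd, shell=False,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    for cmd in commands
]

for p in processes:
    out, err = p.communicate()  # waits for the child and drains both pipes
    print(p.returncode, out.decode(errors="replace"), err.decode(errors="replace"))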
Hope it helps someone else! Thanks Ian for the support.