Multi Processing with sqlalchemy

I have a python script that handles data transactions through sqlalchemy, using the following:


def save_update(args):
    session, engine = create_session(config["DATABASE"])

    try:
        instance = get_record(session)
        if instance is None:
            instance = create_record(session)
        else:
            instance = update_record(session, instance)

        sync_errors(session, instance)
        sync_expressions(session, instance)
        sync_part(session, instance)

        session.commit()
    except:
        session.rollback()
        write_error(config)
        raise
    finally:
        session.close()

On top of the data transactions, I also have some processing that has nothing to do with the database: data preparation before the transactions. These prerequisite tasks take some time, so I want to execute multiple instances of this complete script (data preparation + data transactions with sqlalchemy) in parallel.

I therefore launch them from a separate script; here is a simplified example:

from threading import Thread

process1 = Thread(target=call_script, args=[["python", python_file_path,
     "-xml", xml_path,
     "-o", args.outputFolder,
     "-l", log_path]])

process2 = Thread(target=call_script, args=[["python", python_file_path,
     "-xml", xml_path,
     "-o", args.outputFolder,
     "-l", log_path]])

process1.start()
process2.start()
process1.join()
process2.join()

The target function call_script executes the script mentioned first above (data preparation + data transactions with sqlalchemy):

import subprocess

def call_script(args):
    status = subprocess.call(args, shell=True)
    print(status)

So to summarize, I will have for instance 2 sub-threads plus the main thread running, and each of these sub-threads executes the sqlalchemy code in a separate process.

My question therefore is: should I take any particular care regarding multiprocessing with the code that uses sqlalchemy? To me the answer is no, because this is multiprocessing rather than multithreading, since subprocess.call() is used to execute my code.

In reality, though, I feel that from time to time I get database locks during execution. I am not sure whether this comes from my code or from someone else accessing the database while I am processing it, but I was expecting each subprocess to actually lock the database when it starts working, so that the other subprocesses would wait until the current session is closed.
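
For what it is worth, sessions in separate processes will not block each other unless a lock is requested explicitly. A minimal sketch of what explicit row-level locking would look like in SQLAlchemy, assuming a hypothetical mapped class Record and a placeholder connection string (neither is from the original post):

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine('postgresql+psycopg2://username:password@/database')

with Session(engine) as session:
    # SELECT ... FOR UPDATE: another transaction requesting the same row
    # with with_for_update() blocks until this transaction commits or rolls back.
    instance = (
        session.query(Record)          # Record is a hypothetical mapped class
        .filter(Record.id == 1)
        .with_for_update()
        .one_or_none()
    )
    # ... update the instance ...
    session.commit()  # releases the row lock

Without something like this, each subprocess simply runs its own independent transaction, which is normally what you want for throughput.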

EDIT

I have done the test using multiprocessing instead of multithreading:

    processes = [subprocess.Popen(cmd[0], shell=True) for cmd in commands]

I still have the same problem, and I also got more details: I see the SQL server showing a session with status "AWAITING COMMAND", which only disappears when I kill the related python process that executed the command. I feel it shows up when I heavily parallelize the sub-processes, but I am really not sure.

Thanks in advance for your support.

This is an interesting situation. It seems you could maybe side-step some of the manual process/thread handling and utilize something like multiprocessing's Pool. I made an example based on some other data-initializing code I had. This delegates creating test data and inserting it for each of 10 "devices" to a pool of 3 processes. One caveat that seems necessary is to dispose of the engine before it is shared across fork(), ie. before the Pool tasks are created; this is mentioned here: engine-disposal

from random import randint
from datetime import datetime
from multiprocessing import Pool

from sqlalchemy import (
    create_engine,
    Integer,
    DateTime,
    String,
)
from sqlalchemy.schema import (
    Column,
    MetaData,
    ForeignKey,
)
from sqlalchemy.orm import declarative_base, relationship, Session, backref

db_uri = 'postgresql+psycopg2://username:password@/database'

engine = create_engine(db_uri, echo=False)

metadata = MetaData()

Base = declarative_base(metadata=metadata)

class Event(Base):
    __tablename__ = "events"
    id = Column(Integer, primary_key=True, index=True)
    created_on = Column(DateTime, nullable=False, index=True)
    device_id = Column(Integer, ForeignKey('devices.id'), nullable=True)
    device = relationship('Device', backref=backref("events"))


class Device(Base):
    __tablename__ = "devices"
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50))


def get_test_data(device_num):
    """ Generate a test device and its test events for the given device number. """
    device_dict = dict(name=f'device-{device_num}')
    event_dicts = []
    for day in range(1, 5):
        for hour in range(0, 24):
            for _ in range(0, randint(0, 50)):
                event_dicts.append({
                    "created_on": datetime(day=day, month=1, year=2022, hour=hour),
                })
    return (device_dict, event_dicts)


def create_test_data(device_num):
    """ Actually write the test data to the database. """
    device_dict, event_dicts = get_test_data(device_num)
    print (f"creating test data for {device_dict['name']}")

    with Session(engine) as session:
        device = Device(**device_dict)
        session.add(device)
        session.flush()
        events = [Event(**event_dict) for event_dict in event_dicts]
        event_count = len(events)
        device.events.extend(events)
        session.add_all(events)
        session.commit()
    return event_count


if __name__ == '__main__':

    metadata.create_all(engine)

    # Throw this away before fork.
    engine.dispose()

    # I have a 4-core processor, so I chose 3.
    with Pool(3) as p:
        print (p.map(create_test_data, range(0, 10)))

    # Accessing engine here should still work
    # but a new connection will be created.
    with Session(engine) as session:
        print (session.query(Event).count())


Output


creating test data for device-1
creating test data for device-0
creating test data for device-2
creating test data for device-3
creating test data for device-4
creating test data for device-5
creating test data for device-6
creating test data for device-7
creating test data for device-8
creating test data for device-9
[2511, 2247, 2436, 2106, 2244, 2464, 2358, 2512, 2267, 2451]
23596

I am answering my own question, as it ended up having nothing to do with SQLAlchemy at all. When executing:

processes = [subprocess.Popen(cmd[0], shell=True) for cmd in commands]

on one specific batch, for no obvious reason, one of the sub-processes would not exit properly even though the script it was calling had reached its end. I searched and found that this is a known issue when using p.wait() with Popen and shell=True.

I set shell=False and used pipes for stdout and stderr, and also added a sys.exit(0) at the end of the python script executed by the sub-processes, to make sure each execution terminates properly.
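
A rough sketch of what that fix could look like on the launcher side (the command lists just reuse the placeholder names from the earlier examples, they are not the real paths):

import subprocess

# placeholder command lists, reusing the argument names from the question
commands = [
    ["python", python_file_path, "-xml", xml_path, "-o", args.outputFolder, "-l", log_path],
    ["python", python_file_path, "-xml", xml_path, "-o", args.outputFolder, "-l", log_path],
]

# shell=False with an argument list, plus pipes for stdout/stderr
processes = [
    subprocess.Popen(cmd, shell=False,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    for cmd in commands
]

for p in processes:
    out, err = p.communicate()  # drains the pipes and waits for the child to exit
    print(p.returncode)

# and the python script run by each child ends with:
#     sys.exit(0)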

Hope it helps someone! Thanks Ian for your support.