运行 python 来自动态 sql 列表的多进程

Running python multiprocesses from a dynamic sql list

您好,我正在尝试让我的代码更加动态和智能,为此我想通过动态列表 运行 调用我想要的函数,而不是将它们硬编码进去。这将清理上传代码并帮助自动重新运行 失败的脚本。

下面是我一直在处理的代码片段。 我通常在运行宁它

时得到这个错误

类型错误:'str'对象不可调用

 cursor = conn.cursor()
    cursor.execute(
        f'''select distinct caller from {db}.log a where a.log_text like 'Failed:%' and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.log a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
    df = as_pandas(cursor)
    print('The following scripts will be rerun')
    print(df)

    c = df['caller']

    processes = []
    # Loop over failed scripts/modules
    for mod in (c):  
        print(f'Rerun of {c}')
        p = multiprocessing.Process(target=mod, args=(db,))
        time.sleep(10)
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

完整的回溯错误

Traceback (most recent call last): File "/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) TypeError: 'str' object is not callable Process Process-2: Traceback (most recent call last): File "/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) TypeError: 'str' object is not callable Process finished with exit code 0

所以我找到了解决问题的方法,并张贴在这里以备将来有人使用。

基本上我使用 getattr 和 importlib.import_module 函数。然后我通过 for 循环启动它们。

所以我使用 sql 获取所有失败模块的名称,然后将这些模块加载到 df 列表,然后通过 for 循环迭代该列表,这将启动失败的模块。

cursor.execute(
            f'''select distinct caller from {db}.sch_log_python a where a.log_text like 'Failed:%' and lower(caller) in ('st_%','ctrl_%','ar%') and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.sch_log_python a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
        df = as_pandas(cursor)
        print('The following scripts will be rerun')
        print(df)

        sch_log_func(caller, 'Rerun of failed scripts', db)

        c = df
        # Loop over failed scripts/modules
        for i in (c):
            cstr = c.to_string(index=False, header=False)
            print(f'Initialise ' + cstr)
            cstr = cstr.lower()
            print(cstr + ' Module to import')
            print('Trying to run ' + cstr)
            cls = getattr(importlib.import_module(cstr), cstr)
            cls(db)
            print(f'Started {cstr}')