多处理池:Python

Multiprocessing Pool: Python

我想使用 Python 并行运行 SAP 报告。我已经弄清楚如何在不共享资源池的情况下并行运行所有内容,但我不知道如何创建资源池来共享资源。

示例:

有 6 个可用会话可以运行报告,k = [1:6],但我有 8 份报告要运行。第一份报告用 k=1,第二份报告用 k=2,依此类推;到第 7 份报告时,它需要等到其中一个 k 空闲,然后在第一个可用的 k 上运行。

下面是我的代码:

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')  # SAP export folder (presumably; unused in this snippet)

def _zmrosales_ship_month():
    # Worker for the monthly shipments report.
    # NOTE(review): `SAP` and `k` are globals never defined in this snippet --
    # that is exactly the asker's problem: k should come from a shared pool of
    # session ids. As written this raises NameError at runtime.
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

def _zmrosales_inv_month():
    # Worker for the monthly invoiced report.
    # NOTE(review): same issue as _zmrosales_ship_month -- `SAP` and `k` are
    # undefined globals here; this is placeholder code from the question.
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    # One dedicated process per report -- no pooling/throttling of the six
    # SAP sessions yet, which is the problem described above.
    report_procs = [
        multiprocessing.Process(name='Shipments', target=_zmrosales_ship_month),
        multiprocessing.Process(name='Invoiced', target=_zmrosales_inv_month),
    ]
    for proc in report_procs:
        proc.start()

如有任何帮助,我们将不胜感激!

我正在使用 python 2.7

根据下面的评论更新了代码(仍然不能正确工作:目前两个函数在管理器列表中使用相同的 i。需要第一个函数使用 i = 0,第二个函数使用 i = 1,然后在每个函数的末尾把 i 追加回管理器列表)

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
import contextlib

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')  # SAP export folder (presumably; unused in this snippet)

def _start_SAP():
#Code to start SAP

def _zmrosales_ship_month(k):
    """Run the shipments report in SAP session *k*, then return the id.

    BUG FIX: the body was not indented in the original (SyntaxError).
    NOTE(review): `SAP` is an undefined global here, and `list` refers to the
    Manager list created in __main__ (it shadows the builtin) -- TODO confirm
    the workers actually see that global on the target platform.
    """
    S = SAP.FindById("ses[" + str(k) + "]")  # I need k to share the pool resource
    # SAP script
    list.append(k)  # hand the session id back to the shared pool

def _zmrosales_inv_month(k):
    """Run the invoiced report in SAP session *k*, then return the id.

    BUG FIX: the body was not indented in the original (SyntaxError).
    NOTE(review): same caveats as _zmrosales_ship_month -- `SAP` is undefined
    here and `list` is the Manager list from __main__ shadowing the builtin.
    """
    S = SAP.FindById("ses[" + str(k) + "]")  # I need k to share the pool resource
    # SAP script
    list.append(k)  # hand the session id back to the shared pool

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    # BUG FIX: was multiprocessing.freeze.support() -- AttributeError.
    multiprocessing.freeze_support()
    # BUG FIX: `Manager` was never imported; use the qualified name.
    with multiprocessing.Manager() as manager:
        # Shared pool of free session ids. BUG FIX: range(maxSess - 1) only
        # produced maxSess-1 ids; one id per licensed session is intended.
        # The name `list` (shadowing the builtin) is kept because the report
        # functions above reference it globally.
        list = manager.list(range(maxSess))
        with contextlib.closing(Pool(maxSess)) as pool:
            pool.apply(_start_SAP)  # synchronous: SAP must be up first
            # BUG FIX: the original popped a single id into `I` but passed the
            # undefined name `i` to both reports; give each report its own id.
            ship_id = list.pop(0)
            inv_id = list.pop(0)
            pool.apply_async(_zmrosales_ship_month, [ship_id])
            pool.apply_async(_zmrosales_inv_month, [inv_id])
            pool.close()
            pool.join()

为最终答案编辑

我无法使用下面提供的代码来解决我的情况,但逻辑和思维过程是有意义的,它可能会对其他人有所帮助,所以我将其标记为正确。

我找到了解决问题的办法,代码如下。这是使用队列与管理器和池的不同方法。

import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime

def _start_SAP():

def test1(q, lock):
    """Run the test1 report with a session id borrowed from the shared queue.

    q: multiprocessing.Queue of free session ids; lock: shared Lock.
    BUG FIX: the original checked q.empty() *before* acquiring the lock, so
    two workers could both pass the check and one would then block inside
    q.get() while holding the lock. Check-and-get now happens under the lock.
    """
    print('starting test1 report')

    while True:  # poll until a session id is free
        lock.acquire()
        try:
            if not q.empty():
                k = q.get()
                break
        finally:
            lock.release()
        print('waiting for available queue for test1 shipments')
        time.sleep(15)

    time.sleep(30)  # stand-in for the actual SAP report work
    q.put(k)  # adding k back into the shared queue
    print('finished test1 report')


def test2(q, lock):
    """Run the test2 report with a session id borrowed from the shared queue.

    BUG FIX: check-and-get now happens while holding the lock (the original
    tested q.empty() before acquiring it, which was racy across workers).
    """
    print('starting test2 report')

    while True:  # poll until a session id is free
        lock.acquire()
        try:
            if not q.empty():
                k = q.get()
                break
        finally:
            lock.release()
        print('waiting for available queue for test2 shipments')
        time.sleep(30)

    time.sleep(30)  # stand-in for the actual SAP report work
    q.put(k)  # adding k back into the shared queue
    print('finished test2 report')


def test3(q, lock):
    """Run the test3 report with a session id borrowed from the shared queue.

    BUG FIX: check-and-get now happens while holding the lock (the original
    tested q.empty() before acquiring it, which was racy across workers).
    """
    print('starting test3 report')

    while True:  # poll until a session id is free
        lock.acquire()
        try:
            if not q.empty():
                k = q.get()
                break
        finally:
            lock.release()
        print('waiting for available queue for test3 shipments')
        time.sleep(20)

    time.sleep(30)  # stand-in for the actual SAP report work
    q.put(k)  # adding k back into the shared queue
    print('finished test3 report')


def test4(q, lock):
    """Run the test4 report with a session id borrowed from the shared queue.

    BUG FIX: check-and-get now happens while holding the lock (the original
    tested q.empty() before acquiring it, which was racy across workers).
    """
    print('starting test4 report')

    while True:  # poll until a session id is free
        lock.acquire()
        try:
            if not q.empty():
                k = q.get()
                break
        finally:
            lock.release()
        print('waiting for available queue for test4 shipments')
        time.sleep(45)

    time.sleep(30)  # stand-in for the actual SAP report work
    q.put(k)  # adding k back into the shared queue
    print('finished test4 report')


def test5(q, lock):
    """Run the test5 report with a session id borrowed from the shared queue.

    BUG FIX: check-and-get now happens while holding the lock (the original
    tested q.empty() before acquiring it, which was racy across workers).
    """
    print('starting test5 report')

    while True:  # poll until a session id is free
        lock.acquire()
        try:
            if not q.empty():
                k = q.get()
                break
        finally:
            lock.release()
        print('waiting for available queue for test5 shipments')
        time.sleep(10)

    time.sleep(30)  # stand-in for the actual SAP report work
    q.put(k)  # adding k back into the shared queue
    print('finished test5 report')

def _end_SAP():


if __name__ == '__main__':

    lock = multiprocessing.Lock()  # serialises access to the shared queue

    # One queue entry per licensed SAP session (ids 0..5); reports borrow an
    # id from the queue and return it when done.
    shared_list = range(6)
    q = multiprocessing.Queue()
    for n in shared_list:
        q.put(n)
    print('loaded queue to start the program')

    # SAP must be fully started before any report runs.
    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    # 5 report processes compete for the 6 session ids in the queue.
    reports = [Process(target=f, args=(q, lock))
               for f in (test1, test2, test3, test4, test5)]
    for proc in reports:
        proc.start()
    for proc in reports:
        proc.join()

    # BUG FIX: the original targeted _close_SAP, which is not defined
    # anywhere; the teardown function defined above is _end_SAP.
    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    # Drain the remaining session ids so the queue's feeder thread does not
    # keep the interpreter alive on exit.
    while not q.empty():
        print(q.get())

您可以采用以下伪代码来达到想要的效果:

from multiprocessing.pool import  Pool
import multiprocessing

# Manager-backed list visible to every pool worker. NOTE(review): the workers
# see it here because they inherit the module state on fork (the output below
# shows ForkPoolWorker names); on Windows/spawn this would need an initializer.
shared_list = multiprocessing.Manager().list()


def pool_function(i):
    """Record which worker handled task *i* in the shared list."""
    worker = multiprocessing.current_process().name
    shared_list.append([worker, i])


with Pool(6) as pool:
    for i in range(8):
        # NOTE(review): pool.apply() is synchronous -- each call blocks until
        # the task finishes, so the 8 tasks run one at a time here (which is
        # why the output below lists the indices in order).
        pool.apply(
            pool_function,
            args=(i, )
        )


print(shared_list)

输出:

[
    ['ForkPoolWorker-2', 0],
    ['ForkPoolWorker-5', 1],
    ['ForkPoolWorker-3', 2],
    ['ForkPoolWorker-4', 3],
    ['ForkPoolWorker-2', 4],
    ['ForkPoolWorker-6', 5],
    ['ForkPoolWorker-7', 6],
    ['ForkPoolWorker-5', 7]
]

合并代码:

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime

# Define shared resources using multiprocessing.Manager()

# NOTE(review): starting a Manager at import time is fine on fork platforms,
# but on Windows every spawned child re-imports this module and would start
# its own manager -- confirm this is moved under the __main__ guard there.
resource_manager = multiprocessing.Manager()

# FOLLOWING IS JUST FOR EXAMPLE PURPOSES
shared_list = resource_manager.list()  # proxy list shared with all workers
shared_dict = resource_manager.dict()  # proxy dict shared with all workers

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')  # unused in this snippet

def _zmrosales_ship_month():
    # Worker for the monthly shipments report.
    # NOTE(review): `SAP` and `k` are still undefined globals in this sketch;
    # the final answer below supplies k via a pool initializer.
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

def _zmrosales_inv_month():
    # Worker for the monthly invoiced report.
    # NOTE(review): `SAP` and `k` are still undefined globals in this sketch;
    # the final answer below supplies k via a pool initializer.
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    # A pool of maxSess workers; each report is submitted asynchronously and
    # close()/join() wait for everything to drain before the pool is torn down.
    with Pool(maxSess) as pool:
        for report in (_zmrosales_ship_month, _zmrosales_inv_month):
            pool.apply_async(report)
        pool.close()
        pool.join()

如果您需要按顺序执行函数 - 将 apply_async 替换为 apply,或者给每个调用加上 .get()(如 pool.apply_async(f).get())

有关多处理中共享资源的更多信息 - 请参阅 managers 参考资料

最终答案:

import contextlib
import datetime
import multiprocessing
import os
import time
from multiprocessing import Pool
from subprocess import call

import win32com.client

maxSess = 6  # max number of sessions allowed by license
num_reports = 8  # total reports to spread over the maxSess sessions
filePath = os.path.join(os.getcwd(), 'sap_files')  # unused in this demo


def _start_SAP():
    # Demo stand-in for starting SAP: print this worker's assigned session id,
    # then block forever. NOTE(review): the infinite busy-loop pins one pool
    # worker for the life of the program, so pool.join() in __main__ never
    # returns -- confirm this is intended only for the demonstration.
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_start_SAP', k)
    while True:
        pass


def _zmrosales_ship_month(i):
    """Demo shipments report: item *i* runs in whichever SAP session id (k)
    this worker process was assigned by worker_init."""
    worker = multiprocessing.current_process().name
    k = shared_dict[worker]
    print(worker, '_zmrosales_ship_month', k, i)
    time.sleep(1)
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


def _zmrosales_inv_month(i):
    """Demo invoiced report: item *i* runs in whichever SAP session id (k)
    this worker process was assigned by worker_init."""
    worker = multiprocessing.current_process().name
    k = shared_dict[worker]
    print(worker, '_zmrosales_inv_month', k, i)
    time.sleep(1)

    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


# N times more def's with SAP scripts ####

def worker_init(shared_dict, shared_list):
    """
    Assign one free session id to the calling worker process.

    Each freshly spawned pool worker takes the next id off *shared_list*
    and records it under its own process name in *shared_dict*, so the
    task functions can later look up which SAP session belongs to them.

    :param shared_dict: multiprocessing.Manager().dict()
    :param shared_list: multiprocessing.Manager().list()
    """
    me = multiprocessing.current_process().name
    shared_dict[me] = shared_list.pop(0)


if __name__ == '__main__':

    # BUG FIX: the original called multiprocessing.freeze.support(), which
    # raises AttributeError; the real API is multiprocessing.freeze_support().
    multiprocessing.freeze_support()

    with multiprocessing.Manager() as manager:
        # One session id per worker; worker_init pops exactly one entry in
        # each spawned process, mapping process name -> session id.
        shared_list = manager.list(range(maxSess))
        shared_dict = manager.dict()

        p = Pool(
            maxSess,  # count of workers
            initializer=worker_init,  # each worker will call this on spawn
            initargs=(shared_dict, shared_list,)  # arguments for initializer
        )

        with contextlib.closing(p) as pool:
            # NOTE(review): _start_SAP busy-loops forever, so it pins one
            # worker for the pool's lifetime and p.join() below will not
            # return until the process is interrupted.
            pool.apply_async(_start_SAP)

            for i in range(num_reports):
                pool.apply_async(_zmrosales_ship_month, args=(i,))
                pool.apply_async(_zmrosales_inv_month, args=(i,))

        p.close()
        p.join()

输出:

ForkPoolWorker-2 _start_SAP 0
ForkPoolWorker-3 _zmrosales_ship_month 1 0
ForkPoolWorker-4 _zmrosales_inv_month 3 0
ForkPoolWorker-7 _zmrosales_ship_month 2 1
ForkPoolWorker-5 _zmrosales_inv_month 4 1
ForkPoolWorker-6 _zmrosales_ship_month 5 2
ForkPoolWorker-3 _zmrosales_inv_month 1 2
ForkPoolWorker-4 _zmrosales_ship_month 3 3
ForkPoolWorker-7 _zmrosales_inv_month 2 3
ForkPoolWorker-5 _zmrosales_ship_month 4 4
ForkPoolWorker-6 _zmrosales_inv_month 5 4
ForkPoolWorker-3 _zmrosales_ship_month 1 5
ForkPoolWorker-4 _zmrosales_inv_month 3 5
ForkPoolWorker-7 _zmrosales_ship_month 2 6
ForkPoolWorker-5 _zmrosales_inv_month 4 6
ForkPoolWorker-6 _zmrosales_ship_month 5 7
ForkPoolWorker-3 _zmrosales_inv_month 1 7