多处理池:Python
Multiprocessing Pool: Python
我想使用 Python 并行 运行 SAP 报告。我已经弄清楚如何在不共享资源池的情况下 运行 并行处理所有内容,但我不知道如何创建资源池来共享资源。
示例:
有 6 个可用会话可以运行报告,k = [1:6],但我有 8 份报告要运行。第一份报告用 k=1,第二份报告用 k=2,依此类推;到第 7 份报告时,它需要等到其中一个 k 可用,然后在第一个可用的 k 上运行。
下面是我的代码:
# --- Asker's original script (as posted; indentation was lost in the paste) ---
# NOTE(review): this snippet is Python 2 (print statements) and references the
# globals `SAP` and `k`, which are never defined here — presumably created by
# SAP GUI scripting code elided from the post; confirm before running.
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
# Report worker: attaches to SAP session `k` and runs one report script.
def _zmrosales_ship_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
# Same shape as above, for the invoiced-sales report.
def _zmrosales_inv_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
# One Process per report — no pooling yet, which is the asker's problem:
# nothing limits concurrency to the 6 licensed sessions.
Shipments = multiprocessing.Process(name='Shipments', target=_zmrosales_ship_month)
Invoiced = multiprocessing.Process(name='Invoiced', target=_zmrosales_inv_month)
Shipments.start()
Invoiced.start()
如有任何帮助,我们将不胜感激!
我正在使用 python 2.7
根据下面的评论更新了代码(仍然没有正确处理,目前在两个函数的管理器列表中使用相同的 i。需要第一个函数使用 i = 0,第二个函数使用 i = 1. 然后在函数的末尾,将 i 追加回管理器列表)
# --- Asker's updated script (as posted; indentation was lost in the paste) ---
# NOTE(review): as written this cannot run:
#   * multiprocessing.freeze.support() should be multiprocessing.freeze_support()
#   * Manager() is used but never imported (from multiprocessing import Manager)
#   * the manager list is bound to `list`, shadowing the builtin
#   * `I = list.pop(0)` is assigned, but lowercase `i` (undefined) is passed
#     to the workers
#   * range(maxSess - 1) yields only 5 ids (0..4) for the 6 licensed sessions
#   * `SAP` is never defined in this snippet
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
import contextlib
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
def _start_SAP():
#Code to start SAP
# Report worker: k is meant to be a session id taken from a shared pool.
def _zmrosales_ship_month(k):
S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
#SAP script
list.append(k)
def _zmrosales_inv_month(k):
S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
#SAP script
list.append(k)
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
multiprocessing.freeze.support()
with Manager() as manager:
list = manager.list(range(maxSess - 1))
I = list.pop(0)
with contextlib.closing(Pool(maxSess)) as pool:
pool.apply(_start_SAP)
pool.apply_async(_zmrosales_ship_month,[i])
pool.apply_async(_zmrosales_inv_month, [i])
pool.close()
pool.join()
为最终答案编辑
我无法使用下面提供的代码来解决我的情况,但逻辑和思维过程是有意义的,它可能会对其他人有所帮助,所以我将其标记为正确。
我找到了解决问题的办法,代码如下。这是使用队列与管理器和池的不同方法。
# --- Asker's working solution, reconstructed ---
# A multiprocessing.Queue holds the six licensed SAP session ids; each report
# process takes an id, "runs" its report, and puts the id back, so at most six
# reports hold a session at once.
#
# Fixes applied to the posted code:
#   * indentation restored (lost in the paste)
#   * `_close_SAP` was referenced in __main__ but never defined — the defined
#     function is `_end_SAP`
#   * `_start_SAP` / `_end_SAP` had no bodies (elided in the post) — given
#     placeholder bodies so the module parses
#   * the five copy-pasted report functions now share one helper; their names,
#     signatures, and printed messages are unchanged
import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime


def _start_SAP():
    # Code to start SAP (body elided in the original post).
    pass


def _run_report(label, poll_seconds, q, lock):
    """Take a session id k from the shared queue, run the report, return k.

    Polls every `poll_seconds` seconds until a session id is available, so the
    number of concurrently running reports never exceeds the license limit.
    """
    print('starting %s report' % label)
    while True:  # logic to get a session id from the shared queue
        if not q.empty():
            lock.acquire()  # guard the empty-check/get pair between processes
            k = q.get()
            lock.release()
            break
        else:
            print('waiting for available queue for %s shipments' % label)
            time.sleep(poll_seconds)
    time.sleep(30)  # placeholder for the actual SAP report run on session k
    q.put(k)  # adding k back into the shared queue
    print('finished %s report' % label)


def test1(q, lock):
    _run_report('test1', 15, q, lock)


def test2(q, lock):
    _run_report('test2', 30, q, lock)


def test3(q, lock):
    _run_report('test3', 20, q, lock)


def test4(q, lock):
    _run_report('test4', 45, q, lock)


def test5(q, lock):
    _run_report('test5', 10, q, lock)


def _end_SAP():
    # Code to end SAP (body elided in the original post).
    pass


if __name__ == '__main__':
    lock = multiprocessing.Lock()  # creating a lock in multiprocessing
    shared_list = range(6)  # the six licensed session ids
    q = multiprocessing.Queue()  # shared pool of available session ids
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print('loaded queue to start the program')

    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    reports = [Process(target=fn, args=(q, lock))
               for fn in (test1, test2, test3, test4, test5)]
    for r in reports:
        r.start()
    for r in reports:
        r.join()

    # Fixed: the post called Process(target=_close_SAP), which does not exist.
    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    # Drain the queue to show which session ids made it back.
    while not q.empty():
        print(q.get())
您可以采用以下伪代码来达到想要的效果:
from multiprocessing.pool import Pool
import multiprocessing

# Manager-backed list: pool workers can append to it and the parent sees it.
shared_list = multiprocessing.Manager().list()


def pool_function(i):
    """Record which worker process handled task i in the shared list."""
    shared_list.append([multiprocessing.current_process().name, i])


# Fixed: the demo is now guarded so it only runs when executed directly.
# Without the guard, importing this module — which the 'spawn' start method
# (Windows, the asker's platform) does in every worker — re-runs the pool
# creation and crashes with a RuntimeError.
# NOTE(review): shared_list is still created at import time; under 'spawn'
# each worker would get its own manager — pass the proxy via a Pool
# initializer for full Windows support.
if __name__ == '__main__':
    with Pool(6) as pool:
        for i in range(8):
            pool.apply(
                pool_function,
                args=(i, )
            )
    print(shared_list)
输出:
[
['ForkPoolWorker-2', 0],
['ForkPoolWorker-5', 1],
['ForkPoolWorker-3', 2],
['ForkPoolWorker-4', 3],
['ForkPoolWorker-2', 4],
['ForkPoolWorker-6', 5],
['ForkPoolWorker-7', 6],
['ForkPoolWorker-5', 7]
]
合并代码:
# --- Answerer's "merged" sketch (as posted; indentation was lost in the paste) ---
# NOTE(review): this snippet is internally inconsistent and cannot run as-is:
#   * Python 2 print statements are mixed with `with Pool(...) as pool:`,
#     which requires Python 3.3+ (the asker is on 2.7)
#   * `SAP` and `k` are never defined
#   * multiprocessing.Manager() is called at import time, before the
#     __main__ guard — under the 'spawn' start method every worker would
#     re-execute it and create its own manager process
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
# Define shared resources using multiprocessing.Manager()
resource_manager = multiprocessing.Manager()
# FOLLOWING IS JUST FOR EXAMPLE PURPOSES
shared_list = resource_manager.list()
shared_dict = resource_manager.dict()
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
def _zmrosales_ship_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
def _zmrosales_inv_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
with Pool(maxSess) as pool:
pool.apply_async(
_zmrosales_ship_month
)
pool.apply_async(
_zmrosales_inv_month
)
pool.close()
pool.join()
如果您需要按顺序执行函数 - 将 apply_async
替换为 apply
或将 .get()
添加到每个调用(如 pool.apply_async(f).get()
)
有关多处理中共享资源的更多信息 - 请参阅 managers 参考资料
最终答案:
# --- Answerer's final answer, reconstructed ---
# Each of the maxSess pool workers claims one SAP session id in its
# initializer; report tasks then always run on their worker's own session.
#
# Fixes applied to the posted code:
#   * indentation restored (lost in the paste)
#   * multiprocessing.freeze.support() -> multiprocessing.freeze_support()
#     (the posted call raises AttributeError)
#   * worker_init now publishes the dict proxy as the module-global
#     `shared_dict`, so the report functions also work under the 'spawn'
#     start method (Windows — the asker's platform), where globals assigned
#     in the parent's __main__ block are not inherited by workers
import contextlib
import datetime
import multiprocessing
import os
import time
from multiprocessing import Pool
from subprocess import call

import win32com.client

maxSess = 6  # max number of sessions allowed by license
num_reports = 8
filePath = os.path.join(os.getcwd(), 'sap_files')


def _start_SAP():
    """Demo stand-in for starting SAP on this worker's session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_start_SAP', k)
    # Deliberately never returns: this worker (and its session id) stays
    # occupied for the life of the pool, matching the posted demo output.
    while True:
        pass


def _zmrosales_ship_month(i):
    """Run shipment report i on this worker's dedicated SAP session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_ship_month', k, i)
    time.sleep(1)  # placeholder for the report runtime
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


def _zmrosales_inv_month(i):
    """Run invoice report i on this worker's dedicated SAP session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_inv_month', k, i)
    time.sleep(1)  # placeholder for the report runtime
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


# N times more def's with SAP scripts ####


def worker_init(shared_dict_proxy, shared_list_proxy):
    """Map this worker process's name to one session id k.

    Runs once in each pool worker on spawn. Publishes the dict proxy as the
    module-global `shared_dict` so the report functions can read it under
    both the 'fork' and 'spawn' start methods.

    :param shared_dict_proxy: multiprocessing.Manager().dict()
    :param shared_list_proxy: multiprocessing.Manager().list()
    """
    global shared_dict
    shared_dict = shared_dict_proxy
    shared_dict[multiprocessing.current_process().name] = shared_list_proxy.pop(0)


if __name__ == '__main__':
    multiprocessing.freeze_support()  # fixed: was multiprocessing.freeze.support()
    with multiprocessing.Manager() as manager:
        shared_list = manager.list(range(maxSess))  # session ids 0..5
        shared_dict = manager.dict()  # worker name -> claimed session id
        p = Pool(
            maxSess,  # count of workers
            initializer=worker_init,  # each worker will call this on spawn
            initargs=(shared_dict, shared_list,)  # arguments for initializer
        )
        with contextlib.closing(p) as pool:
            pool.apply_async(_start_SAP)
            for i in range(num_reports):
                pool.apply_async(_zmrosales_ship_month, args=(i,))
                pool.apply_async(_zmrosales_inv_month, args=(i,))
        p.close()
        p.join()
输出:
ForkPoolWorker-2 _start_SAP 0
ForkPoolWorker-3 _zmrosales_ship_month 1 0
ForkPoolWorker-4 _zmrosales_inv_month 3 0
ForkPoolWorker-7 _zmrosales_ship_month 2 1
ForkPoolWorker-5 _zmrosales_inv_month 4 1
ForkPoolWorker-6 _zmrosales_ship_month 5 2
ForkPoolWorker-3 _zmrosales_inv_month 1 2
ForkPoolWorker-4 _zmrosales_ship_month 3 3
ForkPoolWorker-7 _zmrosales_inv_month 2 3
ForkPoolWorker-5 _zmrosales_ship_month 4 4
ForkPoolWorker-6 _zmrosales_inv_month 5 4
ForkPoolWorker-3 _zmrosales_ship_month 1 5
ForkPoolWorker-4 _zmrosales_inv_month 3 5
ForkPoolWorker-7 _zmrosales_ship_month 2 6
ForkPoolWorker-5 _zmrosales_inv_month 4 6
ForkPoolWorker-6 _zmrosales_ship_month 5 7
ForkPoolWorker-3 _zmrosales_inv_month 1 7
我想使用 Python 并行 运行 SAP 报告。我已经弄清楚如何在不共享资源池的情况下 运行 并行处理所有内容,但我不知道如何创建资源池来共享资源。
示例:
有 6 个可用会话可以运行报告,k = [1:6],但我有 8 份报告要运行。第一份报告用 k=1,第二份报告用 k=2,依此类推;到第 7 份报告时,它需要等到其中一个 k 可用,然后在第一个可用的 k 上运行。
下面是我的代码:
# --- Asker's original script (as posted; indentation was lost in the paste) ---
# NOTE(review): this snippet is Python 2 (print statements) and references the
# globals `SAP` and `k`, which are never defined here — presumably created by
# SAP GUI scripting code elided from the post; confirm before running.
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
# Report worker: attaches to SAP session `k` and runs one report script.
def _zmrosales_ship_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
# Same shape as above, for the invoiced-sales report.
def _zmrosales_inv_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
# One Process per report — no pooling yet, which is the asker's problem:
# nothing limits concurrency to the 6 licensed sessions.
Shipments = multiprocessing.Process(name='Shipments', target=_zmrosales_ship_month)
Invoiced = multiprocessing.Process(name='Invoiced', target=_zmrosales_inv_month)
Shipments.start()
Invoiced.start()
如有任何帮助,我们将不胜感激!
我正在使用 python 2.7
根据下面的评论更新了代码(仍然没有正确处理,目前在两个函数的管理器列表中使用相同的 i。需要第一个函数使用 i = 0,第二个函数使用 i = 1. 然后在函数的末尾,将 i 追加回管理器列表)
# --- Asker's updated script (as posted; indentation was lost in the paste) ---
# NOTE(review): as written this cannot run:
#   * multiprocessing.freeze.support() should be multiprocessing.freeze_support()
#   * Manager() is used but never imported (from multiprocessing import Manager)
#   * the manager list is bound to `list`, shadowing the builtin
#   * `I = list.pop(0)` is assigned, but lowercase `i` (undefined) is passed
#     to the workers
#   * range(maxSess - 1) yields only 5 ids (0..4) for the 6 licensed sessions
#   * `SAP` is never defined in this snippet
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
import contextlib
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
def _start_SAP():
#Code to start SAP
# Report worker: k is meant to be a session id taken from a shared pool.
def _zmrosales_ship_month(k):
S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
#SAP script
list.append(k)
def _zmrosales_inv_month(k):
S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
#SAP script
list.append(k)
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
multiprocessing.freeze.support()
with Manager() as manager:
list = manager.list(range(maxSess - 1))
I = list.pop(0)
with contextlib.closing(Pool(maxSess)) as pool:
pool.apply(_start_SAP)
pool.apply_async(_zmrosales_ship_month,[i])
pool.apply_async(_zmrosales_inv_month, [i])
pool.close()
pool.join()
为最终答案编辑
我无法使用下面提供的代码来解决我的情况,但逻辑和思维过程是有意义的,它可能会对其他人有所帮助,所以我将其标记为正确。
我找到了解决问题的办法,代码如下。这是使用队列与管理器和池的不同方法。
# --- Asker's working solution, reconstructed ---
# A multiprocessing.Queue holds the six licensed SAP session ids; each report
# process takes an id, "runs" its report, and puts the id back, so at most six
# reports hold a session at once.
#
# Fixes applied to the posted code:
#   * indentation restored (lost in the paste)
#   * `_close_SAP` was referenced in __main__ but never defined — the defined
#     function is `_end_SAP`
#   * `_start_SAP` / `_end_SAP` had no bodies (elided in the post) — given
#     placeholder bodies so the module parses
#   * the five copy-pasted report functions now share one helper; their names,
#     signatures, and printed messages are unchanged
import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime


def _start_SAP():
    # Code to start SAP (body elided in the original post).
    pass


def _run_report(label, poll_seconds, q, lock):
    """Take a session id k from the shared queue, run the report, return k.

    Polls every `poll_seconds` seconds until a session id is available, so the
    number of concurrently running reports never exceeds the license limit.
    """
    print('starting %s report' % label)
    while True:  # logic to get a session id from the shared queue
        if not q.empty():
            lock.acquire()  # guard the empty-check/get pair between processes
            k = q.get()
            lock.release()
            break
        else:
            print('waiting for available queue for %s shipments' % label)
            time.sleep(poll_seconds)
    time.sleep(30)  # placeholder for the actual SAP report run on session k
    q.put(k)  # adding k back into the shared queue
    print('finished %s report' % label)


def test1(q, lock):
    _run_report('test1', 15, q, lock)


def test2(q, lock):
    _run_report('test2', 30, q, lock)


def test3(q, lock):
    _run_report('test3', 20, q, lock)


def test4(q, lock):
    _run_report('test4', 45, q, lock)


def test5(q, lock):
    _run_report('test5', 10, q, lock)


def _end_SAP():
    # Code to end SAP (body elided in the original post).
    pass


if __name__ == '__main__':
    lock = multiprocessing.Lock()  # creating a lock in multiprocessing
    shared_list = range(6)  # the six licensed session ids
    q = multiprocessing.Queue()  # shared pool of available session ids
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print('loaded queue to start the program')

    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    reports = [Process(target=fn, args=(q, lock))
               for fn in (test1, test2, test3, test4, test5)]
    for r in reports:
        r.start()
    for r in reports:
        r.join()

    # Fixed: the post called Process(target=_close_SAP), which does not exist.
    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    # Drain the queue to show which session ids made it back.
    while not q.empty():
        print(q.get())
您可以采用以下伪代码来达到想要的效果:
from multiprocessing.pool import Pool
import multiprocessing

# Manager-backed list: pool workers can append to it and the parent sees it.
shared_list = multiprocessing.Manager().list()


def pool_function(i):
    """Record which worker process handled task i in the shared list."""
    shared_list.append([multiprocessing.current_process().name, i])


# Fixed: the demo is now guarded so it only runs when executed directly.
# Without the guard, importing this module — which the 'spawn' start method
# (Windows, the asker's platform) does in every worker — re-runs the pool
# creation and crashes with a RuntimeError.
# NOTE(review): shared_list is still created at import time; under 'spawn'
# each worker would get its own manager — pass the proxy via a Pool
# initializer for full Windows support.
if __name__ == '__main__':
    with Pool(6) as pool:
        for i in range(8):
            pool.apply(
                pool_function,
                args=(i, )
            )
    print(shared_list)
输出:
[
['ForkPoolWorker-2', 0],
['ForkPoolWorker-5', 1],
['ForkPoolWorker-3', 2],
['ForkPoolWorker-4', 3],
['ForkPoolWorker-2', 4],
['ForkPoolWorker-6', 5],
['ForkPoolWorker-7', 6],
['ForkPoolWorker-5', 7]
]
合并代码:
# --- Answerer's "merged" sketch (as posted; indentation was lost in the paste) ---
# NOTE(review): this snippet is internally inconsistent and cannot run as-is:
#   * Python 2 print statements are mixed with `with Pool(...) as pool:`,
#     which requires Python 3.3+ (the asker is on 2.7)
#   * `SAP` and `k` are never defined
#   * multiprocessing.Manager() is called at import time, before the
#     __main__ guard — under the 'spawn' start method every worker would
#     re-execute it and create its own manager process
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
# Define shared resources using multiprocessing.Manager()
resource_manager = multiprocessing.Manager()
# FOLLOWING IS JUST FOR EXAMPLE PURPOSES
shared_list = resource_manager.list()
shared_dict = resource_manager.dict()
maxSess = 6 # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')
def _zmrosales_ship_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
def _zmrosales_inv_month():
name = multiprocessing.current_process().name
print name, 'Starting'
S = SAP.FindById("ses[" + str(k) + "]")
#Code to run SAP script
#### N times more def's with SAP scripts ####
if __name__ == '__main__':
with Pool(maxSess) as pool:
pool.apply_async(
_zmrosales_ship_month
)
pool.apply_async(
_zmrosales_inv_month
)
pool.close()
pool.join()
如果您需要按顺序执行函数 - 将 apply_async
替换为 apply
或将 .get()
添加到每个调用(如 pool.apply_async(f).get()
)
有关多处理中共享资源的更多信息 - 请参阅 managers 参考资料
最终答案:
# --- Answerer's final answer, reconstructed ---
# Each of the maxSess pool workers claims one SAP session id in its
# initializer; report tasks then always run on their worker's own session.
#
# Fixes applied to the posted code:
#   * indentation restored (lost in the paste)
#   * multiprocessing.freeze.support() -> multiprocessing.freeze_support()
#     (the posted call raises AttributeError)
#   * worker_init now publishes the dict proxy as the module-global
#     `shared_dict`, so the report functions also work under the 'spawn'
#     start method (Windows — the asker's platform), where globals assigned
#     in the parent's __main__ block are not inherited by workers
import contextlib
import datetime
import multiprocessing
import os
import time
from multiprocessing import Pool
from subprocess import call

import win32com.client

maxSess = 6  # max number of sessions allowed by license
num_reports = 8
filePath = os.path.join(os.getcwd(), 'sap_files')


def _start_SAP():
    """Demo stand-in for starting SAP on this worker's session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_start_SAP', k)
    # Deliberately never returns: this worker (and its session id) stays
    # occupied for the life of the pool, matching the posted demo output.
    while True:
        pass


def _zmrosales_ship_month(i):
    """Run shipment report i on this worker's dedicated SAP session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_ship_month', k, i)
    time.sleep(1)  # placeholder for the report runtime
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


def _zmrosales_inv_month(i):
    """Run invoice report i on this worker's dedicated SAP session k."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_inv_month', k, i)
    time.sleep(1)  # placeholder for the report runtime
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


# N times more def's with SAP scripts ####


def worker_init(shared_dict_proxy, shared_list_proxy):
    """Map this worker process's name to one session id k.

    Runs once in each pool worker on spawn. Publishes the dict proxy as the
    module-global `shared_dict` so the report functions can read it under
    both the 'fork' and 'spawn' start methods.

    :param shared_dict_proxy: multiprocessing.Manager().dict()
    :param shared_list_proxy: multiprocessing.Manager().list()
    """
    global shared_dict
    shared_dict = shared_dict_proxy
    shared_dict[multiprocessing.current_process().name] = shared_list_proxy.pop(0)


if __name__ == '__main__':
    multiprocessing.freeze_support()  # fixed: was multiprocessing.freeze.support()
    with multiprocessing.Manager() as manager:
        shared_list = manager.list(range(maxSess))  # session ids 0..5
        shared_dict = manager.dict()  # worker name -> claimed session id
        p = Pool(
            maxSess,  # count of workers
            initializer=worker_init,  # each worker will call this on spawn
            initargs=(shared_dict, shared_list,)  # arguments for initializer
        )
        with contextlib.closing(p) as pool:
            pool.apply_async(_start_SAP)
            for i in range(num_reports):
                pool.apply_async(_zmrosales_ship_month, args=(i,))
                pool.apply_async(_zmrosales_inv_month, args=(i,))
        p.close()
        p.join()
输出:
ForkPoolWorker-2 _start_SAP 0
ForkPoolWorker-3 _zmrosales_ship_month 1 0
ForkPoolWorker-4 _zmrosales_inv_month 3 0
ForkPoolWorker-7 _zmrosales_ship_month 2 1
ForkPoolWorker-5 _zmrosales_inv_month 4 1
ForkPoolWorker-6 _zmrosales_ship_month 5 2
ForkPoolWorker-3 _zmrosales_inv_month 1 2
ForkPoolWorker-4 _zmrosales_ship_month 3 3
ForkPoolWorker-7 _zmrosales_inv_month 2 3
ForkPoolWorker-5 _zmrosales_ship_month 4 4
ForkPoolWorker-6 _zmrosales_inv_month 5 4
ForkPoolWorker-3 _zmrosales_ship_month 1 5
ForkPoolWorker-4 _zmrosales_inv_month 3 5
ForkPoolWorker-7 _zmrosales_ship_month 2 6
ForkPoolWorker-5 _zmrosales_inv_month 4 6
ForkPoolWorker-6 _zmrosales_ship_month 5 7
ForkPoolWorker-3 _zmrosales_inv_month 1 7