使用多处理使处理列表的速度加倍
Using multiprocessing to double the speed of working on a list
假设我有一个这样的列表:
list_base = ['a','b','c','d']
如果我使用 for xxx in list_base:
,循环将一次解析列表的一个值。如果我想将这项工作的速度提高一倍,我将创建一个包含两个值的列表以一次迭代并调用 multiprocessing
.
基本示例
代码 1 (main_code.py
):
import api_values
if __name__ == '__main__':
list_base = ['a','b','c','d']
api_values.main(list_base)
代码 2 (api_values.py
):
import multiprocessing
import datetime
def add_hour(x):
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
for a, b in a_pairs:
mp_1 = multiprocessing.Process(target=add_hour, args=(a,))
mp_2 = multiprocessing.Process(target=add_hour, args=(b,))
mp_1.start()
mp_2.start()
mp_1.join()
mp_2.join()
final_list.append(mp_1)
final_list.append(mp_2)
print(final_list)
当我分析 final_list
打印时,它提供如下值:
[
<Process name='Process-1' pid=9564 parent=19136 stopped exitcode=0>,
<Process name='Process-2' pid=5400 parent=19136 stopped exitcode=0>,
<Process name='Process-3' pid=13396 parent=19136 stopped exitcode=0>,
<Process name='Process-4' pid=5132 parent=19136 stopped exitcode=0>
]
我无法通过调用 add_hour(x)
函数获得我想要征服的 return 值。
我在这个问题中找到了一些答案:
How can I recover the return value of a function passed to multiprocessing.Process?
但是我无法将我正在使用的场景带到需要函数内部而不是 if __name__ == '__main__':
的地方 multiprocessing
尝试使用它时,它总是会生成与创建的代码结构的位置相关的错误,我需要一些帮助,以便能够根据我的需要可视化使用。
注:
这些代码是一个基本的示例,我的实际用途是从允许最多同时调用两个的 API 中提取数据。
附加代码:
根据@Timus 的评论 (You might want to look into a **Pool** and **.apply_async**
),我看到这段代码,在我看来它可以工作,但我不知道它是否可靠,是否需要对其进行任何改进使用并且这个选项是最好的,随时在答案中更新:
import multiprocessing
import datetime
final_list = []
def foo_pool(x):
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
def log_result(result):
final_list.append(result)
def main(list_base):
pool = multiprocessing.Pool()
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
for a, b in a_pairs:
pool.apply_async(foo_pool, args = (a, ), callback = log_result)
pool.apply_async(foo_pool, args = (b, ), callback = log_result)
pool.close()
pool.join()
print(final_list)
我认为您需要在进程之间共享字符串。它们可以从 multiprocessing.Manager()
.
获得
您的 api_values.py
应如下所示:
import multiprocessing
import datetime
from ctypes import c_wchar_p
def add_hour(x, ret_str):
ret_str.value = str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
manager = multiprocessing.Manager()
for a, b in a_pairs:
ret_str_a = manager.Value(c_wchar_p, "")
ret_str_b = manager.Value(c_wchar_p, "")
mp_1 = multiprocessing.Process(target=add_hour, args=(a, ret_str_a))
mp_2 = multiprocessing.Process(target=add_hour, args=(b, ret_str_b))
mp_1.start()
mp_2.start()
mp_1.join()
mp_2.join()
final_list.append(ret_str_a.value)
final_list.append(ret_str_b.value)
print(final_list)
来源:How to share a string amongst multiple processes using Managers() in Python?
你不必使用回调:Pool.apply_async()
给你一个 return(一个 AsyncResult
对象),它有一个 .get()
方法来检索结果的提交。延长您的尝试时间:
import time
import multiprocessing
import datetime
from os import getpid
def foo_pool(x):
print(getpid())
time.sleep(2)
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
with multiprocessing.Pool(processes=2) as pool:
for a, b in a_pairs:
res_1 = pool.apply_async(foo_pool, args=(a,))
res_2 = pool.apply_async(foo_pool, args=(b,))
final_list.extend([res_1.get(), res_2.get()])
print(final_list)
if __name__ == '__main__':
list_base = ['a','b','c','d']
start = time.perf_counter()
main(list_base)
end = time.perf_counter()
print(end - start)
我已将 print(getpid())
添加到 foo_pool
以表明您实际上使用的是不同的进程。我用 time
来说明,尽管 foo_pool
中有 time.sleep(2)
,但 main
的总持续时间不超过 2 秒。
假设我有一个这样的列表:
list_base = ['a','b','c','d']
如果我使用 for xxx in list_base:
,循环将一次解析列表的一个值。如果我想将这项工作的速度提高一倍,我将创建一个包含两个值的列表以一次迭代并调用 multiprocessing
.
基本示例
代码 1 (main_code.py
):
import api_values
if __name__ == '__main__':
list_base = ['a','b','c','d']
api_values.main(list_base)
代码 2 (api_values.py
):
import multiprocessing
import datetime
def add_hour(x):
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
for a, b in a_pairs:
mp_1 = multiprocessing.Process(target=add_hour, args=(a,))
mp_2 = multiprocessing.Process(target=add_hour, args=(b,))
mp_1.start()
mp_2.start()
mp_1.join()
mp_2.join()
final_list.append(mp_1)
final_list.append(mp_2)
print(final_list)
当我分析 final_list
打印时,它提供如下值:
[
<Process name='Process-1' pid=9564 parent=19136 stopped exitcode=0>,
<Process name='Process-2' pid=5400 parent=19136 stopped exitcode=0>,
<Process name='Process-3' pid=13396 parent=19136 stopped exitcode=0>,
<Process name='Process-4' pid=5132 parent=19136 stopped exitcode=0>
]
我无法通过调用 add_hour(x)
函数获得我想要征服的 return 值。
我在这个问题中找到了一些答案:
How can I recover the return value of a function passed to multiprocessing.Process?
但是我无法将我正在使用的场景带到需要函数内部而不是 if __name__ == '__main__':
multiprocessing
尝试使用它时,它总是会生成与创建的代码结构的位置相关的错误,我需要一些帮助,以便能够根据我的需要可视化使用。
注:
这些代码是一个基本的示例,我的实际用途是从允许最多同时调用两个的 API 中提取数据。
附加代码:
根据@Timus 的评论 (You might want to look into a **Pool** and **.apply_async**
),我看到这段代码,在我看来它可以工作,但我不知道它是否可靠,是否需要对其进行任何改进使用并且这个选项是最好的,随时在答案中更新:
import multiprocessing
import datetime
final_list = []
def foo_pool(x):
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
def log_result(result):
final_list.append(result)
def main(list_base):
pool = multiprocessing.Pool()
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
for a, b in a_pairs:
pool.apply_async(foo_pool, args = (a, ), callback = log_result)
pool.apply_async(foo_pool, args = (b, ), callback = log_result)
pool.close()
pool.join()
print(final_list)
我认为您需要在进程之间共享字符串。它们可以从 multiprocessing.Manager()
.
您的 api_values.py
应如下所示:
import multiprocessing
import datetime
from ctypes import c_wchar_p
def add_hour(x, ret_str):
ret_str.value = str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
manager = multiprocessing.Manager()
for a, b in a_pairs:
ret_str_a = manager.Value(c_wchar_p, "")
ret_str_b = manager.Value(c_wchar_p, "")
mp_1 = multiprocessing.Process(target=add_hour, args=(a, ret_str_a))
mp_2 = multiprocessing.Process(target=add_hour, args=(b, ret_str_b))
mp_1.start()
mp_2.start()
mp_1.join()
mp_2.join()
final_list.append(ret_str_a.value)
final_list.append(ret_str_b.value)
print(final_list)
来源:How to share a string amongst multiple processes using Managers() in Python?
你不必使用回调:Pool.apply_async()
给你一个 return(一个 AsyncResult
对象),它有一个 .get()
方法来检索结果的提交。延长您的尝试时间:
import time
import multiprocessing
import datetime
from os import getpid
def foo_pool(x):
print(getpid())
time.sleep(2)
return str(x) + ' - ' + datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
def main(list_base):
a = list_base
a_pairs = [a[i:i+2] for i in range(0, len(a)-1, 2)]
if (len(a) % 2) != 0:
a_pairs.append([a[-1]])
final_list = []
with multiprocessing.Pool(processes=2) as pool:
for a, b in a_pairs:
res_1 = pool.apply_async(foo_pool, args=(a,))
res_2 = pool.apply_async(foo_pool, args=(b,))
final_list.extend([res_1.get(), res_2.get()])
print(final_list)
if __name__ == '__main__':
list_base = ['a','b','c','d']
start = time.perf_counter()
main(list_base)
end = time.perf_counter()
print(end - start)
我已将 print(getpid())
添加到 foo_pool
以表明您实际上使用的是不同的进程。我用 time
来说明,尽管 foo_pool
中有 time.sleep(2)
,但 main
的总持续时间不超过 2 秒。