Python 多处理循环
Python Multiprocessing Loop
我希望使用 multiprocessing
来加速缓慢的循环。但是,根据我所看到的多处理示例,我不确定这种实现是否是好的做法、是否可行或可能。
循环大致分为两部分:data ingestion
和 data processing
。我希望在处理过程中开始下一部分数据摄取,以便数据尽快可用。
伪代码:
d = get_data(n)
for n in range(N):
p = process_data(d)
d = get_data(n+1) #prepare data for next process loop
- 多处理适合这种功能吗?
- 如何做到这一点?
提前致谢。
如您所说,多处理基本上是调度和收集工作。
正如您所澄清的,您基本上希望 process_data
和 get_data
并行工作。
这是我的解决方案
import multiprocessing as mp
# create pool for dispatching work
pool = mp.Pool()
# call your functions asynchronously
process_data_process = pool.apply_async(process_data, (d,))
get_data_process = pool.apply_async(get_data, (n+1,))
# After your functions are dispatched, wait for results
process_data_result = process_data_process.get()
get_data_result = get_data_process.get()
# Note: get_data_result will not be fetched till process_data_result is ready
# But that should be fine since you can't start the next batch
# till this batch is done
你可以把它包装在你的循环中。
希望这能回答您的问题!
假设您希望单个 thread/process 摄取数据,因为它将 I/O 而不是 CPU 绑定。在将数据传递到处理层之前,您只对数据进行了最少的解析 and/or 验证。
让我们进一步假设您可以完全并行地对每个输入项进行数据处理;这些输入项之间没有排序,也没有 time/sequencing 依赖性。
在那种情况下,您的任务基本上是 "fan out" 处理模型的典型代表。您创建了一个 multiprocessing.Queue object. Then you create a multiprocessing.Pool。然后,此初始化代码成为摄取处理任务(队列的 "producer"),进程池全部成为消费者,执行处理。
网上有很多这样的例子,第一个 link 可能有几个使用这种模式。
当然,剩下的问题是您将如何处理结果。
如果他们需要序列化回某个单个文件,那么显而易见的方法是创建两个 Queue 对象......一个用于工作队列(摄取进程提供它,池进程从中消耗)另一个是输出队列(池馈入其中,然后一个进程从中消费以将结果连贯地写入您的输出)。请注意,让您的主(摄取)进程多路复用是可能的,有时效率很高。它可以将输入数据读取与输出队列上的轮询交错以写出结果。但是,当然,您也可以启动另一个专门用于输出处理的进程。
另一方面,您的结果可能是并行写入的,也许是由工作进程写入的。如果您将结果写入许多文件,或者将它们作为 INSERT 或 UPDATE 语句发布到某些 SQL 数据库,或者将它们提供给 Hadoop HDFS 或 Spark DataSet,这很好。有许多适合并行写入的输出形式。
您也可能想要分离处理层和 output/results 处理层。可能是您的应用程序将通过数据处理层中的大量进程和输出层中的少量进程进行优化调整。 (例如,如果每个项目的处理都是 CPU 密集的,并且您有很多内核,那么您可能会遇到太多进程阻塞 I/O 通道而 CPUs闲着)。
再次使用队列。它们旨在支持多生产者和多消费者的一致性。您可以摆脱并发锁定、死锁和活锁问题等问题的雷区。
我希望使用 multiprocessing
来加速缓慢的循环。但是,根据我所看到的多处理示例,我不确定这种实现是否是好的做法、是否可行或可能。
循环大致分为两部分:data ingestion
和 data processing
。我希望在处理过程中开始下一部分数据摄取,以便数据尽快可用。
伪代码:
d = get_data(n)
for n in range(N):
p = process_data(d)
d = get_data(n+1) #prepare data for next process loop
- 多处理适合这种功能吗?
- 如何做到这一点?
提前致谢。
如您所说,多处理基本上是调度和收集工作。
正如您所澄清的,您基本上希望 process_data
和 get_data
并行工作。
这是我的解决方案
import multiprocessing as mp
# create pool for dispatching work
pool = mp.Pool()
# call your functions asynchronously
process_data_process = pool.apply_async(process_data, (d,))
get_data_process = pool.apply_async(get_data, (n+1,))
# After your functions are dispatched, wait for results
process_data_result = process_data_process.get()
get_data_result = get_data_process.get()
# Note: get_data_result will not be fetched till process_data_result is ready
# But that should be fine since you can't start the next batch
# till this batch is done
你可以把它包装在你的循环中。 希望这能回答您的问题!
假设您希望单个 thread/process 摄取数据,因为它将 I/O 而不是 CPU 绑定。在将数据传递到处理层之前,您只对数据进行了最少的解析 and/or 验证。
让我们进一步假设您可以完全并行地对每个输入项进行数据处理;这些输入项之间没有排序,也没有 time/sequencing 依赖性。
在那种情况下,您的任务基本上是 "fan out" 处理模型的典型代表。您创建了一个 multiprocessing.Queue object. Then you create a multiprocessing.Pool。然后,此初始化代码成为摄取处理任务(队列的 "producer"),进程池全部成为消费者,执行处理。
网上有很多这样的例子,第一个 link 可能有几个使用这种模式。
当然,剩下的问题是您将如何处理结果。
如果他们需要序列化回某个单个文件,那么显而易见的方法是创建两个 Queue 对象......一个用于工作队列(摄取进程提供它,池进程从中消耗)另一个是输出队列(池馈入其中,然后一个进程从中消费以将结果连贯地写入您的输出)。请注意,让您的主(摄取)进程多路复用是可能的,有时效率很高。它可以将输入数据读取与输出队列上的轮询交错以写出结果。但是,当然,您也可以启动另一个专门用于输出处理的进程。
另一方面,您的结果可能是并行写入的,也许是由工作进程写入的。如果您将结果写入许多文件,或者将它们作为 INSERT 或 UPDATE 语句发布到某些 SQL 数据库,或者将它们提供给 Hadoop HDFS 或 Spark DataSet,这很好。有许多适合并行写入的输出形式。
您也可能想要分离处理层和 output/results 处理层。可能是您的应用程序将通过数据处理层中的大量进程和输出层中的少量进程进行优化调整。 (例如,如果每个项目的处理都是 CPU 密集的,并且您有很多内核,那么您可能会遇到太多进程阻塞 I/O 通道而 CPUs闲着)。
再次使用队列。它们旨在支持多生产者和多消费者的一致性。您可以摆脱并发锁定、死锁和活锁问题等问题的雷区。