Python 运行 同时进行两个循环,其中一个循环受限并且依赖于另一个循环的数据
Python run two loops at the same time where one is rate limited and depends on data from the other
我在 python 中遇到问题,我想同时 运行 两个循环。我觉得我需要这样做,因为第二个循环需要进行速率限制,但第一个循环实际上不应该进行速率限制。此外,第二个循环从第一个循环获取输入。
我正在寻找类似这样的东西:
for line in file:
do some stuff
list = []
list.append("an_item")
Rate limited:
for x in list:
do some stuff simultaneously
你需要做两件事:
- 将需要来自其他进程的数据的函数放在它自己的进程中
- 实现一种在两个进程之间进行通信的方法(例如Queue)
所有这一切都必须感谢GIL。
有两种具有不同权衡的基本方法:在任务之间同步切换,以及 运行在线程或子进程中连接。首先,一些常见的设置:
from queue import Queue # or Queue, if python 2
work = Queue()
def fast_task():
""" Do the fast thing """
if done:
return None
else:
return result
def slow_task(arg):
""" Do the slow thing """
RATE_LIMIT = 30 # seconds
现在,同步方法。它的优点是更简单,更容易调试,但代价是有点慢。慢多少取决于您的任务的细节。它是如何工作的,我们 运行 一个紧密的循环,每次调用快速作业,只有经过足够的时间才调用慢速作业。如果快速作业不再产生工作并且队列为空,我们将退出。
import time
last_call = 0
while True:
next_job = fast_task()
if next_job:
work.put(next_job)
elif work.empty():
# nothing left to do
break
else:
# fast task has done all its work - short sleep to slow the spin
time.sleep(.1)
now = time.time()
if now - last_call > RATE_LIMIT:
last_call = now
slow_task(work.get())
如果您觉得这种方法不够快,您可以尝试 multiprocessing
方法。您可以使用相同的结构来处理线程或进程,具体取决于您是从 multiprocessing.dummy
还是从 multiprocessing
本身导入。我们使用 multiprocessing.Queue
代替 queue.Queue
进行通信。
def do_the_fast_loop(work_queue):
while True:
next_job = fast_task()
if next_job:
work_queue.put(next_job)
else:
work_queue.put(None) # sentinel - tells slow process to quit
break
def do_the_slow_loop(work_queue):
next_call = time.time()
while True:
job = work_queue.get()
if job is None: # sentinel seen - no more work to do
break
time.sleep(max(0, next_call - time.time()))
next_call = time.time() + RATE_LIMIT
slow_task(job)
if __name__ == '__main__':
# from multiprocessing.dummy import Queue, Process # for threads
from multiprocessing import Queue, Process # for processes
work = Queue()
fast = Process(target=fast_task, args=(work,))
slow = Process(target=slow_task, args=(work,))
fast.start()
slow.start()
fast.join()
slow.join()
如您所见,有更多的机制供您实现,但速度会快一些。同样,速度的多少在很大程度上取决于您的任务。我会尝试所有三种方法 - 同步、线程和多进程 - 看看你最喜欢哪一种。
我在 python 中遇到问题,我想同时 运行 两个循环。我觉得我需要这样做,因为第二个循环需要进行速率限制,但第一个循环实际上不应该进行速率限制。此外,第二个循环从第一个循环获取输入。
我正在寻找类似这样的东西:
for line in file:
do some stuff
list = []
list.append("an_item")
Rate limited:
for x in list:
do some stuff simultaneously
你需要做两件事:
- 将需要来自其他进程的数据的函数放在它自己的进程中
- 实现一种在两个进程之间进行通信的方法(例如Queue)
所有这一切都必须感谢GIL。
有两种具有不同权衡的基本方法:在任务之间同步切换,以及 运行在线程或子进程中连接。首先,一些常见的设置:
from queue import Queue # or Queue, if python 2
work = Queue()
def fast_task():
""" Do the fast thing """
if done:
return None
else:
return result
def slow_task(arg):
""" Do the slow thing """
RATE_LIMIT = 30 # seconds
现在,同步方法。它的优点是更简单,更容易调试,但代价是有点慢。慢多少取决于您的任务的细节。它是如何工作的,我们 运行 一个紧密的循环,每次调用快速作业,只有经过足够的时间才调用慢速作业。如果快速作业不再产生工作并且队列为空,我们将退出。
import time
last_call = 0
while True:
next_job = fast_task()
if next_job:
work.put(next_job)
elif work.empty():
# nothing left to do
break
else:
# fast task has done all its work - short sleep to slow the spin
time.sleep(.1)
now = time.time()
if now - last_call > RATE_LIMIT:
last_call = now
slow_task(work.get())
如果您觉得这种方法不够快,您可以尝试 multiprocessing
方法。您可以使用相同的结构来处理线程或进程,具体取决于您是从 multiprocessing.dummy
还是从 multiprocessing
本身导入。我们使用 multiprocessing.Queue
代替 queue.Queue
进行通信。
def do_the_fast_loop(work_queue):
while True:
next_job = fast_task()
if next_job:
work_queue.put(next_job)
else:
work_queue.put(None) # sentinel - tells slow process to quit
break
def do_the_slow_loop(work_queue):
next_call = time.time()
while True:
job = work_queue.get()
if job is None: # sentinel seen - no more work to do
break
time.sleep(max(0, next_call - time.time()))
next_call = time.time() + RATE_LIMIT
slow_task(job)
if __name__ == '__main__':
# from multiprocessing.dummy import Queue, Process # for threads
from multiprocessing import Queue, Process # for processes
work = Queue()
fast = Process(target=fast_task, args=(work,))
slow = Process(target=slow_task, args=(work,))
fast.start()
slow.start()
fast.join()
slow.join()
如您所见,有更多的机制供您实现,但速度会快一些。同样,速度的多少在很大程度上取决于您的任务。我会尝试所有三种方法 - 同步、线程和多进程 - 看看你最喜欢哪一种。