2个for循环可以同时运行,一个接一个循环吗?

Can 2 for loops be run simultaneously, looping one after the other?

我只是想知道,您将如何创建一个循环,使每次迭代都一个接一个地发生?我知道多线程是一回事,而且我很熟悉它。我无法弄清楚的一件事是如何 运行 一个接一个地循环。

例如,假设我有 2 个函数:

def loop_a():
    while True:
        time.sleep(1)
        print("a")

def loop_b():
    while True:
        print("b")

如何使输出为 ababababababababa,即使第一个函数中存在 time.sleep(1)

我正在使用 mpi4py,并且想知道是否有任何方法可以使用此库来执行此操作。 我的实际程序需要在函数之间发送消息。否则,使用任何其他 python 库(例如 multiprocessing 应该没问题。

有没有办法使用 threading 来做到这一点?

您可以使用协程:

import asyncio

q = asyncio.Queue()

async def loop_a(q):
  for i in range(10):
    value = await q.get()
    print(value)

async def loop_b(q):
  for i in range(10):
    await q.put("a")
    print("b")


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(loop_a(q), loop_b(q)))

这里有 live example

唯一的想法是,除非您以某种方式同步它们,否则无法保证执行顺序。

这是您问题第一部分的解决方案 - 如何 运行 并行处理,以便每个进程等待前一个进程完成后开始处理任务。我没有在这里解决消息传递方面的问题,因为它对我来说似乎有点模糊,并且可以根据问题陈述以不同的方式实现。在这个例子中,我们创建了 运行 三个工人,它们通过简单的时间延迟来模拟执行。代码片段应保存到一个文件中,可以从命令行 运行 。

我们首先导入所需的模块:

#!/usr/bin/env python3
import time

from multiprocessing import Process, Event

并实施 WorkerQueue class。这 class 使工作人员保持正确的顺序,并负责启动和终止他们。 Worker 之间的通信是使用事件实现的。每个工人都有 other_readyready Event 字段,指示前一个工人和当前工人的完成状态,因此。注意,如果队列中只有一个worker,那么它的other_readyready是一样的

class WorkerQueue(object):

    def __init__(self):
        self._workers = []

    def add_worker(self, worker):

        if self._workers:
            worker.other_ready = self._workers[-1].ready
            self._workers[0].other_ready = worker.ready
        else:
            worker.other_ready = worker.ready

        self._workers.append(worker)

    def start_workers(self):

        if not self._workers:
            return

        self._workers[0].other_ready.set()

        for w in self._workers:
            w.start()

    def stop_workers(self):

        for w in self._workers:
            w.join()

然后,我们通过继承Processclass来实现worker本身。请注意,也可以使用 threading 而不是 multiprocessing。在这种情况下,唯一改变的是 Worker 父 class、Thread 而不是 Process.

class Worker(Process):

    def __init__(self, delay, name=None):
        super().__init__(name=name)
        self.delay = delay
        self.other_ready = Event()
        self.other_ready.set()
        self.ready = Event()
        self.stop = Event()

    def run(self):

        while not self.stop.is_set():

            try:
                self.other_ready.wait()

                t = time.strftime('%H:%M:%S')
                print('Started:', self.name, t, flush=True)

                time.sleep(self.delay)

                t = time.strftime('%H:%M:%S')
                print('Finished:', self.name, t, flush=True)
            except:
                break

            self.other_ready.clear()
            self.ready.set()

    def join(self, timeout=None):
        self.stop.set()
        super().join(timeout)

在这里你可以看到,每个worker在开始执行命令之前等待前一个worker准备就绪。默认情况下,设置 other_ready 这样我们就不会在队列中只有一个 worker 的情况下遇到死锁。

最后,我们实现了一个 main 函数,我们在其中定义了 worker,将它们添加到 worker 队列并启动它们。

def main():
    first = Worker(delay=1, name='first')
    second = Worker(delay=3, name='second')
    third = Worker(delay=2, name='third')

    queue = WorkerQueue()

    for w in (first, second, third):
        queue.add_worker(w)

    queue.start_workers()

    try:

        # The main infinite loop, do something useful:
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        pass
    finally:
        queue.stop_workers()

不要忘记将以下行添加到文件末尾:

if __name__ == '__main__':
    main()

现在,它已准备好保存到文件中,比如 proc_queue.py,您可以从命令行 运行 查看结果:

$ python3 proc_queue.py 
Started: first 16:04:09
Finished: first 16:04:10
Started: second 16:04:10
Finished: second 16:04:13
Started: third 16:04:13
Finished: third 16:04:15
Started: first 16:04:15
Finished: first 16:04:16
Started: second 16:04:16
Finished: second 16:04:19
Started: third 16:04:19
Finished: third 16:04:21
^C

它可能有点过于复杂,但这是我能想到的唯一解决方案。如果您知道更好的方法,我很乐意了解它:)

在伪代码中:

main()
1. set lock for loop1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait

loop1()
1. do the following forever:
2.    acquire lock for loop1
3.    print 'a'
4.    release lock for loop2

loop2()
1. do the following forever:
2.    acquire lock for loop2
3.    print 'b'
4.    release lock for loop1

您可以将锁实现为共享内存变量或等待从您的对等方或其他任何地方获取消息的循环。获取锁意味着阻塞或自旋锁(轮询)直到锁准备好;释放锁将适当地设置共享变量或向正确的对等方发送正确的消息。

编辑:根据评论,这里使用许多可用的实施策略之一对 loop1() 和 loop2() 进行了更全面的开发:

(shared lock in global scope)

main()
1. lock = 1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait

loop1()
1. do the following forever
2.    loop until lock = 1
3.    print 'a'
4.    lock = 2

loop2()
1. do the following forever
2.    loop until lock = 2
3.    print 'b'
4.    lock = 1

此实现使用自旋锁并依赖线程安全共享变量 lock 来协调工作。自旋锁可能适合也可能不适合您的应用程序。您可以将这些与某些阻塞机制结合使用,以处理延迟为代价减少处理。

关键是 lock 是有状态的并且(应该)只能由正确的线程获取。如果每个线程都是 "next" 线程的 "aware" 并在完成时向其发送消息,那么您可以对消息传递做同样的事情...然后所有线程都等待接收消息。

main()
1. start loop1 on background thread
2. start loop2 on background thread
3. message loop1
4. wait

loop1()
1. do the following forever
2.    loop until message received
3.    print 'a'
4.    message loop2

loop2()
1. do the following forever
2.    loop until message received
3.    print 'b'
4.    message loop1
import asyncio

q = asyncio.Queue()

async def loop_a(q):
  for i in range(10):
    value = await q.get()
    print(value)

async def loop_b(q):
  for i in range(10):
    await q.put("a")
    print("b")


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(loop_a(q), loop_b(q)))