在多处理中更改列表的值

Changing values of list in multiprocessing

我是 Python 多进程(multiprocessing)的新手,下面先介绍一下代码的背景。我正在尝试创建三个进程:一个用于向列表添加元素,一个用于修改列表中的元素,一个用于打印列表。

理想情况下,这三个进程使用同一个通过 Manager 创建的、位于共享内存中的列表。

我面临的问题是 testprocess2 无法将值设置为 0,基本上,它无法更改列表。

class Trade:
    """A trade record kept in the manager-backed ``records`` list."""

    def __init__(self, id):
        # ``id`` shadows the builtin, but the parameter name is kept so that
        # keyword callers (``Trade(id=...)``) keep working.
        self.exchange = None
        self.order_id = id

    def __repr__(self):
        # A readable repr makes printing a trade (or a list of trades) show
        # the data instead of an opaque object address.
        return (f"{type(self).__name__}(order_id={self.order_id!r}, "
                f"exchange={self.exchange!r})")


class testprocess2(Process):
    """Worker meant to reset every trade's ``order_id`` to 0 once a second.

    NOTE: as written, the reset never reaches the managed list -- see the
    comment on the assignment inside ``run``.
    """

    def __init__(self, trades, lock):
        # ``trades`` is the manager list proxy and ``lock`` the Lock created in
        # the main block. The ``args`` tuple is never read, because ``run`` is
        # overridden below (Process only uses ``args`` with ``target``).
        super().__init__(args=(trades, lock))
        self.trades = trades
        self.lock = lock

    def run(self):
        # Loops forever; never returns.
        while True:
            # lock.acquire()
            print("Altering")
            for idx in range(len(self.trades)):
                # BUG (the problem this question is about): indexing the list
                # proxy returns a deserialized local copy of the Trade, so this
                # only modifies that copy. The managed list stays unchanged
                # unless the copy is written back, e.g.
                # ``trade = self.trades[idx]; trade.order_id = 0;
                #   self.trades[idx] = trade``.
                self.trades[idx].order_id = 0
            # lock.release()
            sleep(1)


class testprocess1(Process):
    """Worker that prints the ``order_id`` of every trade once a second."""

    def __init__(self, trades, lock):
        # Same constructor shape as the other workers in this script.
        super().__init__(args=(trades, lock))
        self.lock = lock
        self.trades = trades

    def run(self):
        while True:
            print("start")
            count = len(self.trades)
            for position in range(count):
                current = self.trades[position]
                print(current.order_id)
            sleep(1)


class testprocess(Process):
    """Worker that appends a ``Trade`` with a random id every 5 seconds."""

    def __init__(self, trades, lock):
        # Same constructor shape as the other workers in this script.
        super().__init__(args=(trades, lock))
        self.lock = lock
        self.trades = trades

    def run(self):
        while True:
            # (self.lock could guard the append if more writers are added.)
            new_id = random.randint(0, 9)
            print("adding random {}".format(new_id))
            fresh = Trade(new_id)
            self.trades.append(fresh)
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        # ``records`` is a proxy; the real list lives in the manager process.
        records = manager.list([Trade(5)])
        lock = Lock()

        # Create and start the appender, the printer and the resetter,
        # in that order.
        workers = []
        for worker_cls in (testprocess, testprocess1, testprocess2):
            worker = worker_cls(records, lock)
            worker.start()
            workers.append(worker)

        # The workers loop forever, so these joins block indefinitely.
        for worker in workers:
            worker.join()

严格来说,您的托管(managed)列表并不在共享内存中,理解实际发生了什么非常重要。保存 Trade 实例的实际列表驻留在执行 Manager() 调用时所创建的进程中。当您随后执行 records = manager.list([Trade(5)]) 时,records 并不是对该列表的直接引用,因为正如我所说,我们处理的不是共享内存。它是一个特殊的代理(proxy)对象,实现了与列表相同的方法。例如,当您在这个代理对象上调用 append 时,它会把您要追加的参数序列化,通过套接字或管道传输到管理器进程,在那里反序列化后再追加到实际的列表中。简而言之,对代理对象的操作变成了远程方法调用。

现在来解决您的问题。您试图用以下语句重置 order_id 属性:

self.trades[idx].order_id = 0

由于我们是通过代理对象处理远程列表,不幸的是,上述语句等同于:

trade = self.trades[idx] # fetch object from the remote list
trade.order_id = 0 # reset the order_id to 0 on the local copy

缺少的一步是把更新后的 trade 对象写回列表:

self.trades[idx] = trade

所以,您的那一条更新语句实际上需要替换为上面的三条语句。

我还冒昧地以多种方式修改了您的代码。

  1. PEP 8(Python 代码风格指南)建议类名采用首字母大写的 CapWords 风格。
  2. 由于您所有的进程类的构造方式都相同(即具有相同的 __init__ 方法),我创建了一个抽象基类 TestProcess,让这些类都继承自它。它们只需要提供一个 run 方法。
  3. 我把这些进程类设为了守护(daemon)进程。这意味着它们会在主进程终止时自动终止。我这样做是为了演示,使程序不会无限循环:主进程将在 15 秒后终止。
  4. 您不需要把 trades 和 lock 参数通过 args 传给 Process 类的 __init__ 方法。假如您不是从 Process 派生自己的类,而只是想让新创建的进程运行一个接受参数 trades 和 lock 的函数 foo,那么应写成 p1 = Process(target=foo, args=(trades, lock))。这才是 args 参数的真正用途,即与 target 参数配合使用。详情请参阅 threading.Thread 类的文档。实际上,我认为从 multiprocessing.Process 派生类的价值不大(不这样做时,代码有更好的可重用性)。但既然您这样做了,您已经在 __init__ 方法中设置了实例属性 self.trades 和 self.lock,当您调用 start 方法、进而执行 run 方法时,这些属性就可以直接使用,无需再做任何事情。请参阅最后的两个附加代码示例。
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod


class Trade:
    """A trade record kept in the manager-backed ``records`` list."""

    def __init__(self, id):
        # ``id`` shadows the builtin, but the parameter name is kept so that
        # keyword callers (``Trade(id=...)``) keep working.
        self.exchange = None
        self.order_id = id

    def __repr__(self):
        # A readable repr makes printing a trade (or a list of trades) show
        # the data instead of an opaque object address.
        return (f"{type(self).__name__}(order_id={self.order_id!r}, "
                f"exchange={self.exchange!r})")


class TestProcess(Process, ABC):
    """Abstract base for the worker processes in this example.

    Stores the shared ``trades`` list (a manager proxy in this example) and
    ``lock`` as instance attributes; subclasses only implement ``run``.
    """

    def __init__(self, trades, lock, *, daemon=True):
        # Daemon by default so the workers are terminated automatically when
        # the main process exits; pass ``daemon=False`` to opt out.
        super().__init__(daemon=daemon)
        self.trades = trades
        self.lock = lock

    @abstractmethod
    def run(self):
        """Worker body; executed in the child process after ``start()``."""

class TestProcess2(TestProcess):
    """Worker that resets every trade's ``order_id`` to 0 once a second."""

    def _zero_order_ids(self):
        # When ``trades`` is a manager list proxy, indexing it hands back a
        # deserialized *copy* of the element, so the modified copy has to be
        # stored back for the change to reach the manager's list.
        for position in range(len(self.trades)):
            local_copy = self.trades[position]
            local_copy.order_id = 0
            self.trades[position] = local_copy

    def run(self):
        while True:
            print("Altering")
            self._zero_order_ids()
            sleep(1)


class TestProcess1(TestProcess):
    """Worker that prints the index and ``order_id`` of each trade every second."""

    def run(self):
        while True:
            print("start")
            total = len(self.trades)
            for position in range(total):
                order_id = self.trades[position].order_id
                print(f'index = {position}, order id = {order_id}')
            sleep(1)


class TestProcess0(TestProcess):
    """Worker that appends a ``Trade`` with a random id every 5 seconds.

    Named ``TestProcess0`` rather than re-using ``TestProcess``: rebinding
    that name would hide the abstract base class behind one of its own
    subclasses (e.g. ``isinstance(p3, TestProcess)`` would become False).
    """

    def run(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = TestProcess0(records, lock)
        p1.start()

        p2 = TestProcess1(records, lock)
        p2.start()

        p3 = TestProcess2(records, lock)
        p3.start()

        sleep(15)  # run for 15 seconds

        # Stop the workers *before* leaving the ``with`` block. Exiting it
        # shuts the manager down, and a worker still running at that point
        # would fail on its next proxy call with a connection-error
        # traceback. Being daemons, the workers would otherwise only be
        # killed when the main process itself exits.
        workers = (p1, p2, p3)
        for p in workers:
            p.terminate()
        for p in workers:
            p.join()

使用不从 multiprocessing.Process 派生的类
from multiprocessing import Process, Manager, Lock
from time import sleep
import random
from abc import ABC, abstractmethod


class Trade:
    """A trade record kept in the manager-backed ``records`` list."""

    def __init__(self, id):
        # ``id`` shadows the builtin, but the parameter name is kept so that
        # keyword callers (``Trade(id=...)``) keep working.
        self.exchange = None
        self.order_id = id

    def __repr__(self):
        # A readable repr makes printing a trade (or a list of trades) show
        # the data instead of an opaque object address.
        return (f"{type(self).__name__}(order_id={self.order_id!r}, "
                f"exchange={self.exchange!r})")


class TestProcess(ABC):
    """Abstract base holding the shared ``trades`` list and ``lock``.

    Instances are not processes themselves: in this example the bound
    ``process`` method is passed as the ``target`` of a
    ``multiprocessing.Process``.
    """

    def __init__(self, trades, lock):
        self.trades = trades
        self.lock = lock

    @abstractmethod
    def process(self):
        """Worker loop; run in a child process via ``Process(target=...)``."""

class TestProcess2(TestProcess):
    """Resets every trade's ``order_id`` to 0, once a second, forever."""

    def process(self):
        trades = self.trades
        while True:
            print("Altering")
            for position in range(len(trades)):
                # With a manager list proxy, indexing yields a local copy,
                # so the modified copy has to be assigned back explicitly.
                local_copy = trades[position]
                local_copy.order_id = 0
                trades[position] = local_copy
            sleep(1)


class TestProcess1(TestProcess):
    """Prints the index and ``order_id`` of every trade, once a second, forever."""

    def process(self):
        trades = self.trades
        while True:
            print("start")
            count = len(trades)
            for position in range(count):
                order_id = trades[position].order_id
                print(f'index = {position}, order id = {order_id}')
            sleep(1)


class TestProcess0(TestProcess):
    """Appends a ``Trade`` with a random id every 5 seconds, forever.

    Named ``TestProcess0`` rather than re-using ``TestProcess``: rebinding
    that name would hide the abstract base class behind one of its own
    subclasses.
    """

    def process(self):
        while True:
            # lock.acquire()
            n = random.randint(0, 9)
            print("adding random {}".format(n))
            self.trades.append(Trade(n))
            # lock.release()
            sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        records = manager.list([Trade(5)])
        lock = Lock()

        tp = TestProcess0(records, lock)
        p1 = Process(target=tp.process, daemon=True)
        p1.start()

        tp1 = TestProcess1(records, lock)
        p2 = Process(target=tp1.process, daemon=True)
        p2.start()

        tp2 = TestProcess2(records, lock)
        p3 = Process(target=tp2.process, daemon=True)
        p3.start()

        sleep(15)  # run for 15 seconds

        # Stop the workers *before* leaving the ``with`` block. Exiting it
        # shuts the manager down, and a worker still running at that point
        # would fail on its next proxy call with a connection-error
        # traceback.
        workers = (p1, p2, p3)
        for p in workers:
            p.terminate()
        for p in workers:
            p.join()

使用函数,而不是从 multiprocessing.Process 派生的类
from multiprocessing import Process, Manager, Lock
from time import sleep
import random


class Trade:
    """A trade record kept in the manager-backed ``records`` list."""

    def __init__(self, id):
        # ``id`` shadows the builtin, but the parameter name is kept so that
        # keyword callers (``Trade(id=...)``) keep working.
        self.exchange = None
        self.order_id = id

    def __repr__(self):
        # A readable repr makes printing a trade (or a list of trades) show
        # the data instead of an opaque object address.
        return (f"{type(self).__name__}(order_id={self.order_id!r}, "
                f"exchange={self.exchange!r})")


def testprocess2(trades, lock):
    while True:
        # lock.acquire()
        print("Altering")
        for idx in range(len(trades)):
            trade = trades[idx]
            trade.order_id = 0
            # We must tell the managed list that it has been updated!!!:
            trades[idx] = trade
        # lock.release()
        sleep(1)


def testprocess1(trades, lock):
    while True:
        print("start")
        for idx in range(len(trades)):
            print(f'index = {idx}, order id = {trades[idx].order_id}')

        sleep(1)


def testprocess(trades, lock):
    """Append a ``Trade`` with a random id every 5 seconds, forever.

    ``lock`` is accepted for symmetry with the other workers but not used.
    """
    while True:
        new_id = random.randint(0, 9)
        print("adding random {}".format(new_id))
        fresh = Trade(new_id)
        trades.append(fresh)
        sleep(5)


if __name__ == "__main__":

    with Manager() as manager:
        # ``records`` is a proxy; the real list lives in the manager process.
        records = manager.list([Trade(5)])
        lock = Lock()

        p1 = Process(target=testprocess, args=(records, lock), daemon=True)
        p1.start()

        p2 = Process(target=testprocess1, args=(records, lock), daemon=True)
        p2.start()

        p3 = Process(target=testprocess2, args=(records, lock), daemon=True)
        p3.start()

        sleep(15)  # run for 15 seconds

        # Stop the workers while the manager is still running. Leaving the
        # ``with`` block shuts the manager down, and any worker still alive
        # would then fail on its next proxy call and print a traceback.
        workers = (p1, p2, p3)
        for p in workers:
            p.terminate()
        for p in workers:
            p.join()