如何在 python 电路中使用线程

How to use threads with python circuits

我最近在 python 中发现了用于构建异步应用程序的 Circuits 框架。我正在构建一个事件驱动的应用程序,这个框架似乎很适合我的需要。该框架基于组件的想法,组件在收到事件时做出反应。伟大的!!我按照教程创建了一些简单的应用程序,在我尝试使用组件执行一些繁重的计算任务之前,一切似乎都能正常工作。我知道,该框架支持工人,但我不想使用工人。我想做的是运行一个线程中的每个组件,这样几个组件可以并行执行。该框架似乎通过 start 方法支持这种模式。

来自 Component.py 源代码:

def start(self, process=False, link=None):
    """
    Start a new thread or process that invokes this manager's
    ``run()`` method. The invocation of this method returns
    immediately after the task or process has been started.
    """

    if process:
        # Parent<->Child Bridge
        if link is not None:
            from circuits.net.sockets import Pipe
            from circuits.core.bridge import Bridge

            channels = (uuid(),) * 2
            parent, child = Pipe(*channels)
            bridge = Bridge(parent, channel=channels[0]).register(link)

            args = (child,)
        else:
            args = ()
            bridge = None

        self.__process = Process(
            target=self.run, args=args, name=self.name
        )
        self.__process.daemon = True
        self.__process.start()

        return self.__process, bridge
    else:
        self.__thread = Thread(target=self.run, name=self.name)
        self.__thread.daemon = True
        self.__thread.start()

        return self.__thread, None

def join(self):
    if getattr(self, "_thread", None) is not None:
        return self.__thread.join()

    if getattr(self, "_process", None) is not None:
        return self.__process.join()

所以我尝试使用前面显示的 startjoin 方法来实现经典的 producer/consumer 应用程序。我希望生产者和消费者 运行 在他们自己的线程和主线程中等待直到他们完成。

import time
from threading import current_thread
from circuits import Component, Event, Debugger


class go(Event):
    """ go """

class produced(Event):
    """ produced """

class Consumer(Component):

    def started(self, *args):
        print(current_thread())
        print(current_thread().ident)
        print("Comuser started")

    def produced(self, *args):
        print("I am consuming...")

class Producer(Component):

    def started(self, *args):
        print("Producer started")
        print(current_thread().ident)

    def go(self, *args):
        print("gooooooooooo")
        while True:
            self.fire(produced())
            print("Produced element, going to sleep for 1 sec")
            time.sleep(1)

c = Consumer()
c.start()
p = Producer()
p.start()

p.fire(go())

c.join()
p.join()

不幸的是,上面的代码没有按预期工作。应用程序在主代码执行后立即退出。我的代码有什么问题?如果您知道以类似方式使用此库的任何示例,您可以向我提供 link 吗?

谢谢。

http://pythonhosted.org/circuits/


编辑

在 James 的回答之后,我又尝试了几种方法来 运行 组件,但我仍然无法让它们并行地 运行。

代码:

c = Consumer()
c.start()
p = Producer()
p.run()
p.fire(go())

输出:

<Thread(Consumer, started daemon 4334432256)>
4334432256
Comuser started
Producer started
140735301485312

应用似乎卡住了。然后,我尝试使用启动其他组件的主应用程序组件。

代码:

class App(Component):

    def started(self, *args):
        print("App started")
        p.fire(go())

(App() + Debugger()).run()

输出:

Comuser started
Producer started
4461318144
<registered[*] (<Debugger/* 75445:MainThread (queued=0) [S]>, <App/* 75445:MainThread (queued=2) [R]> )>
<started[*] (<App/* 75445:MainThread (queued=1) [R]> )>
App started
gooooooooooo
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
^C<signal[*] (2, <frame object at 0x7fe218725fa8> )>
<stopped[*] (<App/* 75445:MainThread (queued=0) [S]> )>

看起来只有生产者是 运行ning...我希望在输出中看到的是这样的:

Produced...
Consumed...
Produced...
Consumed...

这里是电路的作者。

不幸的是,这里没有调用 .run() 的 "main" 组件。这就是说您已经在 "thread" 模式下启动了两个组件,但现在没有 "main" 运行 组件。

如果您在 Producer 或 Consume 上调用 .run() 并以 "thread" 模式启动另一个;你应该得到更多你想要的预期结果。


更新: 很抱歉延迟回复,但我相信您正在使用这种模式(*基于 hello_multi_bridge.py 示例):

from os import getpid


from circuits import ipc, Component, Event, Timer


class go(Event):
    """go"""


class produced(Event):
    """produced"""


class Consumer(Component):

    def produced(self, *args):
        print("Consumed {} from {}".format(repr(args), getpid()))


class Producer(Component):

    def init(self):
        self.consumers = [
            Consumer().start(process=True, link=self),
        ]

    def ready(self, *args):
        Timer(1, go(), persist=True).register(self)

    def go(self, *args):
        for process, bridge in self.consumers:
            self.fire(ipc(produced(), bridge.channel))


class App(Component):

    def init(self):
        Producer().register(self)


App().run()

这会产生如下输出:

Consumed () from 68646
Consumed () from 68646
...