如何在 python 电路中使用线程
How to use threads with python circuits
我最近在 python 中发现了用于构建异步应用程序的 Circuits 框架。我正在构建一个事件驱动的应用程序,这个框架似乎很适合我的需要。该框架基于组件的想法,组件在收到事件时做出反应。伟大的!!我按照教程创建了一些简单的应用程序,在我尝试使用组件执行一些繁重的计算任务之前,一切似乎都能正常工作。我知道,该框架支持工人,但我不想使用工人。我想做的是运行一个线程中的每个组件,这样几个组件可以并行执行。该框架似乎通过 start
方法支持这种模式。
来自 Component.py 源代码:
def start(self, process=False, link=None):
"""
Start a new thread or process that invokes this manager's
``run()`` method. The invocation of this method returns
immediately after the task or process has been started.
"""
if process:
# Parent<->Child Bridge
if link is not None:
from circuits.net.sockets import Pipe
from circuits.core.bridge import Bridge
channels = (uuid(),) * 2
parent, child = Pipe(*channels)
bridge = Bridge(parent, channel=channels[0]).register(link)
args = (child,)
else:
args = ()
bridge = None
self.__process = Process(
target=self.run, args=args, name=self.name
)
self.__process.daemon = True
self.__process.start()
return self.__process, bridge
else:
self.__thread = Thread(target=self.run, name=self.name)
self.__thread.daemon = True
self.__thread.start()
return self.__thread, None
def join(self):
if getattr(self, "_thread", None) is not None:
return self.__thread.join()
if getattr(self, "_process", None) is not None:
return self.__process.join()
所以我尝试使用前面显示的 start
和 join
方法来实现经典的 producer/consumer 应用程序。我希望生产者和消费者 运行 在他们自己的线程和主线程中等待直到他们完成。
import time
from threading import current_thread
from circuits import Component, Event, Debugger
class go(Event):
""" go """
class produced(Event):
""" produced """
class Consumer(Component):
def started(self, *args):
print(current_thread())
print(current_thread().ident)
print("Comuser started")
def produced(self, *args):
print("I am consuming...")
class Producer(Component):
def started(self, *args):
print("Producer started")
print(current_thread().ident)
def go(self, *args):
print("gooooooooooo")
while True:
self.fire(produced())
print("Produced element, going to sleep for 1 sec")
time.sleep(1)
c = Consumer()
c.start()
p = Producer()
p.start()
p.fire(go())
c.join()
p.join()
不幸的是,上面的代码没有按预期工作。应用程序在主代码执行后立即退出。我的代码有什么问题?如果您知道以类似方式使用此库的任何示例,您可以向我提供 link 吗?
谢谢。
http://pythonhosted.org/circuits/
编辑
在 James 的回答之后,我又尝试了几种方法来 运行 组件,但我仍然无法让它们并行地 运行。
代码:
c = Consumer()
c.start()
p = Producer()
p.run()
p.fire(go())
输出:
<Thread(Consumer, started daemon 4334432256)>
4334432256
Comuser started
Producer started
140735301485312
应用似乎卡住了。然后,我尝试使用启动其他组件的主应用程序组件。
代码:
class App(Component):
def started(self, *args):
print("App started")
p.fire(go())
(App() + Debugger()).run()
输出:
Comuser started
Producer started
4461318144
<registered[*] (<Debugger/* 75445:MainThread (queued=0) [S]>, <App/* 75445:MainThread (queued=2) [R]> )>
<started[*] (<App/* 75445:MainThread (queued=1) [R]> )>
App started
gooooooooooo
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
^C<signal[*] (2, <frame object at 0x7fe218725fa8> )>
<stopped[*] (<App/* 75445:MainThread (queued=0) [S]> )>
看起来只有生产者是 运行ning...我希望在输出中看到的是这样的:
Produced...
Consumed...
Produced...
Consumed...
这里是电路的作者。
不幸的是,这里没有调用 .run()
的 "main" 组件。这就是说您已经在 "thread" 模式下启动了两个组件,但现在没有 "main" 运行 组件。
如果您在 Producer 或 Consume 上调用 .run()
并以 "thread" 模式启动另一个;你应该得到更多你想要的预期结果。
更新: 很抱歉延迟回复,但我相信您正在使用这种模式(*基于 hello_multi_bridge.py
示例):
from os import getpid
from circuits import ipc, Component, Event, Timer
class go(Event):
"""go"""
class produced(Event):
"""produced"""
class Consumer(Component):
def produced(self, *args):
print("Consumed {} from {}".format(repr(args), getpid()))
class Producer(Component):
def init(self):
self.consumers = [
Consumer().start(process=True, link=self),
]
def ready(self, *args):
Timer(1, go(), persist=True).register(self)
def go(self, *args):
for process, bridge in self.consumers:
self.fire(ipc(produced(), bridge.channel))
class App(Component):
def init(self):
Producer().register(self)
App().run()
这会产生如下输出:
Consumed () from 68646
Consumed () from 68646
...
我最近在 python 中发现了用于构建异步应用程序的 Circuits 框架。我正在构建一个事件驱动的应用程序,这个框架似乎很适合我的需要。该框架基于组件的想法,组件在收到事件时做出反应。伟大的!!我按照教程创建了一些简单的应用程序,在我尝试使用组件执行一些繁重的计算任务之前,一切似乎都能正常工作。我知道,该框架支持工人,但我不想使用工人。我想做的是运行一个线程中的每个组件,这样几个组件可以并行执行。该框架似乎通过 start
方法支持这种模式。
来自 Component.py 源代码:
def start(self, process=False, link=None):
"""
Start a new thread or process that invokes this manager's
``run()`` method. The invocation of this method returns
immediately after the task or process has been started.
"""
if process:
# Parent<->Child Bridge
if link is not None:
from circuits.net.sockets import Pipe
from circuits.core.bridge import Bridge
channels = (uuid(),) * 2
parent, child = Pipe(*channels)
bridge = Bridge(parent, channel=channels[0]).register(link)
args = (child,)
else:
args = ()
bridge = None
self.__process = Process(
target=self.run, args=args, name=self.name
)
self.__process.daemon = True
self.__process.start()
return self.__process, bridge
else:
self.__thread = Thread(target=self.run, name=self.name)
self.__thread.daemon = True
self.__thread.start()
return self.__thread, None
def join(self):
if getattr(self, "_thread", None) is not None:
return self.__thread.join()
if getattr(self, "_process", None) is not None:
return self.__process.join()
所以我尝试使用前面显示的 start
和 join
方法来实现经典的 producer/consumer 应用程序。我希望生产者和消费者 运行 在他们自己的线程和主线程中等待直到他们完成。
import time
from threading import current_thread
from circuits import Component, Event, Debugger
class go(Event):
""" go """
class produced(Event):
""" produced """
class Consumer(Component):
def started(self, *args):
print(current_thread())
print(current_thread().ident)
print("Comuser started")
def produced(self, *args):
print("I am consuming...")
class Producer(Component):
def started(self, *args):
print("Producer started")
print(current_thread().ident)
def go(self, *args):
print("gooooooooooo")
while True:
self.fire(produced())
print("Produced element, going to sleep for 1 sec")
time.sleep(1)
c = Consumer()
c.start()
p = Producer()
p.start()
p.fire(go())
c.join()
p.join()
不幸的是,上面的代码没有按预期工作。应用程序在主代码执行后立即退出。我的代码有什么问题?如果您知道以类似方式使用此库的任何示例,您可以向我提供 link 吗?
谢谢。
http://pythonhosted.org/circuits/
编辑
在 James 的回答之后,我又尝试了几种方法来 运行 组件,但我仍然无法让它们并行地 运行。
代码:
c = Consumer()
c.start()
p = Producer()
p.run()
p.fire(go())
输出:
<Thread(Consumer, started daemon 4334432256)>
4334432256
Comuser started
Producer started
140735301485312
应用似乎卡住了。然后,我尝试使用启动其他组件的主应用程序组件。
代码:
class App(Component):
def started(self, *args):
print("App started")
p.fire(go())
(App() + Debugger()).run()
输出:
Comuser started
Producer started
4461318144
<registered[*] (<Debugger/* 75445:MainThread (queued=0) [S]>, <App/* 75445:MainThread (queued=2) [R]> )>
<started[*] (<App/* 75445:MainThread (queued=1) [R]> )>
App started
gooooooooooo
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
Produced element, going to sleep for 1 sec
^C<signal[*] (2, <frame object at 0x7fe218725fa8> )>
<stopped[*] (<App/* 75445:MainThread (queued=0) [S]> )>
看起来只有生产者是 运行ning...我希望在输出中看到的是这样的:
Produced...
Consumed...
Produced...
Consumed...
这里是电路的作者。
不幸的是,这里没有调用 .run()
的 "main" 组件。这就是说您已经在 "thread" 模式下启动了两个组件,但现在没有 "main" 运行 组件。
如果您在 Producer 或 Consume 上调用 .run()
并以 "thread" 模式启动另一个;你应该得到更多你想要的预期结果。
更新: 很抱歉延迟回复,但我相信您正在使用这种模式(*基于 hello_multi_bridge.py
示例):
from os import getpid
from circuits import ipc, Component, Event, Timer
class go(Event):
"""go"""
class produced(Event):
"""produced"""
class Consumer(Component):
def produced(self, *args):
print("Consumed {} from {}".format(repr(args), getpid()))
class Producer(Component):
def init(self):
self.consumers = [
Consumer().start(process=True, link=self),
]
def ready(self, *args):
Timer(1, go(), persist=True).register(self)
def go(self, *args):
for process, bridge in self.consumers:
self.fire(ipc(produced(), bridge.channel))
class App(Component):
def init(self):
Producer().register(self)
App().run()
这会产生如下输出:
Consumed () from 68646
Consumed () from 68646
...