如何向 Pykka actor 发送 RabbitMQ 消息?
How to send RabbitMQ messages to Pykka actor?
更新 2015 年 8 月:对于想要使用消息传递的人,我目前推荐 zeromq。可以作为 pykka 的补充或完全替代。
我如何监听 RabbitMQ 队列中的消息,然后将它们转发给 Pykka 中的参与者?
目前,当我尝试这样做时,出现了奇怪的行为,系统停止运行。
以下是我如何实现演员:
class EventListener(eventlet.EventletActor):
def __init__(self, target):
"""
:param pykka.ActorRef target: Where to send the queue messages.
"""
super(EventListener, self).__init__()
self.target = target
def on_start(self):
ApplicationService.listen_for_events(self.actor_ref)
这是我在 ApplicationService
class 中的方法,它应该检查队列中的新消息:
@classmethod
def listen_for_events(cls, actor):
"""
Subscribe to messages and forward them to the given actor.
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
message = pickle.loads(body)
actor.tell(message)
channel.basic_consume(callback, queue='test', no_ack=True)
channel.start_consuming()
似乎 start_consuming
无限期阻塞。有没有办法让我自己定期 "poll" 排队?
我认为你的所有代码都是正确的。如果您想检查每个演员使用的队列,您可以在从 Actor#start
.
返回的演员参考中检查他们的 actor_inbox
属性 是否可用
我在从 EventletActor
继承时遇到了类似的问题 运行 所以为了测试我使用 EventletActor
和 ThreadingActor
尝试了相同的代码。据我从源代码中可以看出,他们都使用 eventlet
来工作。 ThreadingActor
对我来说很好,但 EventletActor
不适用于 ActorRef#tell
,它适用于 ActorRef#ask
。
我从同一目录中的两个文件开始,如下所示。
my_actors.py
:初始化两个 actor,它们将通过打印以 class 名称开头的消息内容来响应消息。
from pykka.eventlet import EventletActor
import pykka
class MyThreadingActor(pykka.ThreadingActor):
def __init__(self):
super(MyThreadingActor, self).__init__()
def on_receive(self, message):
print(
"MyThreadingActor Received: {message}".format(
message=message)
)
class MyEventletActor(EventletActor):
def __init__(self):
super(MyEventletActor, self).__init__()
def on_receive(self, message):
print(
"MyEventletActor Received: {message}".format(
message=message)
)
my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()
my_queue.py
:在pika中建立一个队列,向队列发送消息,转发给之前设置的两个actor。在将消息告知每个演员后,他们当前的演员收件箱会检查队列中的任何内容。
from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika
def on_message(channel, method_frame, header_frame, body):
print "Received Message", body
my_threading_actor_ref.tell({"msg": body})
my_eventlet_actor_ref.tell({"msg": body})
print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
queue_name = 'test'
connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
# It is very important to stop these actors, otherwise you may lockup
my_threading_actor_ref.stop()
my_eventlet_actor_ref.stop()
connection.close()
当我运行my_queue.py
输出如下:
Received Message A Message
ThreadingActor Inbox <Queue.Queue instance at 0x10bf55878>
MyThreadingActor Received: {'msg': 'A Message'}
EventletActor Inbox <Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>
当我点击 CTRL+C
停止队列时,我注意到 EventletActor
最终收到消息并打印出来:
^C
MyEventletActor Received: {'msg': 'A Message'}
所有这些让我相信 EventletActor
中可能存在错误,我认为您的代码没有问题,并且存在一个我在第一次检查时无法在代码中找到的错误。
希望这些信息对您有所帮助。
更新 2015 年 8 月:对于想要使用消息传递的人,我目前推荐 zeromq。可以作为 pykka 的补充或完全替代。
我如何监听 RabbitMQ 队列中的消息,然后将它们转发给 Pykka 中的参与者?
目前,当我尝试这样做时,出现了奇怪的行为,系统停止运行。
以下是我如何实现演员:
class EventListener(eventlet.EventletActor):
def __init__(self, target):
"""
:param pykka.ActorRef target: Where to send the queue messages.
"""
super(EventListener, self).__init__()
self.target = target
def on_start(self):
ApplicationService.listen_for_events(self.actor_ref)
这是我在 ApplicationService
class 中的方法,它应该检查队列中的新消息:
@classmethod
def listen_for_events(cls, actor):
"""
Subscribe to messages and forward them to the given actor.
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
message = pickle.loads(body)
actor.tell(message)
channel.basic_consume(callback, queue='test', no_ack=True)
channel.start_consuming()
似乎 start_consuming
无限期阻塞。有没有办法让我自己定期 "poll" 排队?
我认为你的所有代码都是正确的。如果您想检查每个演员使用的队列,您可以在从 Actor#start
.
actor_inbox
属性 是否可用
我在从 EventletActor
继承时遇到了类似的问题 运行 所以为了测试我使用 EventletActor
和 ThreadingActor
尝试了相同的代码。据我从源代码中可以看出,他们都使用 eventlet
来工作。 ThreadingActor
对我来说很好,但 EventletActor
不适用于 ActorRef#tell
,它适用于 ActorRef#ask
。
我从同一目录中的两个文件开始,如下所示。
my_actors.py
:初始化两个 actor,它们将通过打印以 class 名称开头的消息内容来响应消息。
from pykka.eventlet import EventletActor
import pykka
class MyThreadingActor(pykka.ThreadingActor):
def __init__(self):
super(MyThreadingActor, self).__init__()
def on_receive(self, message):
print(
"MyThreadingActor Received: {message}".format(
message=message)
)
class MyEventletActor(EventletActor):
def __init__(self):
super(MyEventletActor, self).__init__()
def on_receive(self, message):
print(
"MyEventletActor Received: {message}".format(
message=message)
)
my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()
my_queue.py
:在pika中建立一个队列,向队列发送消息,转发给之前设置的两个actor。在将消息告知每个演员后,他们当前的演员收件箱会检查队列中的任何内容。
from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika
def on_message(channel, method_frame, header_frame, body):
print "Received Message", body
my_threading_actor_ref.tell({"msg": body})
my_eventlet_actor_ref.tell({"msg": body})
print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
queue_name = 'test'
connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
# It is very important to stop these actors, otherwise you may lockup
my_threading_actor_ref.stop()
my_eventlet_actor_ref.stop()
connection.close()
当我运行my_queue.py
输出如下:
Received Message A Message
ThreadingActor Inbox
<Queue.Queue instance at 0x10bf55878>
MyThreadingActor Received:
{'msg': 'A Message'}
EventletActor Inbox
<Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>
当我点击 CTRL+C
停止队列时,我注意到 EventletActor
最终收到消息并打印出来:
^C
MyEventletActor Received:{'msg': 'A Message'}
所有这些让我相信 EventletActor
中可能存在错误,我认为您的代码没有问题,并且存在一个我在第一次检查时无法在代码中找到的错误。
希望这些信息对您有所帮助。