Kombu/Celery 消息传递
Kombu/Celery messaging
我有一个发送和接收消息的简单应用程序 kombu,并使用 Celery 处理消息。 Kombu alon,我可以正常接收消息。当我发送 "Hello" 时,kombu 收到 "Hello"。但是当我添加任务时,kombu收到的是celery的任务ID。
我做这个项目的目的是让我可以安排何时发送和接收消息,因此 Celery。
我想知道为什么kombu收到的是任务id而不是发送的消息?我找了又找,也没有找到关于这件事的相关结果。我是使用此应用程序的初学者,如果能帮我解决这个问题,我将不胜感激。
我的代码:
task.py
from celery import Celery
app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')
@app.task(name='task.add')
def add(x, y):
return x+y
send.py
import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
if connection.connect() is False:
print("not connected")
else:
print("connected")
#checks if connection is okay
#rabbitmq connection
channel = connection.channel()
#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
#message here
x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)
#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
exchange = exchange,
routing_key='queue1')
print("Message sent: [x]")
connection.release()
receive.py
import kombu
#receive
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
channel = connection.channel()
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
print("Waiting for messages...")
def callback(body, message):
print('Got message - %s' % body)
message.ack()
consumer = kombu.Consumer(channel,
queues=queue,
callbacks=[callback])
consumer.consume()
while True:
connection.drain_events()
我正在使用:
Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker
我发送的内容:
xxx
yyy
海带收到的东西:
Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a
您需要调用 result.get()
来接收 add.delay()
的实际值。您看到的消息正文是字符串格式的 AsyncResult
实例。这没有多大意义。
我找到了我的问题的答案,对于可能遇到此类问题的任何人,我将分享对我有用的答案。
I found the solution here.
Or here - 用户 jennaliu 如果第一个 link 不起作用,答案可能会对您有所帮助。
我有一个发送和接收消息的简单应用程序 kombu,并使用 Celery 处理消息。 Kombu alon,我可以正常接收消息。当我发送 "Hello" 时,kombu 收到 "Hello"。但是当我添加任务时,kombu收到的是celery的任务ID。
我做这个项目的目的是让我可以安排何时发送和接收消息,因此 Celery。
我想知道为什么kombu收到的是任务id而不是发送的消息?我找了又找,也没有找到关于这件事的相关结果。我是使用此应用程序的初学者,如果能帮我解决这个问题,我将不胜感激。
我的代码:
task.py
from celery import Celery
app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')
@app.task(name='task.add')
def add(x, y):
return x+y
send.py
import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
if connection.connect() is False:
print("not connected")
else:
print("connected")
#checks if connection is okay
#rabbitmq connection
channel = connection.channel()
#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
#message here
x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)
#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
exchange = exchange,
routing_key='queue1')
print("Message sent: [x]")
connection.release()
receive.py
import kombu
#receive
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
channel = connection.channel()
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
print("Waiting for messages...")
def callback(body, message):
print('Got message - %s' % body)
message.ack()
consumer = kombu.Consumer(channel,
queues=queue,
callbacks=[callback])
consumer.consume()
while True:
connection.drain_events()
我正在使用:
Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker
我发送的内容:
xxx
yyy
海带收到的东西:
Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a
您需要调用 result.get()
来接收 add.delay()
的实际值。您看到的消息正文是字符串格式的 AsyncResult
实例。这没有多大意义。
我找到了我的问题的答案,对于可能遇到此类问题的任何人,我将分享对我有用的答案。
I found the solution here.
Or here - 用户 jennaliu 如果第一个 link 不起作用,答案可能会对您有所帮助。