如何在RabbitMQ中将PHP和return任务回队列?
How in RabbitMQ and PHP return task back to the queue?
如果处理结果不适合我,我如何return将消息返回队列。仅找到有关消息确认的信息,但我认为它不适合我。如果作为处理的结果,我需要将参数 RETRY 消息添加回队列。然后这个工人或另一个工人再次拿起它并尝试处理它。
例如:
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
$condition = json_decode($msg->body);
if (!$condition) {
# return to the queue
}
};
$channel->basic_consume('test', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
解决方案比我想象的要简单,事实证明这个任务不是专门针对 RabbitMQ,而是关于变量的范围。如果有人对解决方案感兴趣,请点击此处:
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
global $channel;
$condition = json_decode($msg->body);
if (!$condition) {
$msg = new AMQPMessage(json_encode(array(
'condition' => false
)));
$channel->basic_publish($msg, '', 'test');
}
};
$channel->basic_consume('test', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
将自动 no_ack 标志设置为 false
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
$channel->basic_consume('test', '', false, false, false, false, $callback);
你必须使用确认,如果你的进程不工作你可以忽略确认
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($message) {
$condition = json_decode($message->body);
if (!$condition) {
// return to the queue
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}else{
// send ack , remove from queue
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
};
$channel->basic_consume('test', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
当然用这种方法你会面临消息总是在队列的头部,还有另一种可能性,
如果你真的想重试,你可以按照下面的方法
定义一个重试队列,最好是你的队列名 -retry
最好定义一个死信队列:-dlq
然后你可以做如下的事情:
如何设置 -retry
队列:
这是其中最重要的部分。您需要声明具有以下功能的队列:
x-dead-letter-exchange: 应与您的主队列路由键相同
x-dead-letter-routing-key: 应该和你的主队列路由键相同
x-message-ttl:重试之间的延迟
代码为 sudo 代码,请勿复制粘贴,这只是给您一个提示
$maximumRetry = 5;
$callback = function($message) {
$body = json_decode($message->body);
try {
// process result is your condition
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} catch(Exception $e) {
// return to the queue
$body['try_attempt'] = !empty($body['try_attempt'])? int($body['try_attempt']) + 1: 1
if ($body['try_attempt'] >= $maximumRetry ){
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
return
}
$msg = new AMQPMessage(json_encode($message));
$channel->basic_publish($msg, '', 'test-retry');
}
};
我们需要 3 个队列才能重新绑定。
queue.example
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example
- 特点:
- x-死信交换:queue.exchange
- x-dead-letter-routing-key: queue.example-dlq
queue.example-dlq
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example-dlq
queue.example-重试
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example-重试
- 特点:
- x-死信交换:queue.exchange
- x-dead-letter-routing-key:queue.example-已添加
- x-消息-ttl: 10000
------------更新------------
Quorum 队列提供开箱即用的功能,因此在消费者中,您可以了解每条消息被重试的次数,您还可以轻松地为其定义死信队列,有关更多信息,您可以阅读更多关于 quorom queues and poison message handling
这可能有点晚了,但这是你应该如何使用这个版本的 php-amqplib "php-amqplib/php-amqplib": "^3.1"
你需要将基本消费方法的 no_ack 参数设置为false(这是默认值)然后在传递给回调的 AMQPMessage 对象上使用方法 nack 在回调中明确指定它
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
$condition = json_decode($msg->body);
if (!$condition) {
// message will be added back to the queue
$msg->nack(true);
}
};
$channel->basic_consume('test', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
如果处理结果不适合我,我如何return将消息返回队列。仅找到有关消息确认的信息,但我认为它不适合我。如果作为处理的结果,我需要将参数 RETRY 消息添加回队列。然后这个工人或另一个工人再次拿起它并尝试处理它。
例如:
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
$condition = json_decode($msg->body);
if (!$condition) {
# return to the queue
}
};
$channel->basic_consume('test', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
解决方案比我想象的要简单,事实证明这个任务不是专门针对 RabbitMQ,而是关于变量的范围。如果有人对解决方案感兴趣,请点击此处:
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
global $channel;
$condition = json_decode($msg->body);
if (!$condition) {
$msg = new AMQPMessage(json_encode(array(
'condition' => false
)));
$channel->basic_publish($msg, '', 'test');
}
};
$channel->basic_consume('test', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
将自动 no_ack 标志设置为 false
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
$channel->basic_consume('test', '', false, false, false, false, $callback);
你必须使用确认,如果你的进程不工作你可以忽略确认
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($message) {
$condition = json_decode($message->body);
if (!$condition) {
// return to the queue
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}else{
// send ack , remove from queue
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
};
$channel->basic_consume('test', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
当然用这种方法你会面临消息总是在队列的头部,还有另一种可能性, 如果你真的想重试,你可以按照下面的方法
定义一个重试队列,最好是你的队列名 -retry
最好定义一个死信队列:-dlq
然后你可以做如下的事情:
如何设置 -retry
队列:
这是其中最重要的部分。您需要声明具有以下功能的队列:
x-dead-letter-exchange: 应与您的主队列路由键相同
x-dead-letter-routing-key: 应该和你的主队列路由键相同
x-message-ttl:重试之间的延迟
代码为 sudo 代码,请勿复制粘贴,这只是给您一个提示
$maximumRetry = 5;
$callback = function($message) {
$body = json_decode($message->body);
try {
// process result is your condition
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} catch(Exception $e) {
// return to the queue
$body['try_attempt'] = !empty($body['try_attempt'])? int($body['try_attempt']) + 1: 1
if ($body['try_attempt'] >= $maximumRetry ){
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
return
}
$msg = new AMQPMessage(json_encode($message));
$channel->basic_publish($msg, '', 'test-retry');
}
};
我们需要 3 个队列才能重新绑定。
queue.example
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example
- 特点:
- x-死信交换:queue.exchange
- x-dead-letter-routing-key: queue.example-dlq
- 绑定:
queue.example-dlq
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example-dlq
- 绑定:
queue.example-重试
- 绑定:
- 交易所:queue.exchange
- 路由:queue.example-重试
- 特点:
- x-死信交换:queue.exchange
- x-dead-letter-routing-key:queue.example-已添加
- x-消息-ttl: 10000
- 绑定:
------------更新------------
Quorum 队列提供开箱即用的功能,因此在消费者中,您可以了解每条消息被重试的次数,您还可以轻松地为其定义死信队列,有关更多信息,您可以阅读更多关于 quorom queues and poison message handling
这可能有点晚了,但这是你应该如何使用这个版本的 php-amqplib "php-amqplib/php-amqplib": "^3.1"
你需要将基本消费方法的 no_ack 参数设置为false(这是默认值)然后在传递给回调的 AMQPMessage 对象上使用方法 nack 在回调中明确指定它
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();
$channel->queue_declare('test', false, false, false, false);
$callback = function($msg) {
$condition = json_decode($msg->body);
if (!$condition) {
// message will be added back to the queue
$msg->nack(true);
}
};
$channel->basic_consume('test', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>