在 PHPRatchet 中接收 RabbitMQ 消息

Receiving RabbitMQ messages in PHPRatchet

我正在尝试实现 PHP 将消息推送到 RabbitMQ 的机制(我不希望 RabbitMQ 直接暴露给用户),RabbitMQ 连接到 Ratchet PHP 并且 Ratchet 通过 websocket 连接向用户广播它。

我遇到的问题 是让 Ratchet 服务器同时侦听队列消息并进一步传输它们。 Ratchet 文档假定使用 ZeroMQ,并且在对不再有此类方法的过时文档和库进行长时间搜索后(例如 React\Stomp),我需要有这些解决方案经验的人的新视角。

我所拥有的是 pusher.php(来自 RabbitMQ 文档的标准示例):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

为了简化复制场景,我还包括 Chat class:

use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;

class Chat implements MessageComponentInterface
{
    protected $clients;

    public function __construct()
    {
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $connection)
    {
        // Store the new connection to send messages to later
        $this->clients->attach($connection);

        echo "New connection! ({$connection->resourceId})\n";
    }

    public function onMessage(ConnectionInterface $from, $msg)
    {
        $numRecv = count($this->clients) - 1;
        echo sprintf('Connection %d sending message "%s" to %d other connection%s'."\n"
            , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');

        foreach($this->clients as $client)
        {
            /** @var \SplObjectStorage $client */
            if($from !== $client)
            {
                // The sender is not the receiver, send to each client connected
                $client->send($msg);
            }
        }
    }

    public function onClose(ConnectionInterface $conn)
    {
        // The connection is closed, remove it, as we can no longer send it messages
        $this->clients->detach($conn);

        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    public function onError(ConnectionInterface $conn, \Exception $e)
    {
        echo "An error has occurred: {$e->getMessage()}\n";

        $conn->close();
    }
}

和 Ratchet server.php(标准 Ratchet 示例和 RabbitMQ 接收器示例):

use PhpAmqpLib\Connection\AMQPStreamConnection;
use Src\Chat;

// RABBIT_RECEIVER
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg)
{
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks))
{
    $channel->wait();
}

$channel->close();
$connection->close();
// RABBIT_RECEIVER END

$server = new \Ratchet\App('sockets.dev');
$server->route('/', new Chat());

$server->run();

当前版本基本上是 2 个独立的消息侦听机制,它们单独工作得很好(所以没有问题),除了它们相互阻塞并且不在它们之间传输消息。

问题是如何让server.php让RabbitMQ接收消息并将其插入运行棘轮服务器。

我想通了所以为后代添加答案。解决方案不是完全实时的,但它非常接近,似乎具有良好的性能并且对于 Ratchet websocket 服务器是非阻塞的。解决方案是 Ratchet 自定义 loopaddPeriodicTimer 方法。

所以 server.php 的代码应该是这样的:

$loop = React\EventLoop\Factory::create();
$chat = new Chat($loop);

$server = new \Ratchet\App('sockets.dev', 8080, '127.0.0.1', $loop);
$server->route('/', $chat);

$server->run();

Chat.php class(只有__constructor 因为其余的是一样的):

public function __construct(LoopInterface $loop)
{
    $this->loop = $loop;
    $this->clients = new \SplObjectStorage();

    $this->loop->addPeriodicTimer(0, function ()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        $channel = $connection->channel();
        $channel->queue_declare('hello', false, false, false, false);
        echo ' [*] Checking for for messages from RabbitMQ', "\n";

        $max_number_messages_to_fetch_per_batch = 10000;
        do
        {
            $message = $channel->basic_get('hello', true);
            if($message)
            {
                foreach($this->clients as $client)
                {
                    $client->send($message->body);
                }

                $max_number_messages_to_fetch_per_batch--;
            }
        }
        while($message && $max_number_messages_to_fetch_per_batch > 0);

        $channel->close();
        $connection->close();
    });

}

启动 Ratchet 服务器将附加周期性事件(例如每 0 秒一次),这将检查 RabbitMQ 是否有新消息并处理它们。

为了更好的性能控制,允许 websocket 喘口气,一批处理的来自 RabbitMQ 的消息数量限制为 10k。已处理的消息从队列中删除,下一批在下一次迭代中处理。

您还可以使用 addPeriodicTimer 间隔参数微调更新频率。 0 秒是您将获得的最接近实时的时间,但您可能不需要它,可以将其更改为更高的值。