如何将 ZeroMQ 套接字与 Ratchet web-socket 库绑定,以便为 php 应用程序实时应用?
How to bind ZeroMQ socket with Ratchet web-socket library to make real time application for php application?
我只是这个涉及 websocket、Ratchet 和 ZeroMQ 的整个领域的初学者。
我的基本理解:
websocket
有助于在服务器和客户端之间创建开放连接。
Ratchet
是一个基于 PHP 的库,它使用 PHP 的核心套接字函数来创建一个 PHP 套接字框架,使我们能够轻松地在 PHP 套接字编程。
ZeroMQ
是一个套接字库,可帮助非棘轮应用程序(其他 PHP 脚本)通过棘轮套接字和网络套接字发送数据。
我正在按照棘轮中关于 'hello world' 和 'pusher' 的教程进行操作,但它们似乎都不完整,并且只教授如何仅使用控制台。我还在 github 中找到了棘轮示例,但没有正确记录。我正在寻找一个完整的示例(带有专用 html 页面和 javascript)
下面是我正在处理的代码:这是我提出 Ajax 请求的控制器方法之一。此方法将创建一个新的 post(比方说)。我想在 ZeroMq 的帮助下通过 broadcasting/pushing 在多个客户端的浏览器中动态更新 post 的列表。
控制器中的一个方法:
public function create_new_post(){
// ------
// code to create a new post.
// -------
// After creating a post
$response = [
'new_post_title' => $title,
'post_id' => $id
];
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:8000");
$socket->send(json_encode($response));
}
推送文件:
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
class Pusher implements WampServerInterface{
public function onPostEntry($data){
// Data that were sent by ZeroMQ through create_new_post() method
$entry_data = json_decode($data);
// AND AFTER THIS, I DONT HAVE CLUE OF WHAT TO DO NEXT !!
}
}
Shell 到 运行 服务器的脚本:
require dirname(__DIR__) . '/vendor/autoload.php';
$loop = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;
// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8000');
$pull->on('message', array($pusher, 'onBidEntry'));
// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0');
$webServer = new Ratchet\Server\IoServer(
new Ratchet\Http\HttpServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
)
),
$webSock
);
$loop->run();
Shell 脚本仅说明它将在端口 8080 中提供服务,但是我将如何提及我的路线。假设我只希望在页面 'mysite/allposts' 中打开连接。另外,我必须在客户端(javascript 文件)编写的脚本是什么,以及如何通过客户端更新特定的 DOM 对象来接收这些新数据。
我按照你说的例子做了。他们对我来说似乎并不完整,但我明白你的意思。 Ratchet 是一个服务器端脚本,只允许您编写一个实现 websockets 并且能够监听 ZMQ 消息的服务。您将在命令行上启动 Ratchet 脚本,它作为与 Apache 并行的服务运行。
这完全独立于 websocket 的客户端。就像他们推荐的那样,我在客户端使用了 Autobahn.js。这个库实现了 WAMP 协议。它最大限度地简化了客户端代码。
您的代码存在问题,class Pusher implements WampServerInterface
没有 public function onPostEntry
。这个 class 必须实现 WampServerInterface
,这意味着它必须至少具有这些功能:
- onSubscribe(ConnectionInterface $conn, $topic)
- onUnSubscribe(ConnectionInterface $conn, $topic)
- onOpen(ConnectionInterface $conn)
- onClose(ConnectionInterface $conn)
- onPublish(ConnectionInterface $conn, $topic, $event, 数组 $exclude, 数组 $eligible
- onError(ConnectionInterface $conn, \Exception $e)
- onZMQMessage($jsondata)
可以有其他更高级的功能,例如 call
在客户端上执行远程程序。
在发件人端(ZMQ 消息),输入此代码:
$zmq = new ZMQWrapper;
$zqm->publish('posts', $response);
class ZMQWrapper {
function __construct(){
$this->context = new ZMQContext();
$this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH);
$this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500);
$this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT);
}
function publish($topic, $msg){
$data = ['topic' => "mb.$topic", 'msg' => $msg];
$this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT);
}
}
在 pusher 文件中放入如下内容:
public function onSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client subscribed to %s', $topicId));
// you could broadcast that user x joined the discussion
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client unsubscribed from %s', $topicId));
// you could broadcast that user x leaved the discussion
}
public function onOpen(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d connected', $conn->resourceId));
$this->clients[$conn->resourceId] = array(); // this will allow you to save state information of the client, you can modify in onSubscribe and onUnsubscribe
// clients will contain the list of all clients
}
public function onClose(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d disconnected', $conn->resourceId));
// you could broadcast that user x leaved the discussion
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event)));
foreach($topic->getIterator() as $peer){
if(!in_array($peer->WAMP->sessionId, $exclude)){
$peer->event($topicId, $event);
}
}
}
最后一块在客户端。如果用户打开页面 mysite/allposts
,则在 javascript 中包含 autobahn.js
。 websocket 将在变量 ab
下可用。然后你做:
打开页面时:
var currentSession;
ab.connect(
Paths.ws,
function(session) { // onconnect
currentSession = session
onWsConnect(session)
},
function(code, reason, detail) {// onhangup
onWsDisconnect(code, reason, detail)
},{
maxRetries: 60,
retryDelay: 2000,
skipSubprotocolCheck: true
}
)
currentSession.subscribe('posts', onPostReceived)
function onPostReceived(topic, message){
//display the new post
}
关闭页面时:
currentSession.unsubscribe(topic)
你注意到我把一切都保持得非常笼统。这使我可以让同一个系统处理多种类型的消息。不同的是 ZMQ 消息和 currentSession.subscribe
.
的参数
在我的实现中,我还跟踪打开连接的登录用户,但我删除了这部分代码。
希望对您有所帮助。
我只是这个涉及 websocket、Ratchet 和 ZeroMQ 的整个领域的初学者。
我的基本理解:
websocket
有助于在服务器和客户端之间创建开放连接。
Ratchet
是一个基于 PHP 的库,它使用 PHP 的核心套接字函数来创建一个 PHP 套接字框架,使我们能够轻松地在 PHP 套接字编程。
ZeroMQ
是一个套接字库,可帮助非棘轮应用程序(其他 PHP 脚本)通过棘轮套接字和网络套接字发送数据。
我正在按照棘轮中关于 'hello world' 和 'pusher' 的教程进行操作,但它们似乎都不完整,并且只教授如何仅使用控制台。我还在 github 中找到了棘轮示例,但没有正确记录。我正在寻找一个完整的示例(带有专用 html 页面和 javascript)
下面是我正在处理的代码:这是我提出 Ajax 请求的控制器方法之一。此方法将创建一个新的 post(比方说)。我想在 ZeroMq 的帮助下通过 broadcasting/pushing 在多个客户端的浏览器中动态更新 post 的列表。
控制器中的一个方法:
public function create_new_post(){
// ------
// code to create a new post.
// -------
// After creating a post
$response = [
'new_post_title' => $title,
'post_id' => $id
];
$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:8000");
$socket->send(json_encode($response));
}
推送文件:
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
class Pusher implements WampServerInterface{
public function onPostEntry($data){
// Data that were sent by ZeroMQ through create_new_post() method
$entry_data = json_decode($data);
// AND AFTER THIS, I DONT HAVE CLUE OF WHAT TO DO NEXT !!
}
}
Shell 到 运行 服务器的脚本:
require dirname(__DIR__) . '/vendor/autoload.php';
$loop = React\EventLoop\Factory::create();
$pusher = new MyApp\Pusher;
// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8000');
$pull->on('message', array($pusher, 'onBidEntry'));
// Set up our WebSocket server for clients wanting real-time updates
$webSock = new React\Socket\Server($loop);
$webSock->listen(8080, '0.0.0.0');
$webServer = new Ratchet\Server\IoServer(
new Ratchet\Http\HttpServer(
new Ratchet\WebSocket\WsServer(
new Ratchet\Wamp\WampServer(
$pusher
)
)
),
$webSock
);
$loop->run();
Shell 脚本仅说明它将在端口 8080 中提供服务,但是我将如何提及我的路线。假设我只希望在页面 'mysite/allposts' 中打开连接。另外,我必须在客户端(javascript 文件)编写的脚本是什么,以及如何通过客户端更新特定的 DOM 对象来接收这些新数据。
我按照你说的例子做了。他们对我来说似乎并不完整,但我明白你的意思。 Ratchet 是一个服务器端脚本,只允许您编写一个实现 websockets 并且能够监听 ZMQ 消息的服务。您将在命令行上启动 Ratchet 脚本,它作为与 Apache 并行的服务运行。
这完全独立于 websocket 的客户端。就像他们推荐的那样,我在客户端使用了 Autobahn.js。这个库实现了 WAMP 协议。它最大限度地简化了客户端代码。
您的代码存在问题,class Pusher implements WampServerInterface
没有 public function onPostEntry
。这个 class 必须实现 WampServerInterface
,这意味着它必须至少具有这些功能:
- onSubscribe(ConnectionInterface $conn, $topic)
- onUnSubscribe(ConnectionInterface $conn, $topic)
- onOpen(ConnectionInterface $conn)
- onClose(ConnectionInterface $conn)
- onPublish(ConnectionInterface $conn, $topic, $event, 数组 $exclude, 数组 $eligible
- onError(ConnectionInterface $conn, \Exception $e)
- onZMQMessage($jsondata)
可以有其他更高级的功能,例如 call
在客户端上执行远程程序。
在发件人端(ZMQ 消息),输入此代码:
$zmq = new ZMQWrapper;
$zqm->publish('posts', $response);
class ZMQWrapper {
function __construct(){
$this->context = new ZMQContext();
$this->socket = $this->context->getSocket(ZMQ::SOCKET_PUSH);
$this->socket->setSockOpt(ZMQ::SOCKOPT_LINGER, 500);
$this->socket->connect("tcp://127.0.0.1:" . ZMQ_PORT);
}
function publish($topic, $msg){
$data = ['topic' => "mb.$topic", 'msg' => $msg];
$this->socket->send(json_encode($data), ZMQ::MODE_DONTWAIT);
}
}
在 pusher 文件中放入如下内容:
public function onSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client subscribed to %s', $topicId));
// you could broadcast that user x joined the discussion
}
public function onUnSubscribe(ConnectionInterface $conn, $topic) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('A client unsubscribed from %s', $topicId));
// you could broadcast that user x leaved the discussion
}
public function onOpen(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d connected', $conn->resourceId));
$this->clients[$conn->resourceId] = array(); // this will allow you to save state information of the client, you can modify in onSubscribe and onUnsubscribe
// clients will contain the list of all clients
}
public function onClose(ConnectionInterface $conn) {
$log = $this->getLogger();
$log->info(sprintf('Client %d disconnected', $conn->resourceId));
// you could broadcast that user x leaved the discussion
}
public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
$log = $this->getLogger();
$topicId = $topic->getId();
$log->info(sprintf('Client %d published to %s : %s', $conn->resourceId, $topicId, json_encode($event)));
foreach($topic->getIterator() as $peer){
if(!in_array($peer->WAMP->sessionId, $exclude)){
$peer->event($topicId, $event);
}
}
}
最后一块在客户端。如果用户打开页面 mysite/allposts
,则在 javascript 中包含 autobahn.js
。 websocket 将在变量 ab
下可用。然后你做:
打开页面时:
var currentSession;
ab.connect(
Paths.ws,
function(session) { // onconnect
currentSession = session
onWsConnect(session)
},
function(code, reason, detail) {// onhangup
onWsDisconnect(code, reason, detail)
},{
maxRetries: 60,
retryDelay: 2000,
skipSubprotocolCheck: true
}
)
currentSession.subscribe('posts', onPostReceived)
function onPostReceived(topic, message){
//display the new post
}
关闭页面时:
currentSession.unsubscribe(topic)
你注意到我把一切都保持得非常笼统。这使我可以让同一个系统处理多种类型的消息。不同的是 ZMQ 消息和 currentSession.subscribe
.
在我的实现中,我还跟踪打开连接的登录用户,但我删除了这部分代码。
希望对您有所帮助。