ZeroMQ worker 的任务进度

Task progress from a ZeroMQ worker

ZeroMQ 相当新。我有一个简单的 REQ/REP 队列,如下所示。我正在使用 PHP 但这并不重要,因为任何语言绑定对我来说都很好。 这是客户端请求任务

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $i . ":" . $req->recv().PHP_EOL;

而这是一个真正执行任务的工人。

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port $port" . PHP_EOL;
while(true)
{
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    // Do the work here, takes 10 min, knows the count of lines added and remaining
    $srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}

由于导出数据的任务大约需要 10 分钟,我想从不同的客户端连接到服务器并获取任务完成的进度/百分比。 我想知道这是否是一种有效的方法。

我尝试了这种方法,其中 REQ/REP 部分有效,但我在 PUB/SUB 部分

中一无所获

服务器部分

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");

// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");

echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
    $p->send($prog++ . '%'); // this part doesn't get to the progress client
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    sleep(2);// some long task
    $srvr->send($msg . " Done zipping " . date('H:i:s'));
}

进步客户

$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
    echo $stat->recv() . PHP_EOL; //nothing shows here
}

请求客户

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
    $req->send("$i : Zip the file please");
    echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}

这个概念是可行的,需要一些调整:

所有 PUB 交易方必须通过至少一个空订阅设置任何非默认订阅 .setsockopt( ZMQ_SUBSCRIBE, "" ) 意味着接收所有主题(none "filter"-编辑出来了)。

接下来,PUB 端和 SUB 端都应该配置 .setsockopt( ZMQ_CONFLATE, 1 ),因为没有任何值可以填充并将所有中间值馈送到 en-queue/de-queue 管道,一旦唯一的值是在 "last" 最近的消息中。

总是,应该首选 ZeroMQ 调用的非阻塞模式(.recv( ..., flags = ZMQ_NOBLOCK ) 等),或者应该使用 Poller.poll() 预测试首先嗅探(非)-在花费更多精力阅读其上下文 "from" ZeroMQ 上下文管理器之前,存在一条消息。简单地说,阻塞模式服务调用在生产级系统中可以很好地服务的情况并不多。

另外一些进一步的调整可能会对 PUB 端有所帮助,以防更大的 "attack" 来自不受限制的 SUB 端实体池,并且 PUB 必须为每个实体创建/管理/维护资源这些(不受限制的)交易对手。

如果有多个客户想要接收相同的进度更新,您只需要使用 PUB/SUB。只需使用 PUSH/PULL 即可通过 tcp 进行简单的点对点传输。

哲学讨论

解决此类问题有两种方法。

  1. 使用额外的套接字来传送额外的消息类型,
  2. 只使用两个套接字,但通过它们传送不止一种消息类型

你说的是做 1)。可能值得考虑 2),但我必须强调我对 PHP 几乎一无所知,因此不知道是否存在鼓励一个人拥有单独的请求和进度客户端的语言功能。

如果这样做,您的原始客户端需要一个循环(在它发送请求之后)来接收多条消息,进度更新消息或最终结果。您的服务器在进行 10 分钟的查找时,将定期发送进度更新消息,并在最后发送最终结果消息。您可能会使用 PUSH/PULL 客户端到服务器,对于从服务器返回客户端的进度/结果再次使用相同的方法。

遵循 2) 在架构上更灵活。一旦可以通过单个套接字发送两种或多种消息类型并在接收端对它们进行解码,就可以发送更多消息。例如,您可以决定将 'cancel' 消息从客户端添加到服务器,或者将部分结果消息从服务器返回到客户端。这比仅仅因为您想在客户端和服务器之间添加另一个消息流而继续向您的体系结构添加更多套接字要容易得多。同样,我对 PHP 的了解还不够多,无法说这绝对是使用该语言的正确方法。它在 C、C++ 中当然很有意义。

我发现 Google 协议缓冲区(我更喜欢 ASN.1)之类的东西对这类事情非常有用。这些允许您定义要发送的消息类型,并且(至少使用 GPB)将它们组合在一个 'oneof' 中(在 ASN.1 中,人们使用标记来区分不同的消息)。 GPB 和 ASN.1 很方便,因为这样您就可以在系统中使用不同的语言、操作系统和平台,而不必真正担心发送的是什么。而且是二进制的(不是文本),它们在网络连接中效率更高。