Activemq 和 Php Stomp:同步生产者示例
Activemq and Php Stomp: synchronous producer sample
我正在尝试让这个原则起作用:
- 发送一个消息(1)并等待包含一些结果(实际上是操作的json结果)的ack的生产者
- 消费者每 5 秒检查所有未决消息,并在一行中处理所有消息,并在一行中确认所有消息,然后再次等待 5 秒(无限循环)。
这是我的 stompproducer.php
的 30 行:
<?php
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
$queue = '/aaaa';
$msg = 'bar';
if (count($argv)<3) {
echo $argv[0]." [msg] [nb to send]\n";
exit(1);
}
$msg = (string)$argv[1];
$to_send = intval($argv[2]);
try {
$stomp = new Stomp('tcp://localhost:61613');
while (--$to_send) {
msg("Sending...");
$result = $stomp->send(
$queue,
$msg." ". date("Y-m-d H:i:s"),
array('receipt' => 'message-123')
);
echo 'result='.var_export($result,true)."\n";
msg("Done.");
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
这是我的 stompconsumer.php
的 30 行:
<?php
$queue = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
try {
$stomp = new Stomp('tcp://localhost:61613');
$stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
$stomp->setReadTimeout(0, 10000);
while (true) {
$frames_read=array();
while ($stomp->hasFrame()) {
$frame = $stomp->readFrame();
if ($frame != null) {
array_push($frames_read, $frame);
}
if (count($frames_read)==40) {
break;
}
}
msg("Nombre de frames lues : ".count($frames_read));
msg("Pause...");
$e=$_waitTimer-(microtime(true)-$_timeLastAsk);
if ($e>0) {
usleep($e);
}
if (count($frames_read)>0) {
msg("Ack now...");
foreach ($frames_read as $frame) {
$stomp->ack($frame);
}
}
$_timeLastAsk = microtime(true);
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
我无法做到同步生产者,即等待消费者确认的生产者。如果你运行我在这里完成的示例,你会看到生产者 立即 发送所有消息,然后退出,所有 "true" 像 "ok" 结果当呼叫 $stomp->send()
。
我仍然没有找到好的示例,也没有找到带有简单阻塞示例的好的文档。
我应该怎么做才能让我的生产者阻塞,直到消费者发送它的确认?
注意:我已阅读所有文档 here and the stomp php questions on Whosebug here and here。
从这个问题来看,您似乎在寻找请求/响应类型的消息传递模式。这是你必须自己实现的东西,因为你引用的 STOMP ack 只是代表消费者向消息代理确认消息,生产者对此一无所知。请求响应涉及在出站消息上设置回复地址,然后在发送下一条消息之前等待收到对该地址的响应。有很多文章记录了这种事情,例如 one。
或者,如果您只需要知道代理是否已收到来自客户端的消息并将其持久化,那么您可以使用 STOMP 内置的 receipt 机制让代理向您发送收据,表明它已经处理了您发送的消息。然而,这并不能保证消费者已经处理了消息。
我首先想到的是:看看这个 stomp 插件:
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
我能想到的另一个解决方法是:
在生产者方面:
1. 更改您的生产者以发送持久消息
在您的消费者方面:
使用定时器。
1. 阅读 message/frames 直到空或达到最大上限。
2. 创建一个 CURL 请求和清空消息打包列表
3. 让你的服务器休眠 5 秒
您肯定需要进一步测试,但应该可以。进程唤醒后,您应该能够读取所有排队的消息。
需要考虑的事项:
- 持久消息需要过期时间
- 您需要在您的消费者端确认更新已参与消息的状态。使用 ACK=client 这样您就可以确认所有已确认的消息
- 如果您不必等待 CURL 响应,就会更容易。
- 开箱即用,不支持从消费者(服务器端)发送 ACK。
祝你好运
我刚想起来,你可以试试reactphp/stomp库。
这是一个事件驱动的库,可能会对您有所帮助。特地看看广告的核心功能addPeriodicTimer
https://github.com/reactphp/stomp
干杯
我正在尝试让这个原则起作用:
- 发送一个消息(1)并等待包含一些结果(实际上是操作的json结果)的ack的生产者
- 消费者每 5 秒检查所有未决消息,并在一行中处理所有消息,并在一行中确认所有消息,然后再次等待 5 秒(无限循环)。
这是我的 stompproducer.php
的 30 行:
<?php
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
$queue = '/aaaa';
$msg = 'bar';
if (count($argv)<3) {
echo $argv[0]." [msg] [nb to send]\n";
exit(1);
}
$msg = (string)$argv[1];
$to_send = intval($argv[2]);
try {
$stomp = new Stomp('tcp://localhost:61613');
while (--$to_send) {
msg("Sending...");
$result = $stomp->send(
$queue,
$msg." ". date("Y-m-d H:i:s"),
array('receipt' => 'message-123')
);
echo 'result='.var_export($result,true)."\n";
msg("Done.");
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
这是我的 stompconsumer.php
的 30 行:
<?php
$queue = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);
function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}
try {
$stomp = new Stomp('tcp://localhost:61613');
$stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
$stomp->setReadTimeout(0, 10000);
while (true) {
$frames_read=array();
while ($stomp->hasFrame()) {
$frame = $stomp->readFrame();
if ($frame != null) {
array_push($frames_read, $frame);
}
if (count($frames_read)==40) {
break;
}
}
msg("Nombre de frames lues : ".count($frames_read));
msg("Pause...");
$e=$_waitTimer-(microtime(true)-$_timeLastAsk);
if ($e>0) {
usleep($e);
}
if (count($frames_read)>0) {
msg("Ack now...");
foreach ($frames_read as $frame) {
$stomp->ack($frame);
}
}
$_timeLastAsk = microtime(true);
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
我无法做到同步生产者,即等待消费者确认的生产者。如果你运行我在这里完成的示例,你会看到生产者 立即 发送所有消息,然后退出,所有 "true" 像 "ok" 结果当呼叫 $stomp->send()
。
我仍然没有找到好的示例,也没有找到带有简单阻塞示例的好的文档。
我应该怎么做才能让我的生产者阻塞,直到消费者发送它的确认?
注意:我已阅读所有文档 here and the stomp php questions on Whosebug here and here。
从这个问题来看,您似乎在寻找请求/响应类型的消息传递模式。这是你必须自己实现的东西,因为你引用的 STOMP ack 只是代表消费者向消息代理确认消息,生产者对此一无所知。请求响应涉及在出站消息上设置回复地址,然后在发送下一条消息之前等待收到对该地址的响应。有很多文章记录了这种事情,例如 one。
或者,如果您只需要知道代理是否已收到来自客户端的消息并将其持久化,那么您可以使用 STOMP 内置的 receipt 机制让代理向您发送收据,表明它已经处理了您发送的消息。然而,这并不能保证消费者已经处理了消息。
我首先想到的是:看看这个 stomp 插件:
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
我能想到的另一个解决方法是: 在生产者方面: 1. 更改您的生产者以发送持久消息
在您的消费者方面: 使用定时器。 1. 阅读 message/frames 直到空或达到最大上限。 2. 创建一个 CURL 请求和清空消息打包列表 3. 让你的服务器休眠 5 秒
您肯定需要进一步测试,但应该可以。进程唤醒后,您应该能够读取所有排队的消息。
需要考虑的事项: - 持久消息需要过期时间 - 您需要在您的消费者端确认更新已参与消息的状态。使用 ACK=client 这样您就可以确认所有已确认的消息 - 如果您不必等待 CURL 响应,就会更容易。 - 开箱即用,不支持从消费者(服务器端)发送 ACK。
祝你好运
我刚想起来,你可以试试reactphp/stomp库。 这是一个事件驱动的库,可能会对您有所帮助。特地看看广告的核心功能addPeriodicTimer
https://github.com/reactphp/stomp
干杯