PHP 异步多线程 Curl 应用程序
PHP Async Multi-threaded Curl Application
我希望构建一个 process/script 每秒至少可以处理 300-400 个事务的系统。
目前我正在使用 Workerman 来完成以下工作。我可以 运行 没有任何问题地处理 400 个线程,但 tps 大约是 60-70 tps,它可以处理延迟不到一秒。
下面是工作代码:-
main.php:-
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
$http_worker = new Worker('http://0.0.0.0:2345');
$http_worker->count = 400;
$http_worker->onMessage = function ($connection, $request) {
//Config
$connection->send("");
$url = 'http://localhost:3000';
$packageid=11;
$payload = $request->post();
$temp_payload = implode("|",$payload);
list($id,$user,$package_id,$timestamp) = explode('|',$temp_payload);
$date_request_first=date('Y-m-d H:i:s');// PHP Worker current casting timestamp
$date_compare1= date("Y-m-d h:i:s a", strtotime($date_request_first));
$xml_get_subscriber='<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" ><SOAP-ENV:Body>
<ns2:root>
<msg_head>
<time>2020-08-20 17:57:29</time>
<from />
<to />
<msg_type />
<serial />
</msg_head>
<interface_msg>
<msg_response>
<ResponseClass Name="Response">
<GetUserClass Name="AJAX">
<ResultCode>0</ResultCode>
<ResultDescr>success</ResultDescr>
<IBAN>'.$id.'</IBAN>
<PREFERNOTIFYMETHOD>1</PREFERNOTIFYMETHOD>
</GetUserClass>
</ResponseClass>
</msg_response>
</interface_msg>
</ns2:root>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>';
$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$provisioning_recipe));
//$doc->loadXML($result_provisioning);
$xpath = new DOMXPath($doc);
foreach ($xpath->query("//ResultCode/text()") as $package) {
$resultCode = $package->textContent;
}
if($resultCode == 0 && $resultDesc ="success"){
$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$xml_get_subscriber_info));
$xpath = new DOMXPath($doc);
foreach ($xpath->query("//PACKAGEID/text()") as $match1) {
$PAK_checking = $match1->textContent;
}
$myArrayPak = explode('$', $PAK_checking);
$key_value = array_search($packageid,$myArrayPak);
if($key_value)
{
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.0.16:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("successful-request");
$produce_date =date('Y-m-d H:i:s');
$ar=date_create($myArrayEndDate[$key_value]);
$final_date = date_format($ar,"Y-m-d H:i:s");
$toStore="$id;$msisdn;$package_id;1;$final_date";
echo "PackageID = $myArrayPak[$key_value] , End-Date = $final_date\n";
$topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
}
function getUrlContent($url,$uid,$msisdn,$pckg,$xml_get_subscriber_info){
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
curl_setopt($ch, CURLOPT_POSTFIELDS, $xml_get_subscriber_info);
$data = curl_exec($ch);
$httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
//return ($httpcode>=200 && $httpcode<300) ? $data : false;
if($httpcode!= 200)
{
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.0.17:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("failed-request");
$produce_date =date('Y-m-d H:i:s');
$toStore="$uid;$msisdn;$pckg;$produce_date";
$topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
else
{
return $data;
}
}
}
// run all workers
Worker::runAll();
?>
现在,当我将线程增加到 800 时,问题开始了:-
Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
PHP Notice: Undefined variable: PAK in test.php.php on line 77
Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
PHP Notice: Undefined variable: PAK_checking in test.php.php on line 197
Notice: Undefined variable: PAK_checking in test.php.php on line 197
PHP Notice: Undefined variable: psetdatelist in test.php.php on line 198
Notice: Undefined variable: psetdatelist in test.php.php on line 198
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
我检查了一下,当我增加工作人员数量时,CURL XML 似乎是空的。
但是当它是 400 http_worker 时就完全没有问题了。
我 运行 在 VM 中使用 8 核 CPU 和 16GB RAM。
我的目标是处理来自北向的尽可能多的请求,并检查最低 300 TPS 的南向。
流程如下:-
Client -> Main.php -> 向南向查询 -> Produce to Kafka Topic
问题:当我从 400 个线程增加到 800 个线程时,我面临的问题是,我的 curl 响应始终为空。
有什么方法可以让我至少发送 300 tps 并且 XML 毫无问题地解析它?或者有什么改进流程的建议吗?
我发现 OS Unix 在处理超过 200-300 tps 的多个请求时有限制。
我找到了另一个使用 guzzleHTTP 的替代方法,这对我很有帮助。
它帮助我实现了 600 tps +,交易延迟为 2 秒。
我希望构建一个 process/script 每秒至少可以处理 300-400 个事务的系统。 目前我正在使用 Workerman 来完成以下工作。我可以 运行 没有任何问题地处理 400 个线程,但 tps 大约是 60-70 tps,它可以处理延迟不到一秒。
下面是工作代码:-
main.php:-
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
$http_worker = new Worker('http://0.0.0.0:2345');
$http_worker->count = 400;
$http_worker->onMessage = function ($connection, $request) {
//Config
$connection->send("");
$url = 'http://localhost:3000';
$packageid=11;
$payload = $request->post();
$temp_payload = implode("|",$payload);
list($id,$user,$package_id,$timestamp) = explode('|',$temp_payload);
$date_request_first=date('Y-m-d H:i:s');// PHP Worker current casting timestamp
$date_compare1= date("Y-m-d h:i:s a", strtotime($date_request_first));
$xml_get_subscriber='<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" ><SOAP-ENV:Body>
<ns2:root>
<msg_head>
<time>2020-08-20 17:57:29</time>
<from />
<to />
<msg_type />
<serial />
</msg_head>
<interface_msg>
<msg_response>
<ResponseClass Name="Response">
<GetUserClass Name="AJAX">
<ResultCode>0</ResultCode>
<ResultDescr>success</ResultDescr>
<IBAN>'.$id.'</IBAN>
<PREFERNOTIFYMETHOD>1</PREFERNOTIFYMETHOD>
</GetUserClass>
</ResponseClass>
</msg_response>
</interface_msg>
</ns2:root>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>';
$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$provisioning_recipe));
//$doc->loadXML($result_provisioning);
$xpath = new DOMXPath($doc);
foreach ($xpath->query("//ResultCode/text()") as $package) {
$resultCode = $package->textContent;
}
if($resultCode == 0 && $resultDesc ="success"){
$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$xml_get_subscriber_info));
$xpath = new DOMXPath($doc);
foreach ($xpath->query("//PACKAGEID/text()") as $match1) {
$PAK_checking = $match1->textContent;
}
$myArrayPak = explode('$', $PAK_checking);
$key_value = array_search($packageid,$myArrayPak);
if($key_value)
{
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.0.16:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("successful-request");
$produce_date =date('Y-m-d H:i:s');
$ar=date_create($myArrayEndDate[$key_value]);
$final_date = date_format($ar,"Y-m-d H:i:s");
$toStore="$id;$msisdn;$package_id;1;$final_date";
echo "PackageID = $myArrayPak[$key_value] , End-Date = $final_date\n";
$topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
}
function getUrlContent($url,$uid,$msisdn,$pckg,$xml_get_subscriber_info){
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
curl_setopt($ch, CURLOPT_POSTFIELDS, $xml_get_subscriber_info);
$data = curl_exec($ch);
$httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
//return ($httpcode>=200 && $httpcode<300) ? $data : false;
if($httpcode!= 200)
{
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '192.168.0.17:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("failed-request");
$produce_date =date('Y-m-d H:i:s');
$toStore="$uid;$msisdn;$pckg;$produce_date";
$topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
else
{
return $data;
}
}
}
// run all workers
Worker::runAll();
?>
现在,当我将线程增加到 800 时,问题开始了:-
Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
PHP Notice: Undefined variable: PAK in test.php.php on line 77
Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
PHP Notice: Undefined variable: PAK_checking in test.php.php on line 197
Notice: Undefined variable: PAK_checking in test.php.php on line 197
PHP Notice: Undefined variable: psetdatelist in test.php.php on line 198
Notice: Undefined variable: psetdatelist in test.php.php on line 198
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice: Undefined variable: resultCode in test.php.php on line 182
Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
我检查了一下,当我增加工作人员数量时,CURL XML 似乎是空的。 但是当它是 400 http_worker 时就完全没有问题了。
我 运行 在 VM 中使用 8 核 CPU 和 16GB RAM。 我的目标是处理来自北向的尽可能多的请求,并检查最低 300 TPS 的南向。
流程如下:-
Client -> Main.php -> 向南向查询 -> Produce to Kafka Topic
问题:当我从 400 个线程增加到 800 个线程时,我面临的问题是,我的 curl 响应始终为空。
有什么方法可以让我至少发送 300 tps 并且 XML 毫无问题地解析它?或者有什么改进流程的建议吗?
我发现 OS Unix 在处理超过 200-300 tps 的多个请求时有限制。
我找到了另一个使用 guzzleHTTP 的替代方法,这对我很有帮助。 它帮助我实现了 600 tps +,交易延迟为 2 秒。