PHP Async Multi-threaded Curl Application

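I want to build a process/script that can handle at least 300-400 transactions per second. I am currently using Workerman for the work shown below. I can run 400 threads without any problem, but throughput is only about 60-70 TPS, with each transaction handled in under one second.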

Here is the working code:

main.php:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;

$http_worker = new Worker('http://0.0.0.0:2345');

$http_worker->count = 400;
$http_worker->onMessage = function ($connection, $request) {
//Config
    $connection->send("");
    $url = 'http://localhost:3000';
    $packageid=11;
    $payload = $request->post();
    $temp_payload = implode("|",$payload);
    list($id,$user,$package_id,$timestamp) = explode('|',$temp_payload);

    $date_request_first=date('Y-m-d H:i:s');// PHP Worker current casting timestamp
    $date_compare1= date("Y-m-d h:i:s a", strtotime($date_request_first));
    
    $xml_get_subscriber='<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" ><SOAP-ENV:Body>
    <ns2:root>
        <msg_head>
            <time>2020-08-20 17:57:29</time>
            <from />
            <to />
            <msg_type />
            <serial />
        </msg_head>
        <interface_msg>
            <msg_response>
                <ResponseClass Name="Response">
                    <GetUserClass Name="AJAX">
                        <ResultCode>0</ResultCode>
                        <ResultDescr>success</ResultDescr>
                        <IBAN>'.$id.'</IBAN>
                        <PREFERNOTIFYMETHOD>1</PREFERNOTIFYMETHOD>
                    </GetUserClass>
                </ResponseClass>
            </msg_response>
        </interface_msg>
    </ns2:root>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>';

$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$provisioning_recipe));
//$doc->loadXML($result_provisioning);

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//ResultCode/text()")  as $package) {
    $resultCode = $package->textContent;
}
if(isset($resultCode) && $resultCode == 0){ // $resultDesc was assigned with "=" rather than compared, and is never read from the response


$doc = new DOMDocument();
$doc->loadXML(getURLContent($url,$id,$msisdn,$package_id,$xml_get_subscriber)); // $xml_get_subscriber is the XML variable defined above

$xpath = new DOMXPath($doc);

foreach ($xpath->query("//PACKAGEID/text()")  as $match1) {
    $PAK_checking = $match1->textContent;
}

$myArrayPak = explode('$', $PAK_checking);
$key_value = array_search($packageid,$myArrayPak);
        if($key_value !== false) // array_search() returns false when not found; index 0 is a valid match
        {
            
        $conf = new RdKafka\Conf();
        $conf->set('metadata.broker.list', '192.168.0.16:9092');
        
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic("successful-request");
        $produce_date =date('Y-m-d H:i:s');
        
        $ar=date_create($myArrayEndDate[$key_value]);
        $final_date = date_format($ar,"Y-m-d H:i:s");
        $toStore="$id;$msisdn;$package_id;1;$final_date";
        echo "PackageID = $myArrayPak[$key_value]  , End-Date = $final_date\n";
        $topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
        $producer->poll(0);
        
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }   


}
}


function getUrlContent($url,$uid,$msisdn,$pckg,$xml_get_subscriber_info){
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
curl_setopt($ch, CURLOPT_TIMEOUT, 5);
curl_setopt($ch, CURLOPT_POSTFIELDS, $xml_get_subscriber_info);
$data = curl_exec($ch);
$httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
//return ($httpcode>=200 && $httpcode<300) ? $data : false;
if($httpcode!= 200)
{
    $conf = new RdKafka\Conf();
        $conf->set('metadata.broker.list', '192.168.0.17:9092');
        
        $producer = new RdKafka\Producer($conf);
        $topic = $producer->newTopic("failed-request");
        $produce_date =date('Y-m-d H:i:s');
        
        $toStore="$uid;$msisdn;$pckg;$produce_date";
        $topic->produce(RD_KAFKA_PARTITION_UA,0, "$toStore");
        $producer->poll(0);
        
        for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }
        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }   
}
else
{
   return $data;
}

}



};


// run all workers
Worker::runAll();
?>

Now, when I increase the threads to 800, the problems start:

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 69
PHP Notice:  Undefined variable: PAK in test.php.php on line 77

Notice: Undefined variable: PAK in test.php.php on line 77
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 185
PHP Notice:  Undefined variable: PAK_checking in test.php.php on line 197

Notice: Undefined variable: PAK_checking in test.php.php on line 197
PHP Notice:  Undefined variable: psetdatelist in test.php.php on line 198

Notice: Undefined variable: psetdatelist in test.php.php on line 198
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

Warning: DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166
PHP Notice:  Undefined variable: resultCode in test.php.php on line 182

Notice: Undefined variable: resultCode in test.php.php on line 182
PHP Warning:  DOMDocument::loadXML(): Empty string supplied as input in test.php.php on line 166

I checked, and the XML returned by curl appears to be empty when I increase the number of workers. With 400 http_worker processes there is no problem at all.
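In the posted getUrlContent, the branch for a non-200 status produces to Kafka but returns nothing, so a failed or timed-out call reaches DOMDocument::loadXML() as an empty value, which matches the "Empty string supplied as input" warnings. A minimal sketch of a guard, assuming a hypothetical helper name extractResultCode (not part of the original code), could look like this:

```php
<?php
/**
 * Return the text of the first <ResultCode> element, or null when the
 * response is empty, not a string, not well-formed XML, or has no such element.
 * The function name is hypothetical and only for illustration.
 */
function extractResultCode($xml): ?string
{
    // curl_exec() returns false on failure, and the posted getUrlContent()
    // returns null for non-200 responses, so check before parsing.
    if (!is_string($xml) || trim($xml) === '') {
        return null;
    }

    // Collect libxml errors internally instead of emitting PHP warnings.
    $previous = libxml_use_internal_errors(true);
    $doc = new DOMDocument();
    $loaded = $doc->loadXML($xml);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    if (!$loaded) {
        return null;
    }

    $xpath = new DOMXPath($doc);
    $nodes = $xpath->query('//ResultCode');
    if ($nodes === false || $nodes->length === 0) {
        return null;
    }

    return trim($nodes->item(0)->textContent);
}
```

A caller can then treat null as "southbound call failed" and skip the second query, rather than continuing with undefined variables. Logging curl_errno() and curl_error() before curl_close() in getUrlContent would also show whether the empty responses are timeouts or connection failures.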

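I am running this in a VM with an 8-core CPU and 16 GB of RAM. My goal is to handle as many northbound requests as possible and check each one against the southbound system, at a minimum of 300 TPS.

The flow is as follows:

Client -> Main.php -> query southbound -> Produce to Kafka topic

Problem: when I go from 400 threads to 800 threads, my curl response is always empty.

Is there any way I can send at least 300 TPS and have the XML parsed without problems? Or are there any suggestions for improving the flow?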

I found that the Unix OS has limits when handling multiple requests beyond 200-300 TPS.
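The post does not say which limit was hit. One candidate that is commonly checked, and this is only an assumption here, is the per-process limit on open file descriptors, since every socket counts against it. A small sketch for reading that limit from PHP (the function name openFileLimits is hypothetical; posix_getrlimit() requires the posix extension):

```php
<?php
/**
 * Return the soft and hard open-file limits of the current process,
 * or null when the posix extension is not available.
 * The function name is hypothetical and only for illustration.
 */
function openFileLimits(): ?array
{
    if (!function_exists('posix_getrlimit')) {
        return null;
    }

    $limits = posix_getrlimit();
    if (!is_array($limits)) {
        return null;
    }

    // Values are integers, or the string "unlimited".
    return [
        'soft' => $limits['soft openfiles'] ?? null,
        'hard' => $limits['hard openfiles'] ?? null,
    ];
}
```

Printing the result with var_dump(openFileLimits()) inside a worker shows the values that process actually runs with, which may differ from those of an interactive shell.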

I found an alternative using GuzzleHTTP, which helped a lot. It let me reach 600+ TPS with a transaction latency of 2 seconds.
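The Guzzle code itself is not shown in the post. As a dependency-free sketch of the same general idea, keeping several southbound requests in flight from one process instead of one blocking call per worker, the following uses PHP's built-in curl_multi_* functions rather than Guzzle. The function name postXmlConcurrently and the layout of the result array are assumptions for illustration.

```php
<?php
/**
 * POST XML bodies to several URLs concurrently.
 *
 * $requests maps an arbitrary key to [url, xmlBody].
 * Returns, per key: ['status' => int, 'body' => string, 'error' => string].
 * Hypothetical helper, not taken from the original code.
 */
function postXmlConcurrently(array $requests, int $timeoutSeconds = 5): array
{
    $multi = curl_multi_init();
    $handles = [];
    $results = [];

    foreach ($requests as $key => [$url, $xml]) {
        $ch = curl_init($url);
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST           => true,
            CURLOPT_POSTFIELDS     => $xml,
            CURLOPT_HTTPHEADER     => ['Content-Type: text/xml; charset=UTF-8'],
            CURLOPT_CONNECTTIMEOUT => $timeoutSeconds,
            CURLOPT_TIMEOUT        => $timeoutSeconds,
        ]);
        curl_multi_add_handle($multi, $ch);
        $handles[$key] = $ch;
        // Default entry in case a transfer never reports completion.
        $results[$key] = ['status' => 0, 'body' => '', 'error' => 'not completed'];
    }

    // Drive all transfers; wait for socket activity instead of busy-looping.
    do {
        $code = curl_multi_exec($multi, $running);
        if ($code === CURLM_OK && $running > 0) {
            curl_multi_select($multi, 1.0);
        }
    } while ($code === CURLM_OK && $running > 0);

    // Per-transfer result codes are only available via curl_multi_info_read().
    while (($info = curl_multi_info_read($multi)) !== false) {
        $key = array_search($info['handle'], $handles, true);
        if ($key === false) {
            continue;
        }
        $ch = $handles[$key];
        $results[$key] = [
            'status' => (int) curl_getinfo($ch, CURLINFO_HTTP_CODE),
            'body'   => (string) curl_multi_getcontent($ch),
            'error'  => $info['result'] === CURLE_OK ? '' : (string) curl_strerror($info['result']),
        ];
    }

    foreach ($handles as $ch) {
        curl_multi_remove_handle($multi, $ch);
    }
    curl_multi_close($multi);

    return $results;
}
```

Each entry with a non-empty error or a status other than 200 can be routed to the failed-request topic, and only non-empty bodies passed on to the XML parsing step. With this pattern the worker count can stay close to the number of CPU cores, because concurrency comes from the in-flight transfers rather than from the number of processes.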