Handle aggregate promise of EachPromise (each_limit) in Guzzle

Task

Suppose we have an API POST endpoint that returns a response like this:

{
  "data": [1,2,3],
  "total_rows": 20
}

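This means we received only partial data: the 3 entries of the first page. The total number of entries is 20, so the remaining pages have to be requested with an offset in the request body. The total count (and therefore the possible offsets) is known only after the first call completes, but the remaining calls do not depend on each other and can run concurrently.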
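To make the offset arithmetic concrete, here is a minimal plain-PHP sketch (no Guzzle involved) of how the remaining offsets follow from the first response. The function name remainingOffsets is hypothetical, and it assumes the offset is a zero-based row index.

```php
<?php
// Hypothetical helper: offsets of the pages still to request after the first
// page returned $perPage items out of $totalRows (offset = zero-based row index).
function remainingOffsets(int $perPage, int $totalRows): array
{
    if ($perPage <= 0) {
        return []; // an empty first page gives no page size to step by
    }

    $offsets = [];
    // Start after the first page; stop before $totalRows, because an offset
    // equal to the total would point past the last row.
    for ($offset = $perPage; $offset < $totalRows; $offset += $perPage) {
        $offsets[] = $offset;
    }

    return $offsets;
}

// The example response: 3 items on the first page, total_rows of 20.
echo implode(',', remainingOffsets(3, 20)), PHP_EOL;
```

For the sample response this yields the offsets 3, 6, 9, 12, 15 and 18, i.e. six more requests that can run concurrently.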

Toolset

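It's PHP, and for this task Guzzle 6 is used together with Promises/A+. In addition, Guzzle provides the EachPromise class, which accepts an iterable of promises and a configuration hash that sets up how they are processed. According to the PhpDoc block of that class's constructor: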

Configuration hash can include the following key value pairs:

  • fulfilled: (callable) Invoked when a promise fulfills. The function is invoked with three arguments: the fulfillment value, the index
    position from the iterable list of the promise, and the aggregate
    promise that manages all of the promises. The aggregate promise may
    be resolved from within the callback to short-circuit the promise.
  • rejected: (callable) Invoked when a promise is rejected. The function is invoked with three arguments: the rejection reason, the
    index position from the iterable list of the promise, and the
    aggregate promise that manages all of the promises. The aggregate
    promise may be resolved from within the callback to short-circuit
    the promise.
  • concurrency: (integer) Pass this configuration option to limit the allowed number of outstanding concurrently executing promises,
    creating a capped pool of promises. There is no limit by default.

Code

$paginatedResult = $this->client->sendAsync($this->createRequest($requestBody))
    ->then(function (ResponseInterface $response) use ($requestBody) {
            return $this->deserializeToPaginatedResult(
                $response,
                $requestBody->paginatedResultClass()
            );
        }
    )->wait();

$pageGenerator = function () use ($paginatedResult, $requestBody) {
    $perPageCount = count($paginatedResult->getItems());
    $totalItems = $paginatedResult->getTotalCount();

    for ($currentOffset = $perPageCount; $currentOffset < $totalItems; $currentOffset += $perPageCount) {
        $newRequestBody = clone $requestBody;
        $newRequestBody->setOffset($currentOffset);

        yield $this->client->sendAsync($this->createRequest($newRequestBody));
    }
};

$aggregatedResult = (new EachPromise(
    $pageGenerator(), [
        'concurrency' => 4,
        'fulfilled' => function ($promiseResult, $promiseIndex, $promiseAggregate) use ($requestBody) {
            $paginatedResult = $this->deserializeToPaginatedResult(
                $promiseResult,
                $requestBody->paginatedResultClass()
            );

            return $paginatedResult->getItems();
        },
    ]
))->promise()
    ->then(
        function ($promisedAggregatedResult) {
            var_dump($promisedAggregatedResult);
        }
    )
    ->wait();

var_dump($aggregatedResult);

Problem

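In the configuration hash, the fulfilled callback receives 3 arguments, as documented. $promiseResult is handled correctly, and $paginatedResult->getItems() does return the array of items from the requested page, but I can't aggregate these items. $aggregatedResult is null, and $promisedAggregatedResult in the last then'ed fulfillment callback is null as well.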

Question

How do I properly use Guzzle's EachPromise (and its helper functions each and each_limit) to aggregate the results of all the promises passed to it?

Cause

According to the EachPromise class description:

Represents a promise that iterates over many promises and invokes side-effect functions in the process.

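So the callables in the configuration hash are just side-effect functions, which may or may not short-circuit the aggregate promise by resolving it. That is exactly why the value returned by the fulfilled function neither affects $promiseAggregate nor ends up in $promisedAggregatedResult, the argument of the fulfillment callable in the last then.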
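The behaviour can be modelled without any HTTP at all. The sketch below is a deliberately simplified, synchronous stand-in for EachPromise, not Guzzle's implementation (the function eachSideEffectOnly is hypothetical): a value returned from the fulfilled callback has nowhere to go, so the final result is null, mirroring $aggregatedResult in the question.

```php
<?php
// Simplified, synchronous model of how EachPromise treats its "fulfilled"
// callback. NOT Guzzle's code: it only mimics that the callback runs for its
// side effects and its return value is discarded.
function eachSideEffectOnly(iterable $values, callable $onFulfilled)
{
    foreach ($values as $idx => $value) {
        $onFulfilled($value, $idx); // return value intentionally ignored
    }

    // What the aggregate resolves to when no callback short-circuits it.
    return null;
}

$result = eachSideEffectOnly([[1, 2, 3], [4, 5, 6]], function (array $items) {
    return $items; // returned, but nothing collects it
});

var_dump($result); // NULL
```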

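Hints for correct usage can be found in the \GuzzleHttp\Promise\all and \GuzzleHttp\Promise\some functions. The key idea is to capture an external aggregate by reference (use (&$results)) in those side-effect fulfilled/rejected callbacks. If the aggregate promise is never resolved inside these side-effect functions, it eventually resolves to null, and that null is what gets passed to the next then fulfillment callback. Likewise, the fulfillment callable of that then should capture the aggregate by reference and return it as the promise's fulfillment value.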
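The by-reference part is ordinary PHP closure semantics and can be checked in isolation. In this plain-PHP sketch (no Guzzle; the variable names and the two-page loop standing in for the pool are illustrative assumptions), a callback that captures the outer array by value never changes it, while one that captures it with & collects every page, and a final step returns that collected array instead of null.

```php
<?php
// Plain-PHP sketch (no Guzzle): a side-effect callback can only hand data back
// through an outer variable, and only if that variable is captured by reference.
$byValue = [];
$collectByValue = function (array $pageItems, int $idx) use ($byValue) {
    $byValue[$idx] = $pageItems; // modifies the closure's own copy only
};

$byReference = [];
$collectByReference = function (array $pageItems, int $idx) use (&$byReference) {
    $byReference[$idx] = $pageItems; // modifies the outer array
};

// Stand-in for the per-page "fulfilled" calls made while the pool runs.
foreach ([[1, 2, 3], [4, 5, 6]] as $idx => $pageItems) {
    $collectByValue($pageItems, $idx);
    $collectByReference($pageItems, $idx);
}

// Stand-in for the final then() callback: it returns the by-reference
// aggregate, which becomes the fulfillment value instead of null.
$finish = function () use (&$byReference) {
    return array_merge(...$byReference);
};

var_dump($byValue === []);             // bool(true)
echo implode(',', $finish()), PHP_EOL; // 1,2,3,4,5,6
```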

Actual code solution

$paginatedResult = $this->client->sendAsync($this->createRequest($requestBody))
    ->then(function (ResponseInterface $response) use ($requestBody) {
            return $this->deserializeToPaginatedResult(
                $response,
                $requestBody->paginatedResultClass()
            );
        }
    )->wait();

$pageGenerator = function () use ($paginatedResult, $requestBody) {
    $perPageCount = count($paginatedResult->getItems());
    $totalItems = $paginatedResult->getTotalCount();

    // Guard against an empty first page, which would otherwise loop forever.
    if ($perPageCount === 0) {
        return;
    }

    // "<" rather than "<=": an offset equal to the total points past the last item.
    for ($currentOffset = $perPageCount; $currentOffset < $totalItems; $currentOffset += $perPageCount) {
        $newRequestBody = clone $requestBody;
        $newRequestBody->setOffset($currentOffset);

        yield $this->client->sendAsync($this->createRequest($newRequestBody));
    }
};
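// each_limit() is \GuzzleHttp\Promise\each_limit and PromiseInterface is
// \GuzzleHttp\Promise\PromiseInterface (guzzlehttp/promises 1.x, used by Guzzle 6).
$items = $paginatedResult->getItems();
// With a concurrency of 4 pages can complete out of order, so collect them
// keyed by the generator index and restore the order once all have arrived.
$pages = [];

return each_limit(
    $pageGenerator(),
    4,
    function ($promiseResult, $idx) use (&$pages, $requestBody) {
        $paginatedResult = $this->deserializeToPaginatedResult(
            $promiseResult,
            $requestBody->paginatedResultClass()
        );
        $pages[$idx] = $paginatedResult->getItems();
    },
    function ($reason, $idx, PromiseInterface $aggregate) {
        // Reject the aggregate on the first failed page, as \GuzzleHttp\Promise\all does.
        $aggregate->reject($reason);
    })->then(function () use ($items, &$pages) {
        ksort($pages);
        return array_merge($items, ...$pages);
    });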
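One detail matters when collecting pages: with a concurrency limit the pages can complete in any order, and array_merge() renumbers integer keys, so calling ksort() on an already merged list cannot restore the page order. This plain-PHP sketch (made-up page contents; no Guzzle) contrasts merging on arrival with keying each page by the index that EachPromise passes to the fulfilled callback and merging once at the end.

```php
<?php
// Pages arriving out of order, each tagged with the iterable index that the
// fulfilled callback would receive (page contents are made up).
$arrivals = [[2, ['e', 'f']], [0, ['a', 'b']], [1, ['c', 'd']]];

// Merging on arrival: array_merge renumbers integer keys 0..n-1, so a later
// ksort() has nothing meaningful to sort by and the arrival order sticks.
$merged = [];
foreach ($arrivals as [$idx, $pageItems]) {
    $merged = array_merge($merged, $pageItems);
}
ksort($merged);

// Keying by index first and merging once at the end keeps page order.
$pages = [];
foreach ($arrivals as [$idx, $pageItems]) {
    $pages[$idx] = $pageItems;
}
ksort($pages);
$ordered = array_merge(...$pages);

echo implode(',', $merged), PHP_EOL;  // e,f,a,b,c,d
echo implode(',', $ordered), PHP_EOL; // a,b,c,d,e,f
```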

You can also use the Pool::batch() method, which does almost the same thing internally.

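$pageGenerator = function () use (...) {
    // ...
    // Pool::batch() sends the requests itself, so yield requests rather than promises.
    yield $this->createRequest($newRequestBody);
    // ...
};

// Returns a response or an exception per request, in the same order the requests were sent.
$responses = Pool::batch($this->client, $pageGenerator(), ['concurrency' => 4]);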