Amphp : 运行 多个具有相同连接的异步循环(事件存储客户端)
Amphp : Run many async loops with same connection (eventstore client)
我正在使用使用 amphp 的 eventstore 客户端。我需要在我的应用程序内部重用多个部分的连接。
所以我创建了一个连接提供程序:
public function getConnection(): EventStoreConnection
{
if ($this->connection) {
return $this->connection;
}
$this->connection = $this->createConnection();
wait($this->connection->connectAsync());
return $this->connection;
}
然后我在很多地方使用这个连接:
\Amp\Loop::run(function () use ($eventStoreEvents, $streamName) {
$connection = $this->connectionProvider->getConnection();
// Creation of an event stream
yield $connection->appendToStreamAsync($streamName, ExpectedVersion::ANY, $eventStoreEvents);
// sleep(10); // This sleep does not work, code continue like nothing happend
});
\Amp\Loop::run(function () use ($streamName, $aggregateFqcn, &$aggregateRoot) {
$start = 0;
$count = \Prooph\EventStore\Internal\Consts::MAX_READ_SIZE;
$connection = $this->connectionProvider->getConnection();
do {
$events = [];
/** @var StreamEventsSlice $streamEventsSlice */
$streamEventsSlice = yield $connection
->readStreamEventsForwardAsync(
$streamName,
$start,
$count,
true
);
if (!$streamEventsSlice->status()->equals(SliceReadStatus::success())) {
dump($streamEventsSlice); // Event stream does not exist
// Error here: the event stream doesn't exist at this point.
throw new RuntimeGangxception('Impossible to generate the aggregate');
}
} while (! $streamEventsSlice->isEndOfStream());
});
问题:好像第一个请求还没有结束,但是第二个循环已经开始了。 sleep uncommented 没有任何作用!
但是事件流最终是用里面的相关事件创建的,所以第一个请求成功了。
如果我启动一个连接然后关闭然后启动一个新连接,它就可以工作。但由于每个新连接的握手开销,速度很慢。
我用 Amphp 的 WebSocket 库尝试了一个类似的例子,它成功了。你看有什么不对吗?
这是我使用 websocket 进行的测试:
$connection = \Amp\Promise\wait(connect('ws://localhost:8080'));
Amp\Loop::run(function () use ($connection) {
/** @var Connection $connection */
yield $connection->send("Hello...");
sleep(10); // This sleep works!
});
Amp\Loop::run(function () use ($connection) {
/** @var Connection $connection */
yield $connection->send("... World !");
});
$connection->close();
你想做的事毫无意义。你应该阅读 amphp's documenation.
Amp 为事件循环使用了一个全局访问器,因为每个应用程序只有一个事件循环。同时有两个循环运行是没有意义的,因为它们必须以忙碌等待的方式相互调度才能正确运行。
也就是说,实际上没有第二个循环。
Prooph eventstore 库基于 amphp 但不遵循所有原则:您不能等待连接准备就绪。如果你试图大规模使用它会更糟,所以不要试图等待承诺完成。
作为替代方案,您可以为以后设置承诺并检查连接是否为空。这实际上是图书馆内部处理进一步步骤的方式。
在我这边,我决定停止使用这个库。但作为替代方案,您可以使用使用 HTTP 客户端的库,它也来自 prooph 团队。
我正在使用使用 amphp 的 eventstore 客户端。我需要在我的应用程序内部重用多个部分的连接。
所以我创建了一个连接提供程序:
public function getConnection(): EventStoreConnection
{
if ($this->connection) {
return $this->connection;
}
$this->connection = $this->createConnection();
wait($this->connection->connectAsync());
return $this->connection;
}
然后我在很多地方使用这个连接:
\Amp\Loop::run(function () use ($eventStoreEvents, $streamName) {
$connection = $this->connectionProvider->getConnection();
// Creation of an event stream
yield $connection->appendToStreamAsync($streamName, ExpectedVersion::ANY, $eventStoreEvents);
// sleep(10); // This sleep does not work, code continue like nothing happend
});
\Amp\Loop::run(function () use ($streamName, $aggregateFqcn, &$aggregateRoot) {
$start = 0;
$count = \Prooph\EventStore\Internal\Consts::MAX_READ_SIZE;
$connection = $this->connectionProvider->getConnection();
do {
$events = [];
/** @var StreamEventsSlice $streamEventsSlice */
$streamEventsSlice = yield $connection
->readStreamEventsForwardAsync(
$streamName,
$start,
$count,
true
);
if (!$streamEventsSlice->status()->equals(SliceReadStatus::success())) {
dump($streamEventsSlice); // Event stream does not exist
// Error here: the event stream doesn't exist at this point.
throw new RuntimeGangxception('Impossible to generate the aggregate');
}
} while (! $streamEventsSlice->isEndOfStream());
});
问题:好像第一个请求还没有结束,但是第二个循环已经开始了。 sleep uncommented 没有任何作用!
但是事件流最终是用里面的相关事件创建的,所以第一个请求成功了。
如果我启动一个连接然后关闭然后启动一个新连接,它就可以工作。但由于每个新连接的握手开销,速度很慢。
我用 Amphp 的 WebSocket 库尝试了一个类似的例子,它成功了。你看有什么不对吗?
这是我使用 websocket 进行的测试:
$connection = \Amp\Promise\wait(connect('ws://localhost:8080'));
Amp\Loop::run(function () use ($connection) {
/** @var Connection $connection */
yield $connection->send("Hello...");
sleep(10); // This sleep works!
});
Amp\Loop::run(function () use ($connection) {
/** @var Connection $connection */
yield $connection->send("... World !");
});
$connection->close();
你想做的事毫无意义。你应该阅读 amphp's documenation.
Amp 为事件循环使用了一个全局访问器,因为每个应用程序只有一个事件循环。同时有两个循环运行是没有意义的,因为它们必须以忙碌等待的方式相互调度才能正确运行。
也就是说,实际上没有第二个循环。
Prooph eventstore 库基于 amphp 但不遵循所有原则:您不能等待连接准备就绪。如果你试图大规模使用它会更糟,所以不要试图等待承诺完成。
作为替代方案,您可以为以后设置承诺并检查连接是否为空。这实际上是图书馆内部处理进一步步骤的方式。
在我这边,我决定停止使用这个库。但作为替代方案,您可以使用使用 HTTP 客户端的库,它也来自 prooph 团队。