How to properly manage closable resources in Reactor
I have an HTTP client and an executor that should be closed once all the work is done.
I'm trying to use the Flux.using method in the way described here for RxJava 1.x:
https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter08/ResourceManagement.java
My resource creation method:
    public static Flux<GithubClient> createResource(String token,
                                                    int connectionCount) {
        return Flux.using(
            () -> {
                logger.info(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(token, connectionCount);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : About to create Observable.");
                return Flux.just(client);
            },
            client -> {
                logger.info(Thread.currentThread().getName() + " : Closing the client.");
                client.close();
            },
            false
        ).doOnSubscribe(subscription -> logger.info("subscribed"));
    }
Then I use it like this:
    Flux<StateMutator> dataMutators = GithubClient.createResource(
            config.getAccessToken(),
            config.getConnectionCount())
        .flatMap(client -> client.loadRepository(organization, repository));
The problem is that the client connection is closed before the first request is even made.
    [main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Created and started the client.
    [main] INFO com.sapho.services.githubpublic.client.GithubClient - main : About to create Observable.
    [main] INFO com.sapho.services.githubpublic.client.GithubClient - subscribed
    [main] INFO com.sapho.services.githubpublic.client.GithubClient - main : Closing the client.
    java.lang.IllegalStateException: Client instance has been closed.
        at jersey.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at org.glassfish.jersey.client.JerseyClient.checkNotClosed(JerseyClient.java:273)
I couldn't find any examples for Reactor.
Thanks
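I read the documentation for using again and found my mistake. Returning the client with return Flux.just(client);
makes no sense: that Flux emits the client and terminates immediately, and its termination triggers the cleanup callback, so the client is closed before the requests issued in the downstream flatMap can use it.
I ended up implementing: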
    public static Flux<StateMutator> createAndExecute(GithubPublicConfiguration config,
                                                      Function<GithubClient, Flux<StateMutator>> toExecute) {
        return Flux.using(
            () -> {
                logger.debug(Thread.currentThread().getName() + " : Created and started the client.");
                return new GithubClient(entityModelHandler, config.getAccessToken(), config.getConnectionCount());
            },
            client -> toExecute.apply(client),
            client -> {
                logger.debug(Thread.currentThread().getName() + " : Closing the client.");
                client.close();
            },
            false
        );
    }
Then I call it like this:
    GithubClient.createAndExecute(config,
        client -> client.loadRepository(organization, repository))
Now all the operations happen in the proper order.
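For anyone who wants to see the difference between the two versions without a Reactor dependency, here is a rough plain-Java sketch built on try-with-resources, which Flux.using is loosely analogous to. It is only an analogy under stated assumptions: FakeClient and ResourceScopeDemo are hypothetical names made up for this illustration, FakeClient merely imitates a client that throws once closed, and the work runs eagerly here, whereas in Reactor the Flux returned by the function is subscribed to lazily and the resource is held until that Flux terminates.

```java
import java.util.function.Function;

// Plain-Java analogy only; no Reactor involved.
class ResourceScopeDemo {

    // Hypothetical stand-in for GithubClient: refuses to work once closed.
    static class FakeClient implements AutoCloseable {
        private boolean closed = false;

        String loadRepository(String name) {
            if (closed) {
                throw new IllegalStateException("Client instance has been closed.");
            }
            return "repo:" + name;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Mirrors the first attempt: the resource escapes the scope that owns it,
    // so it is already closed by the time the caller receives it.
    static FakeClient createResource() {
        try (FakeClient client = new FakeClient()) {
            return client;
        }
    }

    // Mirrors the fix: the caller passes the work in, and the work runs
    // while the resource is still open.
    static <T> T createAndExecute(Function<FakeClient, T> toExecute) {
        try (FakeClient client = new FakeClient()) {
            return toExecute.apply(client);
        }
    }

    public static void main(String[] args) {
        try {
            createResource().loadRepository("demo");
        } catch (IllegalStateException e) {
            System.out.println("escaped resource: " + e.getMessage());
        }
        System.out.println("scoped work: "
                + createAndExecute(client -> client.loadRepository("demo")));
    }
}
```

The first call fails with the same kind of IllegalStateException as in the question, while the second succeeds because the client is only closed after the supplied function has finished with it.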