Rx Netty: Async client keeps getting "Content stream is already disposed" error
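In the code below I am trying to build a simple asynchronous HTTP client with Rx Netty that makes 75 POST requests to an API. The problem is that I keep getting a "java.lang.IllegalStateException: Content stream is already disposed." error.
What am I doing wrong here? Is it related to this issue: https://github.com/ReactiveX/RxNetty/issues/264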
    NioEventLoopGroup provider = new NioEventLoopGroup();
    HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("my-api.com", 80)
            .eventloop(provider)
            .build();
    Gson gson = new Gson();
    Observable.range(1, 75, Schedulers.from(provider))
            .flatMap(count -> {
                Data data = new Data("test" + count);
                return client.submit(
                        HttpClientRequest.createPost("/create")
                                .withHeader("Authorization", AUTH_HEADER)
                                .withHeader("Content-Type", "application/json")
                                .withContent(gson.toJson(data))
                );
            })
            .flatMap(response -> {
                return response.getContent().map((ByteBuf content) -> {
                    return gson.fromJson(content.toString(Charset.defaultCharset()), OtherData.class);
                });
            })
            .subscribe(
                    data -> logger.info("item done"),
                    err -> {
                        logger.error("Error", err);
                        provider.shutdownGracefully();
                    },
                    () -> {
                        logger.info("done");
                        provider.shutdownGracefully();
                    }
            );
I realized that the call to response.getContent() needs to be made inside a map rather than a flatMap. For example:
    NioEventLoopGroup provider = new NioEventLoopGroup();
    HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("my-api.com", 80)
            .eventloop(provider)
            .build();
    Gson gson = new Gson();
    Observable.range(1, 75, Schedulers.from(provider))
            .flatMap(count -> {
                Data data = new Data("test" + count);
                return client.submit(
                        HttpClientRequest.createPost("/create")
                                .withHeader("Authorization", AUTH_HEADER)
                                .withHeader("Content-Type", "application/json")
                                .withContent(gson.toJson(data))
                );
            })
            // map not flatMap
            .map(response -> {
                return response.getContent().map((ByteBuf content) -> {
                    return gson.fromJson(content.toString(Charset.defaultCharset()), OtherData.class);
                });
            })
            .subscribe(
                    data -> logger.info("item done"),
                    err -> {
                        logger.error("Error", err);
                        provider.shutdownGracefully();
                    },
                    () -> {
                        logger.info("done");
                        provider.shutdownGracefully();
                    }
            );
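One thing worth keeping in mind about the change from flatMap to map: a map whose function returns a stream leaves you with a stream of streams, so what arrives in subscribe is the inner Observable for each response, not an OtherData value. Below is a rough sketch of that shape difference using plain java.util.stream as an analogy only. It is not RxJava or RxNetty code, and contentOf is a hypothetical stand-in for response.getContent().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class MapVsFlatMapDemo {

    // Hypothetical stand-in for response.getContent(): one inner stream per "response".
    static Stream<String> contentOf(int count) {
        return Stream.of("test" + count);
    }

    // flatMap: every inner stream is consumed and merged into one stream of values.
    static List<String> flattened(int n) {
        return IntStream.rangeClosed(1, n)
                .boxed()
                .flatMap(MapVsFlatMapDemo::contentOf)
                .collect(Collectors.toList());
    }

    // map: every element stays an inner stream that nobody has consumed yet.
    static List<Stream<String>> nested(int n) {
        return IntStream.rangeClosed(1, n)
                .boxed()
                .map(MapVsFlatMapDemo::contentOf)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flattened(3));   // prints [test1, test2, test3]

        List<Stream<String>> inner = nested(3);
        System.out.println(inner.size());   // prints 3

        // The values only show up once an inner stream is consumed explicitly.
        System.out.println(inner.get(0).collect(Collectors.toList()));   // prints [test1]
    }
}
```

So if the parsed OtherData objects are actually needed downstream, the inner stream produced inside map still has to be consumed somewhere.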