使用 Reactor Netty 连接到服务器消息队列
Connecting to a Server message queue using Reactor Netty
我正在尝试使用 Reactor Netty 连接到 docker 容器上的消息队列 运行。由于依赖性问题,我是独立执行此操作的,未使用 SpringFlux。
从 Reactor Netty 文档中的示例中,我看到有一种方法可以连接到服务器并获得响应:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asString()
.block();
}
但是当我之后尝试通过 System.out.println() 显示输出时,没有任何反应。
我也试过了解如何使用:
Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)
但我不确定具体要做什么。
我在文档中看到有一个名为 Connection 的 class,它使用 TCPClient 并具有订阅方法。
我有点迷茫,你能给我指出在不使用 spring-flux 的情况下在 Reactor Netty 中实现这个的正确方向吗?
谢谢
编辑:
经过一些实验,我得到了这个:
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((res, bytes) - > {
System.out.println(bytes.asString());
return bytes.asString();})
.subscribe();
}
这给了我一个 FluxHandle,我如何使用它来实际读取响应的主体?
所以我想出了如何订阅和读取从服务器收到的数据,甚至使用 jackson
库将数据转换为 JSON,以便我的代码更容易读取。
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((resp, bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response, MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " + ex);
}
});
}
似乎在使用 subscribe() 方法时我可以收听传入的响应并对其进行处理。我仍然需要添加一种方法,以便在服务器停止或消息队列关闭时关闭连接,这样客户端就不会挂在 non-existent 消息队列上。
我正在尝试使用 Reactor Netty 连接到 docker 容器上的消息队列 运行。由于依赖性问题,我是独立执行此操作的,未使用 SpringFlux。
从 Reactor Netty 文档中的示例中,我看到有一种方法可以连接到服务器并获得响应:
public static void main(String[] args) {
String response =
HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.responseContent()
.aggregate()
.asString()
.block();
}
但是当我之后尝试通过 System.out.println() 显示输出时,没有任何反应。
我也试过了解如何使用:
Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)
但我不确定具体要做什么。 我在文档中看到有一个名为 Connection 的 class,它使用 TCPClient 并具有订阅方法。
我有点迷茫,你能给我指出在不使用 spring-flux 的情况下在 Reactor Netty 中实现这个的正确方向吗?
谢谢
编辑:
经过一些实验,我得到了这个:
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((res, bytes) - > {
System.out.println(bytes.asString());
return bytes.asString();})
.subscribe();
}
这给了我一个 FluxHandle,我如何使用它来实际读取响应的主体?
所以我想出了如何订阅和读取从服务器收到的数据,甚至使用 jackson
库将数据转换为 JSON,以便我的代码更容易读取。
private Disposable subscribe() {
return HttpClient.create()
.headers(h -> h.add("my header", my_header)
.get()
.uri(my_uri)
.response((resp, bytes) -> {
return bytes.asString();
})
.subscribe(response -> {
try {
consumeData(new ObjectMapper()
.readValue(response, MyData.class));
} catch (IOException ex) {
System.out.println("ERROR converting to json: " + ex);
}
});
}
似乎在使用 subscribe() 方法时我可以收听传入的响应并对其进行处理。我仍然需要添加一种方法,以便在服务器停止或消息队列关闭时关闭连接,这样客户端就不会挂在 non-existent 消息队列上。