使用 Reactor Netty 连接到服务器消息队列

Connecting to a Server message queue using Reactor Netty

我正在尝试使用 Reactor Netty 连接到 docker 容器上的消息队列 运行。由于依赖性问题,我是独立执行此操作的,未使用 SpringFlux。

从 Reactor Netty 文档中的示例中,我看到有一种方法可以连接到服务器并获得响应:

public static void main(String[] args) {
        String response =
                HttpClient.create()
                          .headers(h -> h.add("my header", my_header)
                          .get()
                          .uri(my_uri)
                          .responseContent() 
                          .aggregate()       
                          .asString()        
                          .block();
    }

但是当我之后尝试通过 System.out.println() 显示输出时,没有任何反应。

我也试过了解如何使用:

Flux<V> response(BiFunction<HttpClientResponse,ByteBufFlux,Publisher<V>> receiver)

但我不确定具体要做什么。 我在文档中看到有一个名为 Connection 的 class,它使用 TCPClient 并具有订阅方法。

我有点迷茫,你能给我指出在不使用 spring-flux 的情况下在 Reactor Netty 中实现这个的正确方向吗?

谢谢

编辑:

经过一些实验,我得到了这个:

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((res, bytes) - > {
                   System.out.println(bytes.asString());
                   return bytes.asString();})
               .subscribe();
}

这给了我一个 FluxHandle,我如何使用它来实际读取响应的主体?

所以我想出了如何订阅和读取从服务器收到的数据,甚至使用 jackson 库将数据转换为 JSON,以便我的代码更容易读取。

private Disposable subscribe() {
    return HttpClient.create()
               .headers(h -> h.add("my header", my_header)
               .get()
               .uri(my_uri)
               .response((resp, bytes) -> {
                    return bytes.asString();
                })
                .subscribe(response -> {
                    try {
                        consumeData(new ObjectMapper()
                                .readValue(response, MyData.class));
                    } catch (IOException ex) {
                        System.out.println("ERROR converting to json: " + ex);
                    }
                    });
}

似乎在使用 subscribe() 方法时我可以收听传入的响应并对其进行处理。我仍然需要添加一种方法,以便在服务器停止或消息队列关闭时关闭连接,这样客户端就不会挂在 non-existent 消息队列上。