如何在 vert.x "rxResponse().subscribe" 回调之外订阅 "HttpClientResponse::toFlowable()"

How to subscribe to "HttpClientResponse::toFlowable()" outside of the vert.x "rxResponse().subscribe" callback

我正在尝试在事件循环之外使用 HttpClientResponse.toFlowable() 的内容,使用 AtomicReference 来保存 Flowable。

所以,我设置了一个测试,returns 正文,然后在预告片解码期间失败。因此,我对 flowable 的订阅应该会收到响应正文,然后是错误。

这是我在之后(仍在回调中)订阅 Flowable 时观察到的结果,但在订阅回调之外的流时却观察不到。在后一种情况下,我的测试通过了 1/3 的时间...

The vertx-rx doc 明确表示它是一个热流,应该在事件循环中订阅或暂停。

所以,我尝试在订阅前暂停响应,但我得到了相同的测试结果:只有 1/3 的运行同时获得正文和异常。

此外,在查看 Vert.x HttpClientResponse::toFlowable() 实现时,我可以看到 readStream.pause() 无论如何都会生成。

所以我的问题是... 我是不是遗漏了什么?或者它可能是一个 Vert.x 错误?

这是我正在使用的代码:

package my.tests;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;

import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.net.NetServer;

public class VxTestRx3 {
    static NetServer server;
    static HttpClient client;

    @BeforeAll
    static void setUp() throws InterruptedException {
        server = startServer();
        client = Vertx.vertx()
                      .createHttpClient();
    }

    @RepeatedTest(1000)
    void shouldReachOnNextAndOnError() throws Exception {

        CountDownLatch bodyReadLatch = new CountDownLatch(1);
        CountDownLatch responseReadyToRead = new CountDownLatch(1);
        AtomicReference<Flowable<Buffer>> flowableRef = new AtomicReference<>();
        AtomicInteger ab = new AtomicInteger(0);
        client.rxRequest(HttpMethod.GET,
                         server.actualPort(),
                         "localhost",
                         "/")
              .subscribe(request -> {
                  request.rxResponse()
                         .subscribe(response -> {
                             // even with "response.pause()" here, the test is failing.
                             flowableRef.set(response.toFlowable());
                             responseReadyToRead.countDown();
                             // If the Flowable is subscribed here : I get my onNext & onError called.
                             /*flowableRef.get()
                                        .doOnComplete(bodyReadLatch::countDown)
                                        .subscribe(buffer -> ab.addAndGet(1),
                                                   t -> {
                                                       ab.addAndGet(2);
                                                       bodyReadLatch.countDown();
                                                   });*/
                         });
                  Flowable.<Buffer>empty().subscribe(request.toSubscriber());
              });
        responseReadyToRead.await();
        // If the Flowable is subscribed here : the behaviour is not stable.
        flowableRef.get()
                   .doOnComplete(bodyReadLatch::countDown)
                   .subscribe(buffer -> ab.addAndGet(1),
                              t -> {
                                  ab.addAndGet(2);
                                  bodyReadLatch.countDown();
                              });
        bodyReadLatch.await();
        // Should have been in onNext then onError callbacks.
        Assertions.assertThat(ab.get()).isEqualTo(3);
    }

    // This starts a server synchronously.
    // It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
    private static NetServer startServer() throws InterruptedException {
        CountDownLatch serverReady = new CountDownLatch(1);
        AtomicReference<NetServer> servRef = new AtomicReference<>();
        Vertx.vertx().createNetServer()
                                .connectHandler(socket -> {
                                    String content1 = "HTTP/1.1 200 OK\r\n"
                                            + "transfer-encoding: chunked\r\n"
                                            + "\r\n"
                                            + "2\r\n"
                                            + "OK"
                                            + "\r\n"
                                            + "0\r\n"
                                            + "é: trailerValue\r\n" // é is an invalid trailer name
                                            + "\r\n";
                                    // Read the request content, then write content, then close the socket.
                                    socket.handler(b -> socket.write(content1).subscribe(socket::close));
                                })
                                .listen()
                .subscribe(netServer -> {
                    servRef.set(netServer);
                    serverReady.countDown();
                });
        serverReady.await();
        return servRef.get();
    }
}

好的,经过更多搜索,我发现了发生的事情。

简化

首先,我已经 re-created 测试删除了 rx-vert 的使用,并在套接字上添加了一个异常处理程序:

package org.tests;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.NetServer;
import io.vertx.core.streams.ReadStream;

public class VxTestNonRx {
    static NetServer server;
    static HttpClient client;

    @BeforeAll
    static void setUp() throws InterruptedException {
        server = startServer();
        client = Vertx.vertx().createHttpClient();
    }

    @RepeatedTest(10000)
    void shouldReachOnNextAndOnError() throws Exception {
        CountDownLatch bodyReadLatch = new CountDownLatch(1);
        CountDownLatch responseReadyToRead = new CountDownLatch(1);
        AtomicReference<ReadStream<Buffer>> readStreamRef = new AtomicReference<>();
        AtomicInteger ab = new AtomicInteger(0);
        client.request(HttpMethod.GET,
                       server.actualPort(),
                       "localhost",
                       "/")
              .compose(HttpClientRequest::send)
              .onSuccess(response -> {
                  // Catches exceptions when they happen before we subscribed to the ReadStream
                  response.netSocket().exceptionHandler(t -> System.out.println("NET EXCEPTION HANDLER" + t.getMessage()));
                  //response.pause(); // Try to pause FIRST to avoid skipping buffers
                  // When handlers are set here : 
                  //  - with pause(): 100% times exceptionHandler only.
                  //  - without pause(): 100% times : handler, then exceptionHandler
                  readStreamRef.set(response);
                  responseReadyToRead.countDown();
              });
        responseReadyToRead.await();
        // When handlers are set here : 
        // - without pause() : 
        //     - 66% times : handler, then exceptionHandler
        //     - 33% times : only exceptionHandler gets called.
        // - with pause() : 
        //     - 99% times : only onError gets called 
        //     - 1% times  : netSocket().exceptionHandler, then handler gets called. (but neither endHandler nor exceptionHandler are called !)
        readStreamRef.get()
                     .handler(buffer -> {
                         System.out.println("HANDLER");
                         ab.addAndGet(1);
                     })
                     .endHandler(unused -> {
                         System.out.println("END HANDLER");
                         bodyReadLatch.countDown();
                     })
                     .exceptionHandler(t -> {
                         System.out.println("ERROR HANDLER");
                         ab.addAndGet(2);
                         bodyReadLatch.countDown();
                     });
        readStreamRef.get().resume();
        bodyReadLatch.await();
        // Should have been in onNext then onError callbacks.
        Assertions.assertThat(ab.get()).isEqualTo(3);
    }

    // This starts a server synchronously.
    // It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
    private static NetServer startServer() throws InterruptedException {
        CountDownLatch serverReady = new CountDownLatch(1);
        AtomicReference<NetServer> servRef = new AtomicReference<>();
        Vertx.vertx().createNetServer()
             .connectHandler(socket -> {
                 String content1 = "HTTP/1.1 200 OK\r\n"
                         + "transfer-encoding: chunked\r\n"
                         + "\r\n"
                         + "2\r\n"
                         + "OK"
                         + "\r\n"
                         + "0\r\n"
                         + "é: trailerValue\r\n" // é is an invalid trailer name
                         + "\r\n";
                 // Read the request content, then write content, then close the socket.
                 socket.handler(b -> socket.write(content1, ar -> socket.close()));
             })
             .listen()
             .onSuccess(netServer -> {
                 servRef.set(netServer);
                 serverReady.countDown();
             });
        serverReady.await();
        return servRef.get();
    }

观察

正如 java 评论中所写,这是我的发现:

  • ReadStream 处理程序在收到响应后立即设置时:

    • pause() 首先读取 ReadStream 时:我没有得到任何缓冲区,ReadStream.exceptionHandler 总是被调用。
    • 没有 pause():我总是在 ReadStream.handler 中接收正文缓冲区,然后 ReadStream.exceptionHandler 被调用。
  • 当在接收到的响应回调上下文之外设置处理程序时:

    • pause()首先读取 ReadStream 时(仍在回调上下文中):
      • 99% 的时间:只有 ReadStream.exceptionHandler 被调用
      • 1% 的时间:netSocket().exceptionHandler 被调用,然后处理程序被调用,但 ReadStream.endHandlerReadStream.exceptionHandler 都没有被调用,流“永无止境”!!
    • 没有pause()ing
      • 66% 的时间:ReadStream.handler 被调用,然后是 ReadStream.exceptionHandler.
      • 33% 的时间:只有 ReadStream.exceptionHandler 被调用。

解释

直接在“收到响应”回调中设置处理程序时,行为稳定,这有助于理解 ReadStream.pause() 的工作原理。

暂停时,Vert.x不会停止阅读回复内容。它将缓冲 ByteBuffer 以便能够在 ReadStream.fetch(x)ReadStream.resume() 上为它们提供服务。 但是当它遇到错误时(在我的例子中,在读取预告片时,在正文缓冲之后)它仍然会调用设置为 ReadStream.exceptionHandler 的回调。在那种情况下,正文没有被读取(因为暂停),但是 pre-defined exceptionHandler 被调用了。

但是,如果没有任何 ReadStream.pause(),一旦“收到响应”事件结束,请求的缓冲区(这是另一个 eventloop 事件)将直接发送到已注册的 ReadStream.handler,然后 vert.x 会发现错误并调用注册的 ReadStream.exceptionHandler.

这很好地解释了我的稳定案例。现在,让我们解释一下不稳定的!

在“收到响应”承诺范围之外设置 ReadStream 的处理程序时,可能会错过一些事件。这很简单,当 vert.x 超出“收到响应”范围时,即事件结束,然后:

  • Vert.x 处理缓冲区(或跳过它们并在暂停时保存它们)作为其他事件的一部分,可能在同一事件循环中,但 ReadStream.handler 注册是在上下文之外进行的这次 !那是另一个线程,这段代码可能还没有 运行 !因此发送到 ReadStream 的缓冲区将丢失。
  • 更糟糕的是,当发现尾部错误时,如果尚未设置异常处理程序,Vert.x 将退回到它可以获得的下一个异常处理程序:TCP 连接一(response.netSocket().exceptionHandler), 或者如果都没有设置...事件循环异常处理程序,等等。所以错误是......丢失,即使在暂停流时也是如此。

这解释了为什么在“响应接收事件”上下文之外定义我的处理程序时,我注册的 ReadStream.handler 并不总是被调用(缓冲区事件已在注册之前发送)。但幸运的是,在 99% 的时间里,异常发生在我的异常处理程序注册之后。现在解释了最后 1%,在 vert.x 处理异常时既没有注册处理程序,也没有注册异常处理程序...

这如何适用于最初的 rx-vertx 问题?

嗯...调用 response.toFlowable()ReadStream 处理程序映射到 Flowable 即将到来的订阅。 但同样,对生成的 Flowable 的订阅发生在“收到响应事件”的范围之外。因此,事件可能在实际订阅之前就已经流动了!

最终,这是一个 vert.x 问题吗?

嗯...不,我误用了它。

但是当 ReadStream 终端方法 (endHandler/exceptionHandler) 已经被调用时,ReadStream 的 handler/endHandler/exceptionHandler 注册失败可能是件好事。 因此,我们永远不会永远等待 ReadStream 发布缓冲区,并且可以获得更清晰的异常消息!