如何在 vert.x "rxResponse().subscribe" 回调之外订阅 "HttpClientResponse::toFlowable()"
How to subscribe to "HttpClientResponse::toFlowable()" outside of the vert.x "rxResponse().subscribe" callback
我正在尝试在事件循环之外使用 HttpClientResponse.toFlowable()
的内容,使用 AtomicReference 来保存 Flowable。
所以,我设置了一个测试,returns 正文,然后在预告片解码期间失败。因此,我对 flowable 的订阅应该会收到响应正文,然后是错误。
这是我在之后(仍在回调中)订阅 Flowable 时观察到的结果,但在订阅回调之外的流时却观察不到。在后一种情况下,我的测试通过了 1/3 的时间...
The vertx-rx doc 明确表示它是一个热流,应该在事件循环中订阅或暂停。
所以,我尝试在订阅前暂停响应,但我得到了相同的测试结果:只有 1/3 的运行同时获得正文和异常。
此外,在查看 Vert.x HttpClientResponse::toFlowable()
实现时,我可以看到 readStream.pause()
无论如何都会生成。
所以我的问题是... 我是不是遗漏了什么?或者它可能是一个 Vert.x 错误?
这是我正在使用的代码:
package my.tests;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.net.NetServer;
public class VxTestRx3 {
static NetServer server;
static HttpClient client;
@BeforeAll
static void setUp() throws InterruptedException {
server = startServer();
client = Vertx.vertx()
.createHttpClient();
}
@RepeatedTest(1000)
void shouldReachOnNextAndOnError() throws Exception {
CountDownLatch bodyReadLatch = new CountDownLatch(1);
CountDownLatch responseReadyToRead = new CountDownLatch(1);
AtomicReference<Flowable<Buffer>> flowableRef = new AtomicReference<>();
AtomicInteger ab = new AtomicInteger(0);
client.rxRequest(HttpMethod.GET,
server.actualPort(),
"localhost",
"/")
.subscribe(request -> {
request.rxResponse()
.subscribe(response -> {
// even with "response.pause()" here, the test is failing.
flowableRef.set(response.toFlowable());
responseReadyToRead.countDown();
// If the Flowable is subscribed here : I get my onNext & onError called.
/*flowableRef.get()
.doOnComplete(bodyReadLatch::countDown)
.subscribe(buffer -> ab.addAndGet(1),
t -> {
ab.addAndGet(2);
bodyReadLatch.countDown();
});*/
});
Flowable.<Buffer>empty().subscribe(request.toSubscriber());
});
responseReadyToRead.await();
// If the Flowable is subscribed here : the behaviour is not stable.
flowableRef.get()
.doOnComplete(bodyReadLatch::countDown)
.subscribe(buffer -> ab.addAndGet(1),
t -> {
ab.addAndGet(2);
bodyReadLatch.countDown();
});
bodyReadLatch.await();
// Should have been in onNext then onError callbacks.
Assertions.assertThat(ab.get()).isEqualTo(3);
}
// This starts a server synchronously.
// It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
private static NetServer startServer() throws InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);
AtomicReference<NetServer> servRef = new AtomicReference<>();
Vertx.vertx().createNetServer()
.connectHandler(socket -> {
String content1 = "HTTP/1.1 200 OK\r\n"
+ "transfer-encoding: chunked\r\n"
+ "\r\n"
+ "2\r\n"
+ "OK"
+ "\r\n"
+ "0\r\n"
+ "é: trailerValue\r\n" // é is an invalid trailer name
+ "\r\n";
// Read the request content, then write content, then close the socket.
socket.handler(b -> socket.write(content1).subscribe(socket::close));
})
.listen()
.subscribe(netServer -> {
servRef.set(netServer);
serverReady.countDown();
});
serverReady.await();
return servRef.get();
}
}
好的,经过更多搜索,我发现了发生的事情。
简化
首先,我已经 re-created 测试删除了 rx-vert 的使用,并在套接字上添加了一个异常处理程序:
package org.tests;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.NetServer;
import io.vertx.core.streams.ReadStream;
public class VxTestNonRx {
static NetServer server;
static HttpClient client;
@BeforeAll
static void setUp() throws InterruptedException {
server = startServer();
client = Vertx.vertx().createHttpClient();
}
@RepeatedTest(10000)
void shouldReachOnNextAndOnError() throws Exception {
CountDownLatch bodyReadLatch = new CountDownLatch(1);
CountDownLatch responseReadyToRead = new CountDownLatch(1);
AtomicReference<ReadStream<Buffer>> readStreamRef = new AtomicReference<>();
AtomicInteger ab = new AtomicInteger(0);
client.request(HttpMethod.GET,
server.actualPort(),
"localhost",
"/")
.compose(HttpClientRequest::send)
.onSuccess(response -> {
// Catches exceptions when they happen before we subscribed to the ReadStream
response.netSocket().exceptionHandler(t -> System.out.println("NET EXCEPTION HANDLER" + t.getMessage()));
//response.pause(); // Try to pause FIRST to avoid skipping buffers
// When handlers are set here :
// - with pause(): 100% times exceptionHandler only.
// - without pause(): 100% times : handler, then exceptionHandler
readStreamRef.set(response);
responseReadyToRead.countDown();
});
responseReadyToRead.await();
// When handlers are set here :
// - without pause() :
// - 66% times : handler, then exceptionHandler
// - 33% times : only exceptionHandler gets called.
// - with pause() :
// - 99% times : only onError gets called
// - 1% times : netSocket().exceptionHandler, then handler gets called. (but neither endHandler nor exceptionHandler are called !)
readStreamRef.get()
.handler(buffer -> {
System.out.println("HANDLER");
ab.addAndGet(1);
})
.endHandler(unused -> {
System.out.println("END HANDLER");
bodyReadLatch.countDown();
})
.exceptionHandler(t -> {
System.out.println("ERROR HANDLER");
ab.addAndGet(2);
bodyReadLatch.countDown();
});
readStreamRef.get().resume();
bodyReadLatch.await();
// Should have been in onNext then onError callbacks.
Assertions.assertThat(ab.get()).isEqualTo(3);
}
// This starts a server synchronously.
// It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
private static NetServer startServer() throws InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);
AtomicReference<NetServer> servRef = new AtomicReference<>();
Vertx.vertx().createNetServer()
.connectHandler(socket -> {
String content1 = "HTTP/1.1 200 OK\r\n"
+ "transfer-encoding: chunked\r\n"
+ "\r\n"
+ "2\r\n"
+ "OK"
+ "\r\n"
+ "0\r\n"
+ "é: trailerValue\r\n" // é is an invalid trailer name
+ "\r\n";
// Read the request content, then write content, then close the socket.
socket.handler(b -> socket.write(content1, ar -> socket.close()));
})
.listen()
.onSuccess(netServer -> {
servRef.set(netServer);
serverReady.countDown();
});
serverReady.await();
return servRef.get();
}
观察
正如 java 评论中所写,这是我的发现:
当 ReadStream
处理程序在收到响应后立即设置时:
- 当
pause()
首先读取 ReadStream 时:我没有得到任何缓冲区,ReadStream.exceptionHandler
总是被调用。
- 没有
pause()
:我总是在 ReadStream.handler
中接收正文缓冲区,然后 ReadStream.exceptionHandler
被调用。
当在接收到的响应回调上下文之外设置处理程序时:
- 当
pause()
首先读取 ReadStream 时(仍在回调上下文中):
- 99% 的时间:只有
ReadStream.exceptionHandler
被调用
- 1% 的时间:
netSocket().exceptionHandler
被调用,然后处理程序被调用,但 ReadStream.endHandler
和 ReadStream.exceptionHandler
都没有被调用,流“永无止境”!!
- 没有
pause()ing
:
- 66% 的时间:
ReadStream.handler
被调用,然后是 ReadStream.exceptionHandler
.
- 33% 的时间:只有
ReadStream.exceptionHandler
被调用。
解释
直接在“收到响应”回调中设置处理程序时,行为稳定,这有助于理解 ReadStream.pause()
的工作原理。
暂停时,Vert.x不会停止阅读回复内容。它将缓冲 ByteBuffer 以便能够在 ReadStream.fetch(x)
或 ReadStream.resume()
上为它们提供服务。
但是当它遇到错误时(在我的例子中,在读取预告片时,在正文缓冲之后)它仍然会调用设置为 ReadStream.exceptionHandler
的回调。在那种情况下,正文没有被读取(因为暂停),但是 pre-defined exceptionHandler 被调用了。
但是,如果没有任何 ReadStream.pause()
,一旦“收到响应”事件结束,请求的缓冲区(这是另一个 eventloop 事件)将直接发送到已注册的 ReadStream.handler
,然后 vert.x 会发现错误并调用注册的 ReadStream.exceptionHandler
.
这很好地解释了我的稳定案例。现在,让我们解释一下不稳定的!
在“收到响应”承诺范围之外设置 ReadStream
的处理程序时,可能会错过一些事件。这很简单,当 vert.x 超出“收到响应”范围时,即事件结束,然后:
- Vert.x 处理缓冲区(或跳过它们并在暂停时保存它们)作为其他事件的一部分,可能在同一事件循环中,但
ReadStream.handler
注册是在上下文之外进行的这次 !那是另一个线程,这段代码可能还没有 运行 !因此发送到 ReadStream 的缓冲区将丢失。
- 更糟糕的是,当发现尾部错误时,如果尚未设置异常处理程序,Vert.x 将退回到它可以获得的下一个异常处理程序:TCP 连接一(
response.netSocket().exceptionHandler
), 或者如果都没有设置...事件循环异常处理程序,等等。所以错误是......丢失,即使在暂停流时也是如此。
这解释了为什么在“响应接收事件”上下文之外定义我的处理程序时,我注册的 ReadStream.handler
并不总是被调用(缓冲区事件已在注册之前发送)。但幸运的是,在 99% 的时间里,异常发生在我的异常处理程序注册之后。现在解释了最后 1%,在 vert.x 处理异常时既没有注册处理程序,也没有注册异常处理程序...
这如何适用于最初的 rx-vertx 问题?
嗯...调用 response.toFlowable()
将 ReadStream
处理程序映射到 Flowable
即将到来的订阅。
但同样,对生成的 Flowable 的订阅发生在“收到响应事件”的范围之外。因此,事件可能在实际订阅之前就已经流动了!
最终,这是一个 vert.x 问题吗?
嗯...不,我误用了它。
但是当 ReadStream
终端方法 (endHandler/exceptionHandler) 已经被调用时,ReadStream
的 handler/endHandler/exceptionHandler 注册失败可能是件好事。
因此,我们永远不会永远等待 ReadStream
发布缓冲区,并且可以获得更清晰的异常消息!
我正在尝试在事件循环之外使用 HttpClientResponse.toFlowable()
的内容,使用 AtomicReference 来保存 Flowable。
所以,我设置了一个测试,returns 正文,然后在预告片解码期间失败。因此,我对 flowable 的订阅应该会收到响应正文,然后是错误。
这是我在之后(仍在回调中)订阅 Flowable 时观察到的结果,但在订阅回调之外的流时却观察不到。在后一种情况下,我的测试通过了 1/3 的时间...
The vertx-rx doc 明确表示它是一个热流,应该在事件循环中订阅或暂停。
所以,我尝试在订阅前暂停响应,但我得到了相同的测试结果:只有 1/3 的运行同时获得正文和异常。
此外,在查看 Vert.x HttpClientResponse::toFlowable()
实现时,我可以看到 readStream.pause()
无论如何都会生成。
所以我的问题是... 我是不是遗漏了什么?或者它可能是一个 Vert.x 错误?
这是我正在使用的代码:
package my.tests;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.net.NetServer;
public class VxTestRx3 {
static NetServer server;
static HttpClient client;
@BeforeAll
static void setUp() throws InterruptedException {
server = startServer();
client = Vertx.vertx()
.createHttpClient();
}
@RepeatedTest(1000)
void shouldReachOnNextAndOnError() throws Exception {
CountDownLatch bodyReadLatch = new CountDownLatch(1);
CountDownLatch responseReadyToRead = new CountDownLatch(1);
AtomicReference<Flowable<Buffer>> flowableRef = new AtomicReference<>();
AtomicInteger ab = new AtomicInteger(0);
client.rxRequest(HttpMethod.GET,
server.actualPort(),
"localhost",
"/")
.subscribe(request -> {
request.rxResponse()
.subscribe(response -> {
// even with "response.pause()" here, the test is failing.
flowableRef.set(response.toFlowable());
responseReadyToRead.countDown();
// If the Flowable is subscribed here : I get my onNext & onError called.
/*flowableRef.get()
.doOnComplete(bodyReadLatch::countDown)
.subscribe(buffer -> ab.addAndGet(1),
t -> {
ab.addAndGet(2);
bodyReadLatch.countDown();
});*/
});
Flowable.<Buffer>empty().subscribe(request.toSubscriber());
});
responseReadyToRead.await();
// If the Flowable is subscribed here : the behaviour is not stable.
flowableRef.get()
.doOnComplete(bodyReadLatch::countDown)
.subscribe(buffer -> ab.addAndGet(1),
t -> {
ab.addAndGet(2);
bodyReadLatch.countDown();
});
bodyReadLatch.await();
// Should have been in onNext then onError callbacks.
Assertions.assertThat(ab.get()).isEqualTo(3);
}
// This starts a server synchronously.
// It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
private static NetServer startServer() throws InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);
AtomicReference<NetServer> servRef = new AtomicReference<>();
Vertx.vertx().createNetServer()
.connectHandler(socket -> {
String content1 = "HTTP/1.1 200 OK\r\n"
+ "transfer-encoding: chunked\r\n"
+ "\r\n"
+ "2\r\n"
+ "OK"
+ "\r\n"
+ "0\r\n"
+ "é: trailerValue\r\n" // é is an invalid trailer name
+ "\r\n";
// Read the request content, then write content, then close the socket.
socket.handler(b -> socket.write(content1).subscribe(socket::close));
})
.listen()
.subscribe(netServer -> {
servRef.set(netServer);
serverReady.countDown();
});
serverReady.await();
return servRef.get();
}
}
好的,经过更多搜索,我发现了发生的事情。
简化
首先,我已经 re-created 测试删除了 rx-vert 的使用,并在套接字上添加了一个异常处理程序:
package org.tests;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.NetServer;
import io.vertx.core.streams.ReadStream;
public class VxTestNonRx {
static NetServer server;
static HttpClient client;
@BeforeAll
static void setUp() throws InterruptedException {
server = startServer();
client = Vertx.vertx().createHttpClient();
}
@RepeatedTest(10000)
void shouldReachOnNextAndOnError() throws Exception {
CountDownLatch bodyReadLatch = new CountDownLatch(1);
CountDownLatch responseReadyToRead = new CountDownLatch(1);
AtomicReference<ReadStream<Buffer>> readStreamRef = new AtomicReference<>();
AtomicInteger ab = new AtomicInteger(0);
client.request(HttpMethod.GET,
server.actualPort(),
"localhost",
"/")
.compose(HttpClientRequest::send)
.onSuccess(response -> {
// Catches exceptions when they happen before we subscribed to the ReadStream
response.netSocket().exceptionHandler(t -> System.out.println("NET EXCEPTION HANDLER" + t.getMessage()));
//response.pause(); // Try to pause FIRST to avoid skipping buffers
// When handlers are set here :
// - with pause(): 100% times exceptionHandler only.
// - without pause(): 100% times : handler, then exceptionHandler
readStreamRef.set(response);
responseReadyToRead.countDown();
});
responseReadyToRead.await();
// When handlers are set here :
// - without pause() :
// - 66% times : handler, then exceptionHandler
// - 33% times : only exceptionHandler gets called.
// - with pause() :
// - 99% times : only onError gets called
// - 1% times : netSocket().exceptionHandler, then handler gets called. (but neither endHandler nor exceptionHandler are called !)
readStreamRef.get()
.handler(buffer -> {
System.out.println("HANDLER");
ab.addAndGet(1);
})
.endHandler(unused -> {
System.out.println("END HANDLER");
bodyReadLatch.countDown();
})
.exceptionHandler(t -> {
System.out.println("ERROR HANDLER");
ab.addAndGet(2);
bodyReadLatch.countDown();
});
readStreamRef.get().resume();
bodyReadLatch.await();
// Should have been in onNext then onError callbacks.
Assertions.assertThat(ab.get()).isEqualTo(3);
}
// This starts a server synchronously.
// It serves a static HTTP response with an illegal trailer name (making Vert.x fail)
private static NetServer startServer() throws InterruptedException {
CountDownLatch serverReady = new CountDownLatch(1);
AtomicReference<NetServer> servRef = new AtomicReference<>();
Vertx.vertx().createNetServer()
.connectHandler(socket -> {
String content1 = "HTTP/1.1 200 OK\r\n"
+ "transfer-encoding: chunked\r\n"
+ "\r\n"
+ "2\r\n"
+ "OK"
+ "\r\n"
+ "0\r\n"
+ "é: trailerValue\r\n" // é is an invalid trailer name
+ "\r\n";
// Read the request content, then write content, then close the socket.
socket.handler(b -> socket.write(content1, ar -> socket.close()));
})
.listen()
.onSuccess(netServer -> {
servRef.set(netServer);
serverReady.countDown();
});
serverReady.await();
return servRef.get();
}
观察
正如 java 评论中所写,这是我的发现:
当
ReadStream
处理程序在收到响应后立即设置时:- 当
pause()
首先读取 ReadStream 时:我没有得到任何缓冲区,ReadStream.exceptionHandler
总是被调用。 - 没有
pause()
:我总是在ReadStream.handler
中接收正文缓冲区,然后ReadStream.exceptionHandler
被调用。
- 当
当在接收到的响应回调上下文之外设置处理程序时:
- 当
pause()
首先读取 ReadStream 时(仍在回调上下文中):- 99% 的时间:只有
ReadStream.exceptionHandler
被调用 - 1% 的时间:
netSocket().exceptionHandler
被调用,然后处理程序被调用,但ReadStream.endHandler
和ReadStream.exceptionHandler
都没有被调用,流“永无止境”!!
- 99% 的时间:只有
- 没有
pause()ing
:- 66% 的时间:
ReadStream.handler
被调用,然后是ReadStream.exceptionHandler
. - 33% 的时间:只有
ReadStream.exceptionHandler
被调用。
- 66% 的时间:
- 当
解释
直接在“收到响应”回调中设置处理程序时,行为稳定,这有助于理解 ReadStream.pause()
的工作原理。
暂停时,Vert.x不会停止阅读回复内容。它将缓冲 ByteBuffer 以便能够在 ReadStream.fetch(x)
或 ReadStream.resume()
上为它们提供服务。
但是当它遇到错误时(在我的例子中,在读取预告片时,在正文缓冲之后)它仍然会调用设置为 ReadStream.exceptionHandler
的回调。在那种情况下,正文没有被读取(因为暂停),但是 pre-defined exceptionHandler 被调用了。
但是,如果没有任何 ReadStream.pause()
,一旦“收到响应”事件结束,请求的缓冲区(这是另一个 eventloop 事件)将直接发送到已注册的 ReadStream.handler
,然后 vert.x 会发现错误并调用注册的 ReadStream.exceptionHandler
.
这很好地解释了我的稳定案例。现在,让我们解释一下不稳定的!
在“收到响应”承诺范围之外设置 ReadStream
的处理程序时,可能会错过一些事件。这很简单,当 vert.x 超出“收到响应”范围时,即事件结束,然后:
- Vert.x 处理缓冲区(或跳过它们并在暂停时保存它们)作为其他事件的一部分,可能在同一事件循环中,但
ReadStream.handler
注册是在上下文之外进行的这次 !那是另一个线程,这段代码可能还没有 运行 !因此发送到 ReadStream 的缓冲区将丢失。 - 更糟糕的是,当发现尾部错误时,如果尚未设置异常处理程序,Vert.x 将退回到它可以获得的下一个异常处理程序:TCP 连接一(
response.netSocket().exceptionHandler
), 或者如果都没有设置...事件循环异常处理程序,等等。所以错误是......丢失,即使在暂停流时也是如此。
这解释了为什么在“响应接收事件”上下文之外定义我的处理程序时,我注册的 ReadStream.handler
并不总是被调用(缓冲区事件已在注册之前发送)。但幸运的是,在 99% 的时间里,异常发生在我的异常处理程序注册之后。现在解释了最后 1%,在 vert.x 处理异常时既没有注册处理程序,也没有注册异常处理程序...
这如何适用于最初的 rx-vertx 问题?
嗯...调用 response.toFlowable()
将 ReadStream
处理程序映射到 Flowable
即将到来的订阅。
但同样,对生成的 Flowable 的订阅发生在“收到响应事件”的范围之外。因此,事件可能在实际订阅之前就已经流动了!
最终,这是一个 vert.x 问题吗?
嗯...不,我误用了它。
但是当 ReadStream
终端方法 (endHandler/exceptionHandler) 已经被调用时,ReadStream
的 handler/endHandler/exceptionHandler 注册失败可能是件好事。
因此,我们永远不会永远等待 ReadStream
发布缓冲区,并且可以获得更清晰的异常消息!