How to end chained http requests in RxJava Vert.x?
HttpClient client = Vertx.vertx().createHttpClient();
HttpClientRequest request = client.request(HttpMethod.POST, "someURL")
    .putHeader("content-type", "application/x-www-form-urlencoded")
    .putHeader("content-length", Integer.toString(jsonData.length()))
    .write(jsonData);
request.toObservable()
    // flatMap HttpClientResponse -> Observable<Buffer>
    .flatMap(httpClientResponse -> { // something
        return httpClientResponse.toObservable();
    })
    .map(buffer -> buffer.toString())
    // flatMap data -> Observable<HttpClientResponse>
    .flatMap(postData -> client.request(HttpMethod.POST, "someURL")
        .putHeader("content-type", "application/x-www-form-urlencoded")
        .putHeader("content-length", Integer.toString(postData.length()))
        .write(postData)
        .toObservable())
    // flatMap HttpClientResponse -> Observable<Buffer>
    .flatMap(httpClientResponse -> {
        return httpClientResponse.toObservable();
    })...... // other operators
request.end();
Notice that I call .end() only on the top-level request. How do I end the request that is created inside the .flatMap? Do I even need to end it?
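There are multiple ways to ensure that request.end() is called. But I would first dig into the Vert.x documentation or, if it is available, just open the source code to see whether it calls end() for you. Otherwise, one option could be: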
final HttpClientRequest request = ...
request.toObservable()
    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            request.end();
        }
    });
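I think you can do something like the code below.
The main idea is not to use the HttpClientRequest obtained from the Vert.x client directly. Instead, you create another Flowable that calls end() as soon as it receives its first subscription.
Here, for example, you obtain the requests through a pair of custom methods: in this case request1() and request2(). Both use doOnSubscribe() to trigger the end() you need. Read its description on the ReactiveX page.
This example uses vertx and reactivex; I hope you can use this setup.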
import io.reactivex.Flowable;
import io.vertx.core.http.HttpMethod;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpClient;
import io.vertx.reactivex.core.http.HttpClientRequest;
import io.vertx.reactivex.core.http.HttpClientResponse;
import org.junit.Test;

public class Whosebug {

    @Test
    public void test() {
        Buffer jsonData = Buffer.buffer("..."); // the json data.
        HttpClient client = Vertx.vertx().createHttpClient(); // the vertx client.
        request1(client)
            .flatMap(httpClientResponse -> httpClientResponse.toFlowable())
            .map(buffer -> buffer.toString())
            .flatMap(postData -> request2(client, postData))
            .forEach(httpResponse -> {
                // do something with returned data
            });
    }

    // First request: end() is triggered as soon as the Flowable is subscribed to.
    private Flowable<HttpClientResponse> request1(HttpClient client) {
        HttpClientRequest request = client.request(HttpMethod.POST, "someURL");
        return request
            .toFlowable()
            .doOnSubscribe(subscription -> request.end());
    }

    // Second request: created inside flatMap and ended the same way.
    private Flowable<HttpClientResponse> request2(HttpClient client, String postData) {
        HttpClientRequest request = client.request(HttpMethod.POST, "someURL");
        // do something with postData
        return request
            .toFlowable()
            .doOnSubscribe(subscription -> request.end());
    }
}
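If you want to see why ending each request on subscription is enough for the whole chain, here is a rough plain-Java sketch with no Vert.x or RxJava dependency. Everything in it is hypothetical and only for illustration: Source is a tiny stand-in for a reactive stream, FakeRequest models a request whose response only arrives after end() has been called (an assumption about the behaviour discussed above), and ended(), chained() and chainedWithoutInnerEnd() are made-up helper names. Its doOnSubscribe() is a simplified analogue of the RxJava operator, not the real one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class EndOnSubscribeSketch {

    /** Hypothetical stand-in for a reactive stream: values are pushed to the subscriber. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);

        /** Registers the subscriber upstream, then runs the action. */
        default Source<T> doOnSubscribe(Runnable action) {
            return onNext -> {
                subscribe(onNext);
                action.run();
            };
        }

        /** Subscribes to the inner source produced for every upstream value. */
        default <R> Source<R> flatMap(Function<T, Source<R>> mapper) {
            return onNext -> subscribe(value -> mapper.apply(value).subscribe(onNext));
        }
    }

    /** Hypothetical request: the response is only delivered once end() is called. */
    static final class FakeRequest {
        private final String body;
        private Consumer<String> handler;

        FakeRequest(String body) {
            this.body = body;
        }

        /** Subscribing only registers the response handler; nothing is sent yet. */
        Source<String> toSource() {
            return onNext -> this.handler = onNext;
        }

        /** Simulates sending the request and receiving an echoed response. */
        void end() {
            if (handler != null) {
                handler.accept("echo:" + body);
            }
        }
    }

    /** Wraps a request so that it is ended as soon as someone subscribes. */
    static Source<String> ended(FakeRequest request) {
        return request.toSource().doOnSubscribe(request::end);
    }

    /** Chains two requests; the second one is created and ended inside flatMap. */
    static List<String> chained(String firstBody) {
        List<String> received = new ArrayList<>();
        ended(new FakeRequest(firstBody))
            .flatMap(response -> ended(new FakeRequest(response)))
            .subscribe(received::add);
        return received;
    }

    /** Same chain, but the inner request is never ended, so nothing arrives. */
    static List<String> chainedWithoutInnerEnd(String firstBody) {
        List<String> received = new ArrayList<>();
        ended(new FakeRequest(firstBody))
            .flatMap(response -> new FakeRequest(response).toSource())
            .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(chained("a"));                // prints [echo:echo:a]
        System.out.println(chainedWithoutInnerEnd("a")); // prints []
    }
}
```

In this model the inner request behaves exactly like the outer one: flatMap subscribes to it, and that subscription is what triggers end(). If the inner request is left un-ended, the chain simply never produces a value, which matches the symptom you would expect from the original code.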