使用 RxJava 并行调用网络服务。这是正确的方法吗?
Calling network services in parallel using RxJava. Is this the right way?
想法是并行进行 3 个网络调用。 (我使用 Google 作为演示目的的服务。以下工作但不确定这是正确的方法还是可以简化。如果我必须结合所有三个搜索的响应,我该怎么办? 请指教
public class GoogleSearchRx
{
public static void main(String args[])
{
CountDownLatch latch = new CountDownLatch(3);
search("RxJava").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
//run the last one on current thread
search("Erik Meijer").subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
public static Observable<Elements> search(String q)
{
String google = "http://www.google.com/search?q=";
String charset = "UTF-8";
String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!
return Observable.create(new Observable.OnSubscribe<Elements>()
{
@Override public void call(Subscriber<? super Elements> subscriber)
{
out.println(currentThreadName() + "\tOnSubscribe.call");
try
{
Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
subscriber.onNext(links);
}
catch (IOException e)
{
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}
}
按照您问题的 "combine the responses of all the three searches" 部分,您可能正在寻找 Zip.
Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
new Func3<Elements, Elements, Elements, Elements>() {
@Override
public Elements call(Elements result1, Elements result2, Elements result3) {
// Add all the results together...
return results;
}
}
).subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
这假设您希望同时处理所有结果(在订阅者中,此处)而不关心给定结果使用了哪个查询。
请注意,zip
函数有不同版本,从 1..N 个可观察对象,Func1
到 Func9
或 FuncN
,允许您压缩一个特定或任意数量的可观察量。
这是另一种处理整个过程(包括 Jsoup 调用)的方法,避免了任何倒计时闩锁,并提供了一种避免必须使用 Observable.create
的方法(因为让 Rx 运算符更容易处理所有订户管理事宜!)
("back-of-napkin" 代码,可能需要一些微调才能编译。)
final String google = "http://www.google.com/search?q=";
final String charset = "UTF-8";
final String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // ...
Observable.just("RxJava", "Reactive Extensions", "Erik Meijer")
.flatMap((query) -> Observable.defer(() -> {
try {
return Observable.from(Jsoup.connect(google + URLEncoder.encode(query, charset))
.timeout(1000)
.userAgent(userAgent)
.get()
.select("li.g>h3>a")).subscribeOn(Schedulers.io());
} catch (IOException e) {
throw new RuntimeException(e);
}
}))
.forEach(
(link) -> out.println(link.text()),
(e) -> out.println("Failed: " + e.getMessage()));
请注意,在您的原始示例中,无法保证排序。一种处理方法是 toSortedList
,它要么期望实现 Comparable
的项目的 Observable,要么期望 Func2
提供元素之间的比较。
想法是并行进行 3 个网络调用。 (我使用 Google 作为演示目的的服务。以下工作但不确定这是正确的方法还是可以简化。如果我必须结合所有三个搜索的响应,我该怎么办? 请指教
public class GoogleSearchRx
{
public static void main(String args[])
{
CountDownLatch latch = new CountDownLatch(3);
search("RxJava").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
//run the last one on current thread
search("Erik Meijer").subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
public static Observable<Elements> search(String q)
{
String google = "http://www.google.com/search?q=";
String charset = "UTF-8";
String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!
return Observable.create(new Observable.OnSubscribe<Elements>()
{
@Override public void call(Subscriber<? super Elements> subscriber)
{
out.println(currentThreadName() + "\tOnSubscribe.call");
try
{
Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
subscriber.onNext(links);
}
catch (IOException e)
{
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}
}
按照您问题的 "combine the responses of all the three searches" 部分,您可能正在寻找 Zip.
Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
new Func3<Elements, Elements, Elements, Elements>() {
@Override
public Elements call(Elements result1, Elements result2, Elements result3) {
// Add all the results together...
return results;
}
}
).subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);
这假设您希望同时处理所有结果(在订阅者中,此处)而不关心给定结果使用了哪个查询。
请注意,zip
函数有不同版本,从 1..N 个可观察对象,Func1
到 Func9
或 FuncN
,允许您压缩一个特定或任意数量的可观察量。
这是另一种处理整个过程(包括 Jsoup 调用)的方法,避免了任何倒计时闩锁,并提供了一种避免必须使用 Observable.create
的方法(因为让 Rx 运算符更容易处理所有订户管理事宜!)
("back-of-napkin" 代码,可能需要一些微调才能编译。)
final String google = "http://www.google.com/search?q=";
final String charset = "UTF-8";
final String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // ...
Observable.just("RxJava", "Reactive Extensions", "Erik Meijer")
.flatMap((query) -> Observable.defer(() -> {
try {
return Observable.from(Jsoup.connect(google + URLEncoder.encode(query, charset))
.timeout(1000)
.userAgent(userAgent)
.get()
.select("li.g>h3>a")).subscribeOn(Schedulers.io());
} catch (IOException e) {
throw new RuntimeException(e);
}
}))
.forEach(
(link) -> out.println(link.text()),
(e) -> out.println("Failed: " + e.getMessage()));
请注意,在您的原始示例中,无法保证排序。一种处理方法是 toSortedList
,它要么期望实现 Comparable
的项目的 Observable,要么期望 Func2
提供元素之间的比较。