How to use Netflix ObservableResult and RxJava in asynchronous mode
I have been trying to use the Netflix Observable, but I can only get it to work synchronously:
This is how I define the remote call:
@Named
public class BroConsumerService {
    ..
    @HystrixCommand(fallbackMethod = "stubbedMethod")
    public Observable<String> executeObservableBro(String name) {
        return new ObservableResult<String>() {
            @Override
            public String invoke() {
                return executeRemoteService(name);
            }
        };
    }

    private String stubbedMethod(String name) {
        return "return stubbed";
    }
}

// Here is where I actually call (and observe) this method
@RequestMapping("/executeObservableBro")
public String executeObservableBro(@RequestParam(value = "name", required = false) String name) throws ExecutionException, InterruptedException {
    Observable<String> result = broConsumerService.executeObservableBro(name);
    result.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            // println rather than printf: the message must not be treated as a format string
            System.out.println(e.getMessage());
        }

        @Override
        public void onNext(String s) {
            System.out.println("on next..");
        }
    });
}
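But this runs synchronously. I want to be able to "listen" to executeObservableBro before I execute it, and get notified each time it is executed.
An example would be much appreciated.
Thanks,
Ray.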
You have to provide a scheduler via the subscribeOn method, for example:
public static void main(String[] args) throws InterruptedException {
    Observable<Integer> observable2 = Observable.create(subscriber -> {
        try {
            Thread.sleep(1000); // simulate slow work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Arrays.asList(1, 2, 3).forEach(value -> subscriber.onNext(value));
        // onCompleted and onError are mutually exclusive terminal events, so only one is emitted
        subscriber.onCompleted();
    });
    System.out.println("Before");
    observable2
        .subscribeOn(Schedulers.io()) // run the subscription on an I/O thread instead of the caller
        .subscribe(
            next -> log.info("Next element {}", next),
            error -> log.error("Got exception", error),
            () -> log.info("Finished") // on complete
        );
    System.out.println("After");
    // Thread.sleep(5000); // uncomment this to wait for the subscription, otherwise main will quit
}
By default it is not asynchronous :)
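If it helps to see why a plain subscribe() blocks the caller, here is a rough toy model that uses only the JDK. It is not how RxJava or Hystrix is implemented: ToyObservable, its subscribeOn(ExecutorService) method and the thread name "toy-io-1" are all made up for illustration (the real subscribeOn takes a Scheduler). The sketch only shows the idea that the work runs on whichever thread subscribes, unless the subscription itself is handed to another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SubscribeOnSketch {

    /** Hypothetical stand-in for an Observable: holds the work that runs on subscribe. */
    static final class ToyObservable<T> {
        private final Consumer<Consumer<T>> onSubscribe;

        ToyObservable(Consumer<Consumer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Runs the work on whichever thread calls subscribe(). */
        void subscribe(Consumer<T> onNext) {
            onSubscribe.accept(onNext);
        }

        /** Returns a new observable whose work is submitted to the given executor. */
        ToyObservable<T> subscribeOn(ExecutorService executor) {
            return new ToyObservable<>(onNext -> executor.execute(() -> onSubscribe.accept(onNext)));
        }
    }

    /** A source that emits the name of the thread it is running on. */
    static ToyObservable<String> threadNameSource() {
        return new ToyObservable<>(emit -> emit.accept(Thread.currentThread().getName()));
    }

    /** Without a scheduler the value is emitted on the caller's thread. */
    static String emittingThreadWithoutScheduler() {
        String[] seen = new String[1];
        threadNameSource().subscribe(name -> seen[0] = name);
        return seen[0];
    }

    /** With a scheduler the value is emitted on the executor's thread. */
    static String emittingThreadWithScheduler() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-io-1"));
        try {
            String[] seen = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            threadNameSource().subscribeOn(executor).subscribe(name -> {
                seen[0] = name;
                done.countDown();
            });
            // subscribe() returned immediately, so wait for the background emission
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for emission");
            }
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:            " + Thread.currentThread().getName());
        System.out.println("without scheduler: " + emittingThreadWithoutScheduler());
        System.out.println("with scheduler:    " + emittingThreadWithScheduler());
    }
}
```

The latch plays the same role as the commented-out Thread.sleep(5000) in the snippet above: once the subscription moves to another thread, the caller has to wait explicitly if it needs the result before it exits.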