带有顶点的 RxJava:不能有多个订阅异常
RxJava with vertx: can't have multiple subscriptions exception
我正在尝试使用 RxJava 避免顶点回调地狱。
但是我有“rx.exceptions.OnErrorNotImplementedException: Cannot have multiple subscriptions
”。这里有什么问题?
public class ShouldBeBetterSetter extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
Func1<AsyncMap<String,Long>, Observable<Void>> obtainAndPutValueToMap = stringLongAsyncMap -> {
Long value = System.currentTimeMillis();
return stringLongAsyncMap.putObservable("timestamp", value)
.doOnError(Throwable::printStackTrace)
.doOnNext(aVoid -> System.out.println("succesfully putted"));
};
Observable<AsyncMap<String,Long>> clusteredMapObservable =
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
.doOnError(Throwable::printStackTrace);
vertx.periodicStream(3000).toObservable()
.flatMap(l-> clusteredMapObservable.flatMap(obtainAndPutValueToMap))
.forEach(o -> {
System.out.println("just printing.");
});
}
}
Working Verticle(没有 Rx)可以在这里找到:
https://gist.github.com/IvanZelenskyy/9d50de8980b7bdf1e959e19593f7ce4a
vertx.sharedData().getClusterWideMapObservable("mymap") returns 可观察,仅支持单个订阅者 - 因此例外。一个值得一试的解决方案是:
Observable<AsyncMap<String,Long>> clusteredMapObservable =
Observable.defer(
() -> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
);
这样每次调用 clusteredMapObservable.flatMap() 时,它都会订阅 Observable.defer() 返回的新观察值。
编辑
如果可以使用相同的 AsyncMap,如@Ivan Zelenskyy 所指出的,解决方案可以是
Observable<AsyncMap<String,Long>> clusteredMapObservable =
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap").cache()
发生的事情是,在每次定期发射时,foreach
都会重新订阅您在上面定义的 clusteredMapObservable
变量。
要修复,只需将调用移至定期流 flatmap
内的 vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
。
像这样:
vertx.periodicStream(3000).toObservable()
.flatMap(l-> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
.doOnError(Throwable::printStackTrace)
.flatMap(obtainAndPutValueToMap))
.forEach(o -> {
System.out.println("just printing.");
});
更新
如果您不喜欢 lambda 中的 labmda,那就不要。这是没有
的更新
vertx.periodicStream(3000).toObservable()
.flatMap(l-> {
return vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap");
})
.doOnError(Throwable::printStackTrace)
.flatMap(obtainAndPutValueToMap)
.forEach(o -> {
System.out.println("just printing.");
});
PS - 您对 .flatMap(obtainAndPutValueToMap))
的调用也是 lambda 中的 lambda - 您只是将其移入函数中。
我正在尝试使用 RxJava 避免顶点回调地狱。
但是我有“rx.exceptions.OnErrorNotImplementedException: Cannot have multiple subscriptions
”。这里有什么问题?
public class ShouldBeBetterSetter extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
Func1<AsyncMap<String,Long>, Observable<Void>> obtainAndPutValueToMap = stringLongAsyncMap -> {
Long value = System.currentTimeMillis();
return stringLongAsyncMap.putObservable("timestamp", value)
.doOnError(Throwable::printStackTrace)
.doOnNext(aVoid -> System.out.println("succesfully putted"));
};
Observable<AsyncMap<String,Long>> clusteredMapObservable =
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
.doOnError(Throwable::printStackTrace);
vertx.periodicStream(3000).toObservable()
.flatMap(l-> clusteredMapObservable.flatMap(obtainAndPutValueToMap))
.forEach(o -> {
System.out.println("just printing.");
});
}
}
Working Verticle(没有 Rx)可以在这里找到: https://gist.github.com/IvanZelenskyy/9d50de8980b7bdf1e959e19593f7ce4a
vertx.sharedData().getClusterWideMapObservable("mymap") returns 可观察,仅支持单个订阅者 - 因此例外。一个值得一试的解决方案是:
Observable<AsyncMap<String,Long>> clusteredMapObservable =
Observable.defer(
() -> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
);
这样每次调用 clusteredMapObservable.flatMap() 时,它都会订阅 Observable.defer() 返回的新观察值。
编辑
如果可以使用相同的 AsyncMap,如@Ivan Zelenskyy 所指出的,解决方案可以是
Observable<AsyncMap<String,Long>> clusteredMapObservable =
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap").cache()
发生的事情是,在每次定期发射时,foreach
都会重新订阅您在上面定义的 clusteredMapObservable
变量。
要修复,只需将调用移至定期流 flatmap
内的 vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
。
像这样:
vertx.periodicStream(3000).toObservable()
.flatMap(l-> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
.doOnError(Throwable::printStackTrace)
.flatMap(obtainAndPutValueToMap))
.forEach(o -> {
System.out.println("just printing.");
});
更新
如果您不喜欢 lambda 中的 labmda,那就不要。这是没有
的更新vertx.periodicStream(3000).toObservable()
.flatMap(l-> {
return vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap");
})
.doOnError(Throwable::printStackTrace)
.flatMap(obtainAndPutValueToMap)
.forEach(o -> {
System.out.println("just printing.");
});
PS - 您对 .flatMap(obtainAndPutValueToMap))
的调用也是 lambda 中的 lambda - 您只是将其移入函数中。