带有顶点的 RxJava:不能有多个订阅异常

RxJava with vertx: can't have multiple subscriptions exception

我正在尝试使用 RxJava 避免顶点回调地狱。 但是我有“rx.exceptions.OnErrorNotImplementedException: Cannot have multiple subscriptions”。这里有什么问题?

public class ShouldBeBetterSetter extends AbstractVerticle {
    @Override
    public void start(Future<Void> startFuture) throws Exception {
        Func1<AsyncMap<String,Long>, Observable<Void>> obtainAndPutValueToMap = stringLongAsyncMap -> {
            Long value = System.currentTimeMillis();
            return stringLongAsyncMap.putObservable("timestamp", value)
                .doOnError(Throwable::printStackTrace)
                .doOnNext(aVoid -> System.out.println("succesfully putted"));
        };

        Observable<AsyncMap<String,Long>> clusteredMapObservable =
                vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
                        .doOnError(Throwable::printStackTrace);

        vertx.periodicStream(3000).toObservable()
                .flatMap(l-> clusteredMapObservable.flatMap(obtainAndPutValueToMap))
                .forEach(o -> {
                    System.out.println("just printing.");
                });
    }
}

Working Verticle(没有 Rx)可以在这里找到: https://gist.github.com/IvanZelenskyy/9d50de8980b7bdf1e959e19593f7ce4a

vertx.sharedData().getClusterWideMapObservable("mymap") returns 可观察,仅支持单个订阅者 - 因此例外。一个值得一试的解决方案是:

Observable<AsyncMap<String,Long>> clusteredMapObservable =
            Observable.defer(
            () -> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
 );

这样每次调用 clusteredMapObservable.flatMap() 时,它都会订阅 Observable.defer() 返回的新观察值。

编辑

如果可以使用相同的 AsyncMap,如@Ivan Zelenskyy 所指出的,解决方案可以是

Observable<AsyncMap<String,Long>> clusteredMapObservable =   
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap").cache()

发生的事情是,在每次定期发射时,foreach 都会重新订阅您在上面定义的 clusteredMapObservable 变量。

要修复,只需将调用移至定期流 flatmap 内的 vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")

像这样:

vertx.periodicStream(3000).toObservable()
                .flatMap(l-> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
                        .doOnError(Throwable::printStackTrace)
                        .flatMap(obtainAndPutValueToMap))
                .forEach(o -> {
                    System.out.println("just printing.");
                });

更新

如果您不喜欢 lambda 中的 labmda,那就不要。这是没有

的更新
vertx.periodicStream(3000).toObservable()
                    .flatMap(l-> {
                        return vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap");
                    })
                    .doOnError(Throwable::printStackTrace)
                    .flatMap(obtainAndPutValueToMap)
                    .forEach(o -> {
                        System.out.println("just printing.");
                    });

PS - 您对 .flatMap(obtainAndPutValueToMap)) 的调用也是 lambda 中的 lambda - 您只是将其移入函数中。