Single.zip - How to catch a failed call and continue with the rest of the network calls?

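I am making 5 parallel network calls and simulating 4 of them succeeding and 1 of them failing.

The failed call causes the whole Single.zip() to fail, so I cannot get the results of the other 4 network calls even though they succeeded.

How can I handle the error of a single failed network call inside Single.zip() and still get the successful results?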

private Single<BigInteger> createNetworkCall() {
    return Single.fromCallable(() -> {
        return service.getBalance("validaddress").execute();
    }).subscribeOn(Schedulers.io());
}

private Single<BigInteger> createFailedNetworkCall() {
    return Single.fromCallable(() -> {
        return service.getBalance("invalidaddress").execute();
    }).subscribeOn(Schedulers.io());
}

private void makeParallelCalls() {
    List<Single<BigInteger>> iterable = new ArrayList<>();
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createFailedNetworkCall());

    Single.zip(iterable, (results) -> {
        Log.d(TAG, "makeParallelCalls: " + Arrays.toString(results));
        return results;
    }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(results -> {
                Log.d(TAG, "onSuccess: makeParallelCalls: " + results);
            }, (exception) -> {
                Log.e(TAG, "onError: makeParallelCalls", exception);
            });
}

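I am very new to RxJava, so I don't know whether there is a better solution or whether this is very bad practice, but I used the onErrorReturn callback on the Single.

If anyone has a better solution, please share it so I can mark yours as the correct answer!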

public static final int INVALID_USER_ADDRESS = -101;

private Single<BigInteger> createNetworkCall() {
    return Single.fromCallable(() -> {
        return service.getBalance("validaddress").execute();
    }).subscribeOn(Schedulers.io());
}

private Single<BigInteger> createFailedNetworkCall() {
    return Single.fromCallable(() -> {
        return service.getBalance("invalidaddress").execute();
    }).subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> {
                // Map the failure to a sentinel value so Single.zip still completes.
                if (throwable instanceof IllegalArgumentException) {
                    return BigInteger.valueOf(INVALID_USER_ADDRESS);
                } else {
                    return BigInteger.valueOf(-100); // any other error
                }
            });
}

private void makeParallelCalls() {
    List<Single<BigInteger>> iterable = new ArrayList<>();
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createNetworkCall());
    iterable.add(createFailedNetworkCall());

    Single.zip(iterable, (results) -> {
        Log.d(TAG, "makeParallelCalls: " + Arrays.toString(results));
        return results;
    }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(results -> {
                BigInteger invalidAddress = BigInteger.valueOf(INVALID_USER_ADDRESS);
                for (int i = 0; i < results.length; i++) {
                    // Compare BigInteger with BigInteger: equals() against an int is always false.
                    if (invalidAddress.equals(results[i])) {
                        // Handle error here
                    } else {
                        // Handle success value here
                    }
                }
            }, (exception) -> {
                Log.e(TAG, "onError: makeParallelCalls", exception);
            });
}

Inside .subscribe(results -> { ... }), results now contains:

[2583397195825000000000, 2583397195825000000000, 2583397195825000000000, 2583397195825000000000, -101]
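
Since the zipped array mixes real balances with sentinel error codes, it may help to separate the two before using them. The following is only a sketch in plain Java: the class SentinelResults, its method names, and the constant UNKNOWN_ERROR are hypothetical and not part of the answer above. It assumes that real balances are never equal to -100 or -101, and it compares BigInteger with BigInteger, because BigInteger.equals() returns false for an Integer argument.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, shown only to illustrate filtering sentinel values.
final class SentinelResults {
    // Sentinel codes as BigInteger so that equals() compares like with like.
    static final BigInteger INVALID_USER_ADDRESS = BigInteger.valueOf(-101);
    static final BigInteger UNKNOWN_ERROR = BigInteger.valueOf(-100); // assumed name

    // True when the element is one of the sentinel error codes.
    static boolean isError(Object result) {
        return INVALID_USER_ADDRESS.equals(result) || UNKNOWN_ERROR.equals(result);
    }

    // Keeps only the real balances from the Object[] that the zipper receives.
    static List<BigInteger> successes(Object[] results) {
        List<BigInteger> balances = new ArrayList<>();
        for (Object result : results) {
            if (!isError(result)) {
                balances.add((BigInteger) result);
            }
        }
        return balances;
    }
}
```

With the array shown above, SentinelResults.successes(results) would keep the four balances and drop the -101 entry. A sentinel only works while it cannot collide with a legitimate value, which is one reason to consider a wrapper type instead.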

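Catch the exception so that the error is not allowed to break Single.zip.

For example, in your request factory, return an Optional instead of the response.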

private Single<Optional<Long>> performNetworkCall(int n) {
    return Single.fromCallable(() -> {
        // Simulate a failure for even-numbered calls.
        if (n % 2 == 0) {
            throw new Exception("failed call");
        }
        return 0L;
    }).subscribeOn(Schedulers.io())
    .map(Optional::of)
    .onErrorReturnItem(Optional.empty());
    // or .onErrorReturn(exception -> Optional.empty());
}

private void makeParallelCalls() {
    List<Single<Optional<Long>>> iterable = new ArrayList<>();
    iterable.add(performNetworkCall(1));
    iterable.add(performNetworkCall(2));
    iterable.add(performNetworkCall(3));
    iterable.add(performNetworkCall(4));
    iterable.add(performNetworkCall(5));

    Single.zip(iterable, (results) -> {
                for (Object result : results) {
                    // The zipper receives Object[], so each element is cast back.
                    var optional = (Optional<Long>) result;
                    if (optional.isEmpty()) {
                        // this one failed, no data
                    } else {
                        var response = optional.get();
                    }
                }
                return results;
            })
            // Nothing runs until the zipped Single is subscribed to.
            .subscribe(results -> {
                Log.d(TAG, "makeParallelCalls: " + results.length + " calls finished");
            }, (exception) -> {
                Log.e(TAG, "onError: makeParallelCalls", exception);
            });
}

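You can replace Optional with a custom class that carries additional information, such as the exception or the error message that caused the failure.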
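
As a rough illustration of such a custom class, here is a minimal sketch in plain Java. The name CallResult and all of its methods are hypothetical; nothing here comes from RxJava itself. It holds either a value or the Throwable that caused the failure, never both.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical wrapper: either a successful value or the error that occurred.
final class CallResult<T> {
    private final T value;          // non-null only on success
    private final Throwable error;  // non-null only on failure

    private CallResult(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    // Wraps a successful response.
    static <T> CallResult<T> success(T value) {
        return new CallResult<>(Objects.requireNonNull(value), null);
    }

    // Wraps the exception that made the call fail.
    static <T> CallResult<T> failure(Throwable error) {
        return new CallResult<>(null, Objects.requireNonNull(error));
    }

    boolean isSuccess() {
        return error == null;
    }

    // Empty when the call failed.
    Optional<T> value() {
        return Optional.ofNullable(value);
    }

    // Empty when the call succeeded.
    Optional<Throwable> error() {
        return Optional.ofNullable(error);
    }
}
```

Assuming the same request factory as above, the chain could then presumably end in .map(CallResult::success).onErrorReturn(CallResult::failure) instead of the Optional variant, so that each zipped element still tells you why it failed.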

If you have a recovery option, you can use .onErrorResumeNext instead.