RxJava - 调用 Observables 方法时的非确定性行为

RxJava - Non Deterministic behaviour when calling Observables methods

我对以下代码有疑问:

public void foo(List<Player> players_list) {

    JsonArray playersArr = new JsonArray();

    rx.Observable.from(players_list) // assume players_list is a list contains 2 players
    .concatMap(player -> {

    return getUser(player.getUserId()) // get entity 'User' from DB
    .flatMap(userObj -> {
        User user = ...;

        playersArr.add(new JsonObject()
            .putString("uid", player.getExtUserId())
        );

            return rx.Observable.just(playersArr);
        });
    }).subscribe(playersObj -> {

    }, (exception) -> {
        log.error("", exception);

    }, () -> {

        // at this point I expect 'playersArr' to consist 2 entries!
    });
}

当 运行 这段代码似乎它的输出是不确定的,这意味着 - 大多数时候我在 JsonArray 中得到 2 个条目的有效结果,但有时我得到 1 个条目。

我想知道为什么??

编辑:
我尝试从 .flatMap --> .concatMap 切换,它似乎解决了问题,但我不确定它是否真的是一个好的解决方案。

我假设 playerArrs 不是线程安全的,如果你的 Observable 是在异步上下文中执行的,也许 playerArrs 可能会跳过添加调用。

您的 Observable 有一些副作用,您是否更新了存在于 Observable 之外的对象。为避免这种情况,您可以通过 Observable 构建 JsonArray

的方式更新代码
public void foo(List<Player> players_list) {

    rx.Observable.from(players_list) // assume players_list is a list contains 2 players
                 .concatMap(player -> getUser(player.getUserId()) // get entity 'User' from DB
                 .map(userObj -> new JsonObject().putString("uid", player.getExtUserId())
                .reduce(new JsonArray(), (seed, acu) -> seed.add(acu))) // build the JsonArray
                .subscribe(playersObj -> { }, 
                           (exception) -> { log.error("", exception); }, 
                          () -> {
    // at this point I expect 'playersArr' to consist 2 entries!
                 });
}

我不确定我的代码,但我认为它应该接近您的需要。