有没有办法缓存 Single 的结果或避免在多个订阅者调用时多次执行?

Is there a way to cache the result of a Single or avoid it being executed multiple times when called with multiple subscribers?

我有一个 Single 可以拨打 REST 电话。这个 Single 最多可以同时被调用 10 次,最终会产生 10 个对同一事物的请求。我看不出有什么方法可以使 Single 变热,我尝试在返回 Single 时使用 cache() 无济于事。

我怎样才能做到这一点?

以下是我调用的逻辑:

fun getUser(userID: UUID): Single<User> {
        if (userCache.containsUser(userID)) {
            // Just return the value already saved in the cache
            return Single.create {
                it.onSuccess(getUserFromCache(userID))
            }
        } else {
            // Make rest call, add user in cache, and then return that user
            return Single.fromCallable {
                val user = getUserFromRest(userID).blockingGet()

                userCache.addUser(user)

                return@fromCallable user
            }.cache()
        }
    }

我更新了之前的代码来实现缓存:

     private var ongoingRequests: HashMap<UUID, Single<User>> = HashMap()
     fun getUser(userID: UUID): Single<User> {
        if (userCache.containsUser(userID)) {
            return Single.create {
                it.onSuccess(getUserFromCache(userID))
            }
        } else if (ongoingRequests.containsKey(userID)) {
            return ongoingRequests[userID]!!
        } else {
            val request =  Single.create<User> {
                getUserFromRest(userID).subscribe(
                    { user ->
                        userCache.addUser(user)
                        ongoingRequests.remove(userID)
                        it.onSuccess(user)
                    },
                    {
                    }
                )
            }.cache()

            ongoingRequests[userID] = request

            return request
        }
    }

你的代码有两个问题。首先,检查用户存在性会在您调用 getUser 方法时运行,而不是在您订阅时运行。

在类似情况下,您可以使用 Single.deferred 方法 return 延迟实例。

在这种情况下,我猜你不需要。如果你正在使用 Single.cache,你也可以放弃缓存逻辑,因为 Rx 会这样做。

private val shared: MutableMap<UUID, Single<User>> = mutableMapOf()

fun getUser(userID: UUID): Single<User> {

    return shared.getOrPut(userID, {
        return Single.fromCallable {
            val user = getUserFromRest(userID).blockingGet()

            userCache.addUser(user)

            return@fromCallable user
        }.cache()
    })
}

我在java中写了代码来说明这个想法(我不是kotlin开发者,抱歉)

缓存长处理调用的结果

Single<User> getUser(String userId) {
        return Single.<String>create(emitter -> {
            Thread.sleep(1000); // call the rest
            emitter.onSuccess(new User());
        }).cache();
    }

然后你当然可以使用地图来重用结果:

Map<String, Single<User>> ongoingRequests = new HashMap<>();
Single<User> call = ongoingRequests.computeIfAbsent(userId, this::getUser);
call.subscribe()

call执行一次然后缓存。