如何避免 Kotlin 协程的并发问题?

How to avoid concurrency issues with Kotlin coroutines?

我将在 android 应用程序中实现聊天功能。为此,我通过协程流每五秒从服务器获取一次聊天消息。问题是当我想发送一条消息时,服务器有时会收到两个并发请求并且 returns 出现错误。我应该如何确保这些请求 运行 在我的聊天存储库中顺序排列?这是我的聊天库:

class ChatRepositoryImpl @Inject constructor(
    private val api: ApolloApi,
    private val checkTokenIsSetDataStore: CheckTokenIsSetDataStore
) : ChatRepository {

    override fun chatMessages(
        lastIndex: Int,
        limit: Int,
        offset: Int,
        channelId: Int,
    ): Flow<Resource<ChatMessages>> = flow {
        var token = ""
        checkTokenIsSetDataStore.get.first {
            token = it
            true
        }
        while (true) {
            val response = ChatMessagesQuery(
                lastIndex = Input.fromNullable(lastIndex),
                limit = Input.fromNullable(limit),
                offset = Input.fromNullable(offset),
                channelId
            ).let {
                api.getApolloClient(token)
                    .query(it)
                    .await()
            }

            response.data?.let {
                emit(
                    Resource.Success<ChatMessages>(
                        it.chatMessages
                    )
                )
            }
            if (response.data == null)
                emit(Resource.Error<ChatMessages>(message = response.errors?.get(0)?.message))
            delay(5000L)
        }
    }.flowOn(Dispatchers.IO)

    override fun chatSendText(channelId: Int, text: String): Flow<Resource<ChatSendText>> = flow {
        var token = ""
        checkTokenIsSetDataStore.get.first {
            token = it
            true
        }

        val response = ChatSendTextMutation(
            channelId = channelId,
            text = text
        ).let {
            api.getApolloClient(token)
                .mutate(it)
                .await()
        }

        response.data?.let {
            return@flow emit(
                Resource.Success<ChatSendText>(
                    it.chatSendText
                )
            )
        }
        return@flow emit(Resource.Error<ChatSendText>(message = response.errors?.get(0)?.message))
    }.flowOn(Dispatchers.IO)
}

限制并发的一种方法是使用像 Mutex or Semaphore 这样的实用程序。我们可以很容易地用 mutex 解决你的问题:

class ChatRepositoryImpl ... {
    private val apolloMutex = Mutex()

    override fun chatMessages(...) {
        ...
        apolloMutex.withLock {
            api.getApolloClient(token)
                .query(it)
                .await()
        }
        ...
    }

    override fun chatSendText(...) {
        ...
        apolloMutex.withLock {
            api.getApolloClient(token)
                .mutate(it)
                .await()
        }
        ...
    }

不过,这个问题应该不是真正解决在客户端,而是在服务端。您尝试的解决方案并不能完全保护您免受并发请求。如果由于某些原因,应用程序的两个实例具有相同的令牌,或者如果用户试图操纵您的应用程序,它仍然可以发送并发请求。

如果您不能轻易地正确修复问题,您可以在服务器端应用您打算在客户端应用的相同修复。只需按顺序处理请求或部分请求。它更 error-proof 也更高效,因为这样只需要按顺序完成整个请求时间的一部分。