如何避免 Kotlin 协程的并发问题?
How to avoid concurrency issues with Kotlin coroutines?
我将在 android 应用程序中实现聊天功能。为此,我通过协程流每五秒从服务器获取一次聊天消息。问题是当我想发送一条消息时,服务器有时会收到两个并发请求并且 returns 出现错误。我应该如何确保这些请求 运行 在我的聊天存储库中顺序排列?这是我的聊天库:
class ChatRepositoryImpl @Inject constructor(
private val api: ApolloApi,
private val checkTokenIsSetDataStore: CheckTokenIsSetDataStore
) : ChatRepository {
override fun chatMessages(
lastIndex: Int,
limit: Int,
offset: Int,
channelId: Int,
): Flow<Resource<ChatMessages>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
while (true) {
val response = ChatMessagesQuery(
lastIndex = Input.fromNullable(lastIndex),
limit = Input.fromNullable(limit),
offset = Input.fromNullable(offset),
channelId
).let {
api.getApolloClient(token)
.query(it)
.await()
}
response.data?.let {
emit(
Resource.Success<ChatMessages>(
it.chatMessages
)
)
}
if (response.data == null)
emit(Resource.Error<ChatMessages>(message = response.errors?.get(0)?.message))
delay(5000L)
}
}.flowOn(Dispatchers.IO)
override fun chatSendText(channelId: Int, text: String): Flow<Resource<ChatSendText>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
val response = ChatSendTextMutation(
channelId = channelId,
text = text
).let {
api.getApolloClient(token)
.mutate(it)
.await()
}
response.data?.let {
return@flow emit(
Resource.Success<ChatSendText>(
it.chatSendText
)
)
}
return@flow emit(Resource.Error<ChatSendText>(message = response.errors?.get(0)?.message))
}.flowOn(Dispatchers.IO)
}
限制并发的一种方法是使用像 Mutex or Semaphore 这样的实用程序。我们可以很容易地用 mutex 解决你的问题:
class ChatRepositoryImpl ... {
private val apolloMutex = Mutex()
override fun chatMessages(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.query(it)
.await()
}
...
}
override fun chatSendText(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.mutate(it)
.await()
}
...
}
不过,这个问题应该不是真正解决在客户端,而是在服务端。您尝试的解决方案并不能完全保护您免受并发请求。如果由于某些原因,应用程序的两个实例具有相同的令牌,或者如果用户试图操纵您的应用程序,它仍然可以发送并发请求。
如果您不能轻易地正确修复问题,您可以在服务器端应用您打算在客户端应用的相同修复。只需按顺序处理请求或部分请求。它更 error-proof 也更高效,因为这样只需要按顺序完成整个请求时间的一部分。
我将在 android 应用程序中实现聊天功能。为此,我通过协程流每五秒从服务器获取一次聊天消息。问题是当我想发送一条消息时,服务器有时会收到两个并发请求并且 returns 出现错误。我应该如何确保这些请求 运行 在我的聊天存储库中顺序排列?这是我的聊天库:
class ChatRepositoryImpl @Inject constructor(
private val api: ApolloApi,
private val checkTokenIsSetDataStore: CheckTokenIsSetDataStore
) : ChatRepository {
override fun chatMessages(
lastIndex: Int,
limit: Int,
offset: Int,
channelId: Int,
): Flow<Resource<ChatMessages>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
while (true) {
val response = ChatMessagesQuery(
lastIndex = Input.fromNullable(lastIndex),
limit = Input.fromNullable(limit),
offset = Input.fromNullable(offset),
channelId
).let {
api.getApolloClient(token)
.query(it)
.await()
}
response.data?.let {
emit(
Resource.Success<ChatMessages>(
it.chatMessages
)
)
}
if (response.data == null)
emit(Resource.Error<ChatMessages>(message = response.errors?.get(0)?.message))
delay(5000L)
}
}.flowOn(Dispatchers.IO)
override fun chatSendText(channelId: Int, text: String): Flow<Resource<ChatSendText>> = flow {
var token = ""
checkTokenIsSetDataStore.get.first {
token = it
true
}
val response = ChatSendTextMutation(
channelId = channelId,
text = text
).let {
api.getApolloClient(token)
.mutate(it)
.await()
}
response.data?.let {
return@flow emit(
Resource.Success<ChatSendText>(
it.chatSendText
)
)
}
return@flow emit(Resource.Error<ChatSendText>(message = response.errors?.get(0)?.message))
}.flowOn(Dispatchers.IO)
}
限制并发的一种方法是使用像 Mutex or Semaphore 这样的实用程序。我们可以很容易地用 mutex 解决你的问题:
class ChatRepositoryImpl ... {
private val apolloMutex = Mutex()
override fun chatMessages(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.query(it)
.await()
}
...
}
override fun chatSendText(...) {
...
apolloMutex.withLock {
api.getApolloClient(token)
.mutate(it)
.await()
}
...
}
不过,这个问题应该不是真正解决在客户端,而是在服务端。您尝试的解决方案并不能完全保护您免受并发请求。如果由于某些原因,应用程序的两个实例具有相同的令牌,或者如果用户试图操纵您的应用程序,它仍然可以发送并发请求。
如果您不能轻易地正确修复问题,您可以在服务器端应用您打算在客户端应用的相同修复。只需按顺序处理请求或部分请求。它更 error-proof 也更高效,因为这样只需要按顺序完成整个请求时间的一部分。