MongoDB 反应式模板事务
MongoDB reactive template transactions
我已经在我的开源项目中使用 MongoDB 一年多了,最近我决定尝试事务(transactions)。在为使用事务的方法编写了一些测试后,我发现它们会抛出一些奇怪的异常,但我无法弄清楚问题出在哪里。我有一个 delete 方法,它使用了自定义的 coroutine context 和一个 mutex:
/**
 * Deletes the data stored for [photoInfo].
 *
 * Switches to `coroutineContext`, acquires `mutex`, and while holding it delegates the actual
 * removal to [deletePhotoInternalInTransaction].
 *
 * @param photoInfo the photo whose data should be removed.
 * @return the value returned by [deletePhotoInternalInTransaction].
 */
open suspend fun delete(photoInfo: PhotoInfo): Boolean =
    withContext(coroutineContext) {
        mutex.withLock { deletePhotoInternalInTransaction(photoInfo) }
    }
然后调用 method 执行一些删除操作:
/**
 * Removes [photoInfo] and the records that reference it inside a single MongoDB transaction.
 *
 * The five deletions run sequentially against the transaction-bound template. A step that
 * emits `false` is turned into an error so that the transaction is aborted instead of being
 * committed with only part of the data removed; `PhotoInfoDao.deleteById` reports database
 * errors as `false` rather than propagating them, so without this check a failed step would
 * go unnoticed.
 * NOTE(review): assumes every DAO call below returns `Mono<Boolean>` like
 * `PhotoInfoDao.deleteById` does - confirm.
 *
 * Should be called from within the locked mutex.
 *
 * FIXME: doesn't work in tests
 *
 * @param photoInfo the photo to delete; must not be empty.
 * @return `true` when the whole transaction completed, `false` when a step or the transaction
 * itself failed (the failure is logged).
 * @throws IllegalStateException if [photoInfo] is empty.
 */
private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty())

    // An exception thrown from doOnNext is delivered downstream as an error signal, which makes
    // the surrounding transaction fail instead of commit.
    fun requireDeleted(deleted: Boolean, step: String) {
        check(deleted) { "Transactional photo deletion failed at step: $step" }
    }

    val transactionMono = template.inTransaction().execute { txTemplate ->
        photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
            .doOnNext { deleted -> requireDeleted(deleted, "photo info") }
            .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "favourited photo") }
            .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "reported photo") }
            .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "location map") }
            .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "gallery photo") }
    }.next()

    return try {
        transactionMono.awaitFirst()
        true
    } catch (error: java.util.concurrent.CancellationException) {
        // Coroutine cancellation must keep propagating; reporting it as a failed delete would
        // break structured concurrency.
        throw error
    } catch (error: Throwable) {
        logger.error("Could not delete photo", error)
        false
    }
}
这里我有五个操作,从五个不同的文档中删除数据。这是其中一项操作的示例:
/**
 * Removes the [PhotoInfo] entries whose photo id equals [photoId].
 *
 * @param photoId value matched against the `PhotoInfo.Mongo.Field.PHOTO_ID` field.
 * @param template operations used to run the removal; defaults to `reactiveTemplate`.
 * @return a [Mono] emitting whether the removal was acknowledged, or `false` when the removal
 * fails (the error is logged and not propagated).
 */
open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
    val matchesPhotoId = Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId)
    val removal = template.remove(Query().addCriteria(matchesPhotoId), PhotoInfo::class.java)

    return removal
        .map { it.wasAcknowledged() }
        .doOnError { logger.error("DB error", it) }
        .onErrorReturn(false)
}
我希望这个操作在任何一个删除失败时失败,所以我使用了一个事务。
然后我为使用此 delete 方法的处理程序(handler)编写了一些测试:
/**
 * When `staticMapDownloaderService.enqueue` returns `false`, the upload endpoint must answer
 * with a 5xx status and `ErrorCode.DatabaseError`, and afterwards `findAllFiles()` and both
 * DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except enqueueing the static map download.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.DatabaseError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
/**
 * When `diskManipulationService.resizeAndSavePhotos` throws an [IOException], the upload
 * endpoint must answer with a 5xx status and `ErrorCode.ServerResizeError`, and afterwards
 * `findAllFiles()` and both DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except resizing, which throws.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

        Mockito.doThrow(IOException("BAM"))
            .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerResizeError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
/**
 * When `diskManipulationService.copyDataBuffersToFile` throws an [IOException], the upload
 * endpoint must answer with a 5xx status and `ErrorCode.ServerDiskError`, and afterwards
 * `findAllFiles()` and both DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except copying the buffers to a file, which throws.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

        Mockito.doThrow(IOException("BAM"))
            .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerDiskError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
通常第一个测试会通过,而随后的两个测试会失败,并出现以下异常:
17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017.
然后:
Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.
并且:
17:22:36.951 [Thread-16] WARN reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017.
有时其中两个通过而最后一个失败。
看起来只有第一个事务成功,任何后续事务都会失败,我想原因是我必须手动关闭它(或 ClientSession)。但是我找不到任何关于如何关闭 transactions/sessions 的信息。 Here 是我能找到的为数不多的使用反应式模板执行事务的例子之一,在该例子中我没有看到他们为关闭 transaction/session 做任何额外的事情。
或者可能是因为我在模拟一个在事务中抛出异常的方法?也许在这种情况下它没有被关闭?
您可以检查连接选项(options),并根据您所使用的驱动程序进行相应的设置:
// Create a connection for the listed node(s) - only "localhost" in this example.
val connection = MongoConnection(List("localhost"))
// Obtain a handle to the "plugin" database through that connection.
val db = connection.database("plugin")
...
// Presumably requests the connection to shut down once the work above is done - confirm
// against the driver's documentation.
connection.askClose()
你可以搜索askClose()方法,希望对你有帮助
客户端的 sessions/transactions 已被正确关闭,但测试中创建索引的操作似乎正在获取全局锁,导致下一个事务在获取锁时被阻塞,并最终因锁请求超时而失败。
基本上,您必须管理好索引的创建,使其不会干扰来自客户端的事务。
一个快速的修复方法是在 shell 中运行下面的命令来增加锁请求的超时时间。
// Set the server parameter maxTransactionLockRequestTimeoutMillis to 50 (milliseconds) via setParameter.
db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )
在生产环境中,您可以检查事务的错误标签(error labels)并重试该操作。
我已经在我的开源项目中使用 MongoDB 一年多了,最近我决定尝试事务(transactions)。在为使用事务的方法编写了一些测试后,我发现它们会抛出一些奇怪的异常,但我无法弄清楚问题出在哪里。我有一个 delete 方法,它使用了自定义的 coroutine context 和一个 mutex:
/**
 * Deletes the data stored for [photoInfo].
 *
 * Switches to `coroutineContext`, acquires `mutex`, and while holding it delegates the actual
 * removal to [deletePhotoInternalInTransaction].
 *
 * @param photoInfo the photo whose data should be removed.
 * @return the value returned by [deletePhotoInternalInTransaction].
 */
open suspend fun delete(photoInfo: PhotoInfo): Boolean =
    withContext(coroutineContext) {
        mutex.withLock { deletePhotoInternalInTransaction(photoInfo) }
    }
然后调用 method 执行一些删除操作:
/**
 * Removes [photoInfo] and the records that reference it inside a single MongoDB transaction.
 *
 * The five deletions run sequentially against the transaction-bound template. A step that
 * emits `false` is turned into an error so that the transaction is aborted instead of being
 * committed with only part of the data removed; `PhotoInfoDao.deleteById` reports database
 * errors as `false` rather than propagating them, so without this check a failed step would
 * go unnoticed.
 * NOTE(review): assumes every DAO call below returns `Mono<Boolean>` like
 * `PhotoInfoDao.deleteById` does - confirm.
 *
 * Should be called from within the locked mutex.
 *
 * FIXME: doesn't work in tests
 *
 * @param photoInfo the photo to delete; must not be empty.
 * @return `true` when the whole transaction completed, `false` when a step or the transaction
 * itself failed (the failure is logged).
 * @throws IllegalStateException if [photoInfo] is empty.
 */
private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty())

    // An exception thrown from doOnNext is delivered downstream as an error signal, which makes
    // the surrounding transaction fail instead of commit.
    fun requireDeleted(deleted: Boolean, step: String) {
        check(deleted) { "Transactional photo deletion failed at step: $step" }
    }

    val transactionMono = template.inTransaction().execute { txTemplate ->
        photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
            .doOnNext { deleted -> requireDeleted(deleted, "photo info") }
            .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "favourited photo") }
            .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "reported photo") }
            .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "location map") }
            .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
            .doOnNext { deleted -> requireDeleted(deleted, "gallery photo") }
    }.next()

    return try {
        transactionMono.awaitFirst()
        true
    } catch (error: java.util.concurrent.CancellationException) {
        // Coroutine cancellation must keep propagating; reporting it as a failed delete would
        // break structured concurrency.
        throw error
    } catch (error: Throwable) {
        logger.error("Could not delete photo", error)
        false
    }
}
这里我有五个操作,从五个不同的文档中删除数据。这是其中一项操作的示例:
/**
 * Removes the [PhotoInfo] entries whose photo id equals [photoId].
 *
 * @param photoId value matched against the `PhotoInfo.Mongo.Field.PHOTO_ID` field.
 * @param template operations used to run the removal; defaults to `reactiveTemplate`.
 * @return a [Mono] emitting whether the removal was acknowledged, or `false` when the removal
 * fails (the error is logged and not propagated).
 */
open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
    val matchesPhotoId = Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId)
    val removal = template.remove(Query().addCriteria(matchesPhotoId), PhotoInfo::class.java)

    return removal
        .map { it.wasAcknowledged() }
        .doOnError { logger.error("DB error", it) }
        .onErrorReturn(false)
}
我希望这个操作在任何一个删除失败时失败,所以我使用了一个事务。
然后我为使用此 delete 方法的处理程序(handler)编写了一些测试:
/**
 * When `staticMapDownloaderService.enqueue` returns `false`, the upload endpoint must answer
 * with a 5xx status and `ErrorCode.DatabaseError`, and afterwards `findAllFiles()` and both
 * DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except enqueueing the static map download.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.DatabaseError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
/**
 * When `diskManipulationService.resizeAndSavePhotos` throws an [IOException], the upload
 * endpoint must answer with a 5xx status and `ErrorCode.ServerResizeError`, and afterwards
 * `findAllFiles()` and both DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except resizing, which throws.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

        Mockito.doThrow(IOException("BAM"))
            .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerResizeError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
/**
 * When `diskManipulationService.copyDataBuffersToFile` throws an [IOException], the upload
 * endpoint must answer with a 5xx status and `ErrorCode.ServerDiskError`, and afterwards
 * `findAllFiles()` and both DAOs' `testFindAll()` must be empty.
 */
@Test
fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val client = getWebTestClient()
    val uploaderId = "1234235236"
    val firebaseToken = "fwerwe"

    // Arrange: every collaborator succeeds except copying the buffers to a file, which throws.
    runBlocking {
        Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
        Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
        Mockito.`when`(userInfoRepository.accountExists(uploaderId)).thenReturn(true)
        Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(firebaseToken)
        Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

        Mockito.doThrow(IOException("BAM"))
            .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    // Act: upload a photo through the HTTP endpoint.
    val formData = createTestMultipartFile(PHOTO1, UploadPhotoPacket(33.4, 55.2, uploaderId, true))
    val bodySpec = client
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(formData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

    // Assert: the expected error code is reported and nothing was left behind.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerDiskError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
    }
}
通常第一个测试会通过,而随后的两个测试会失败,并出现以下异常:
17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017.
然后:
Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.
并且:
17:22:36.951 [Thread-16] WARN reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017.
有时其中两个通过而最后一个失败。
看起来只有第一个事务成功,任何后续事务都会失败,我想原因是我必须手动关闭它(或 ClientSession)。但是我找不到任何关于如何关闭 transactions/sessions 的信息。 Here 是我能找到的为数不多的使用反应式模板执行事务的例子之一,在该例子中我没有看到他们为关闭 transaction/session 做任何额外的事情。
或者可能是因为我在模拟一个在事务中抛出异常的方法?也许在这种情况下它没有被关闭?
您可以检查连接选项(options),并根据您所使用的驱动程序进行相应的设置:
// Create a connection for the listed node(s) - only "localhost" in this example.
val connection = MongoConnection(List("localhost"))
// Obtain a handle to the "plugin" database through that connection.
val db = connection.database("plugin")
...
// Presumably requests the connection to shut down once the work above is done - confirm
// against the driver's documentation.
connection.askClose()
你可以搜索askClose()方法,希望对你有帮助
客户端的 sessions/transactions 已被正确关闭,但测试中创建索引的操作似乎正在获取全局锁,导致下一个事务在获取锁时被阻塞,并最终因锁请求超时而失败。
基本上,您必须管理好索引的创建,使其不会干扰来自客户端的事务。
一个快速的修复方法是在 shell 中运行下面的命令来增加锁请求的超时时间。
// Set the server parameter maxTransactionLockRequestTimeoutMillis to 50 (milliseconds) via setParameter.
db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )
在生产环境中,您可以检查事务的错误标签(error labels)并重试该操作。