MongoDB 响应式模板事务

MongoDB reactive template transactions

我已经将 MongoDB 用于我的开源项目一年多了,最近我决定尝试使用事务。在为使用事务的方法编写了一些测试后,我发现它们会抛出一些奇怪的异常,但我无法弄清楚问题出在哪里。我有一个 delete 方法,它使用自定义的 coroutine context 和一个 mutex:

  /**
   * Deletes the data belonging to [photoInfo] by delegating to
   * [deletePhotoInternalInTransaction].
   *
   * The delegate is invoked inside `withContext(coroutineContext)` and while
   * `mutex` is held, so calls made through this method are serialized on that mutex.
   *
   * @param photoInfo the photo whose records should be removed.
   * @return the result reported by [deletePhotoInternalInTransaction].
   */
  open suspend fun delete(photoInfo: PhotoInfo): Boolean =
    withContext(coroutineContext) {
      mutex.withLock { deletePhotoInternalInTransaction(photoInfo) }
    }

它随后调用下面这个方法来执行一些删除操作:

  //FIXME: doesn't work in tests
  //should be called from within locked mutex
  /**
   * Runs the five per-collection delete operations for [photoInfo] through a single
   * `template.inTransaction().execute { ... }` callback and waits for the result.
   *
   * Each DAO call receives the transaction-bound template (`txTemplate`) and the calls
   * are chained sequentially with `flatMap`.
   *
   * @param photoInfo the photo to delete; must not be empty.
   * @return `true` when the transactional publisher emitted a value, `false` when
   *   awaiting it failed with any non-cancellation error (the error is logged).
   * @throws IllegalStateException if [photoInfo] is empty.
   * @throws kotlinx.coroutines.CancellationException if the calling coroutine is
   *   cancelled while waiting for the transaction.
   */
  private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty()) { "photoInfo must not be empty" }

    // NOTE(review): photoInfoDao.deleteById converts errors into `false` via
    // onErrorReturn(false), and the flatMap steps below ignore the emitted value.
    // A failed delete therefore does not surface as an error inside this callback --
    // confirm whether that is intended, since it likely prevents the transaction
    // from being aborted on a failed step.
    val transactionMono = template.inTransaction().execute { txTemplate ->
      photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
        .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
        .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
    }.next()

    return try {
      transactionMono.awaitFirst()
      true
    } catch (error: kotlinx.coroutines.CancellationException) {
      // Cancellation is not a delete failure: rethrow so the enclosing
      // withContext/withLock scopes observe it instead of receiving `false`.
      throw error
    } catch (error: Throwable) {
      logger.error("Could not delete photo", error)
      false
    }
  }

这里我有五个操作,从五个不同的文档中删除数据。这是其中一项操作的示例:

/**
 * Removes the [PhotoInfo] documents whose `PHOTO_ID` field equals [photoId].
 *
 * @param photoId id to match against `PhotoInfo.Mongo.Field.PHOTO_ID`.
 * @param template operations object used to run the removal; defaults to
 *   `reactiveTemplate`, and callers may pass a different one (e.g. a transaction-bound template).
 * @return a [Mono] emitting whether the removal was acknowledged; any error is
 *   logged as "DB error" and replaced with `false`.
 */
open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
  val byPhotoId = Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId)
  val removal = template.remove(Query().addCriteria(byPhotoId), PhotoInfo::class.java)

  return removal
    .map { result -> result.wasAcknowledged() }
    .doOnError { cause -> logger.error("DB error", cause) }
    // Errors never reach the subscriber: they are turned into a `false` value.
    .onErrorReturn(false)
}

我希望这个操作在任何一个删除失败时失败,所以我使用了一个事务。

然后,我为使用此 delete 方法的处理程序(handler)编写了一些测试:

  /**
   * With `staticMapDownloaderService.enqueue` stubbed to return `false`, an upload must
   * answer with a 5xx status and `ErrorCode.DatabaseError`, and leave no files,
   * gallery photos or photo infos behind.
   */
  @Test
  fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val webClient = getWebTestClient()
    val token = "fwerwe"
    val userId = "1234235236"

    // Arrange: stubbing happens inside runBlocking, presumably because the stubbed members are suspend functions.
    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    // Act: post the multipart upload and require a server error status.
    val uploadPacket = UploadPhotoPacket(33.4, 55.2, userId, true)
    val requestBody = BodyInserters.fromMultipartData(createTestMultipartFile(PHOTO1, uploadPacket))

    val bodySpec = webClient
      .post()
      .uri("/v1/api/upload")
      .contentType(MediaType.MULTIPART_FORM_DATA)
      .body(requestBody)
      .exchange()
      .expectStatus().is5xxServerError
      .expectBody()

    // Assert: the expected error code is reported and nothing was persisted.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.DatabaseError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
      val galleryPhotos = galleryPhotoDao.testFindAll().awaitFirst()
      assertEquals(0, galleryPhotos.size)

      val photoInfos = photoInfoDao.testFindAll().awaitFirst()
      assertEquals(0, photoInfos.size)
    }
  }

  /**
   * With `diskManipulationService.resizeAndSavePhotos` stubbed to throw an [IOException],
   * an upload must answer with a 5xx status and `ErrorCode.ServerResizeError`, and leave
   * no files, gallery photos or photo infos behind.
   */
  @Test
  fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val webClient = getWebTestClient()
    val token = "fwerwe"
    val userId = "1234235236"

    // Arrange: stubbing happens inside runBlocking, presumably because the stubbed members are suspend functions.
    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      // The failure under test: resizing/saving the photos blows up.
      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    // Act: post the multipart upload and require a server error status.
    val uploadPacket = UploadPhotoPacket(33.4, 55.2, userId, true)
    val requestBody = BodyInserters.fromMultipartData(createTestMultipartFile(PHOTO1, uploadPacket))

    val bodySpec = webClient
      .post()
      .uri("/v1/api/upload")
      .contentType(MediaType.MULTIPART_FORM_DATA)
      .body(requestBody)
      .exchange()
      .expectStatus().is5xxServerError
      .expectBody()

    // Assert: the expected error code is reported and nothing was persisted.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerResizeError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
      val galleryPhotos = galleryPhotoDao.testFindAll().awaitFirst()
      assertEquals(0, galleryPhotos.size)

      val photoInfos = photoInfoDao.testFindAll().awaitFirst()
      assertEquals(0, photoInfos.size)
    }
  }

  /**
   * With `diskManipulationService.copyDataBuffersToFile` stubbed to throw an [IOException],
   * an upload must answer with a 5xx status and `ErrorCode.ServerDiskError`, and leave
   * no files, gallery photos or photo infos behind.
   */
  @Test
  fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val webClient = getWebTestClient()
    val token = "fwerwe"
    val userId = "1234235236"

    // Arrange: stubbing happens inside runBlocking, presumably because the stubbed members are suspend functions.
    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      // The failure under test: copying the uploaded buffers to disk blows up.
      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    // Act: post the multipart upload and require a server error status.
    val uploadPacket = UploadPhotoPacket(33.4, 55.2, userId, true)
    val requestBody = BodyInserters.fromMultipartData(createTestMultipartFile(PHOTO1, uploadPacket))

    val bodySpec = webClient
      .post()
      .uri("/v1/api/upload")
      .contentType(MediaType.MULTIPART_FORM_DATA)
      .body(requestBody)
      .exchange()
      .expectStatus().is5xxServerError
      .expectBody()

    // Assert: the expected error code is reported and nothing was persisted.
    val uploadResponse = fromBodyContent<UploadPhotoResponse>(bodySpec)
    assertEquals(ErrorCode.ServerDiskError.value, uploadResponse.errorCode)
    assertEquals(0, findAllFiles().size)

    runBlocking {
      val galleryPhotos = galleryPhotoDao.testFindAll().awaitFirst()
      assertEquals(0, galleryPhotos.size)

      val photoInfos = photoInfoDao.testFindAll().awaitFirst()
      assertEquals(0, photoInfos.size)
    }
  }

通常第一个测试通过:

接下来的两个测试失败,并出现以下异常:

17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017. 

然后:

Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.

并且:

17:22:36.951 [Thread-16] WARN  reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017. 

有时其中两个通过而最后一个失败。

看起来只有第一个事务成功,任何后续事务都会失败,我想原因是我必须手动关闭它(或 ClientSession)。但是我找不到关于如何关闭 transactions/sessions 的任何信息。这是我能找到的为数不多的几个示例之一,其中使用响应式模板执行事务,我没有看到他们做任何额外的事情来关闭 transaction/session。

或者可能是因为我在模拟一个在事务中抛出异常的方法?也许在这种情况下它没有被关闭?

您可以检查连接选项,并根据您所使用的驱动程序进行相应处理:

// NOTE(review): this fragment looks like Scala against a ReactiveMongo-style driver API,
// not the Kotlin/Spring Data code used in the question -- confirm before relying on it.
// Open a connection to the "localhost" node and select the "plugin" database.
val connection = MongoConnection(List("localhost"))
val db = connection.database("plugin")
...
// Request that the connection be closed.
connection.askClose()

你可以搜索askClose()方法,希望对你有帮助

客户端的 sessions/transactions 已被正确关闭,但测试中创建的索引似乎正在获取全局锁,导致下一个事务的锁请求排在其后等待,并最终因锁请求超时而失败。

基本上,您必须管理好索引的创建,使其不会干扰来自客户端的事务。

一个快速的修复方法是在 shell 中运行下面的命令,以增加锁请求的超时时间。

// Sets the server parameter maxTransactionLockRequestTimeoutMillis to 50 through an admin command.
db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )

在生产中你可以查看事务错误标签 并重试该操作。

这里有更多内容https://docs.mongodb.com/manual/core/transactions-production-consideration/#pending-ddl-operations-and-transactions