反应性 Mongo:已超过最大操作数 (maxQueueWaitSize) 500

Reactive Mongo: Max number of operations (maxQueueWaitSize) of 500 has been exceeded

我正在使用 ReactiveMongoTemplate MongoDB ChangeStream 来监听 MongoDB 集合的更新,执行一些查询,并将文档保存到另一个集合。 虽然它在本地运行良好,但在部署到具有大量容量的 UAT 后开始出现以下错误:

Too many operations are already waiting for a collection. Max number of operations (maxWaitQueueSize) of 500 has been exceeded.

有什么方法可以解决这个问题?

我在 application.yml 文件中有以下内容

spring:
   data: 
      mongodb:
          uri: mongodb://host:port/db?authMechanism=<val1>&authSource=<val2>&authechanismProperties=<val3>

这就是简化的更改流光的样子:

@Autowired
ReactiveMongoTemplate reactiveMongoTemplate;

reactiveMongoTemplate
  .changeStream(Sample.class)
  .watchCollection("sample_collection")
  .filter(
     new Criteria.orOperator(
        where("operationType").is("update"),
        where("operationType").is("insert")
     )
  )
  .listen()
  .flatMap(r->processMessage(r)). // processMessage does some queries to collections including this collection being listened to and upserts to same mongodb in a different collection
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe();

我知道我可能需要添加一些连接池才能处理多个连接?但是我如何使用 Reactive MongoDB 配置它? 我是反应式编程的新手。任何指点都会很有帮助。

您可以在这里做几件事:

  1. 检查是否有一些长时间的阻塞调用,导致线程被阻塞,并导致创建大量连接,因为之前的连接仍在执行繁重的任务。尝试检查对阻止这些调用的代码的一些优化。 在响应式编程中,您可以使用 BlockHound.

    检查是否存在阻塞代码
  2. 通过指定 waitQueueMultiplemaxPoolSize 增加连接限制 - https://docs.mongodb.com/manual/reference/connection-string/#connection-pool-options

在此之前,您可以使用

检查 mongo 数据库统计信息以查看当前和允许的连接
db.serverStatus().connections