如何将写入操作限制为 1k records/sec?

How do I limit write operations to 1k records/sec?

目前,我能够以500的batchsize写入数据库。但由于内存不足错误和子聚合器与数据库叶节点之间的延迟同步,有时我会运行进入叶节点内存错误。唯一的解决方案是,如果我将我的写入操作限制为每秒 1k 条记录,我就可以消除错误。

dataStream
  .map(line => readJsonFromString(line))
  .grouped(memsqlBatchSize)
  .foreach { recordSet =>
          val dbRecords = recordSet.map(m => (m, Events.transform(m)))
          dbRecords.map { record =>
            try {
              Events.setValues(eventInsert, record._2)
              eventInsert.addBatch
            } catch {
              case e: Exception =>
                logger.error(s"error adding batch: ${e.getMessage}")
                val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
                logger.error(s"event: $error_event")
            }
          }

          // Bulk Commit Records
          try {
            eventInsert.executeBatch
          } catch {
            case e: java.sql.BatchUpdateException =>
              val updates = e.getUpdateCounts
              logger.error(s"failed commit: ${updates.toString}")
              updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
                val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
                logger.error(s"insert error: $error")
                logger.error(e.getMessage)
              }
          }
          finally {
            connection.commit
            eventInsert.clearBatch
            logger.debug(s"committed: ${dbRecords.length.toString}")
          }
        }

1k 条记录的原因是,我尝试写入的一些数据可能包含大量 json 条记录,如果批量大小为 500,则可能导致每秒 30k 条记录。有什么办法可以保证无论记录数多少,批量只写入1000条记录?

代码块已被一个线程调用,并且有多个线程运行并行。我可以在此 Scala 代码中使用 Thread.sleep(1000)delay(1.0)。但是如果我使用 delay() 它将使用一个可能必须在函数外部调用的承诺。看起来 Thread.sleep() 是最佳选择,批量大小为 1000。执行测试后,我可以毫无问题地对 120,000 records/thread/sec 进行基准测试。

根据 memsql 的体系结构,所有加载到 memsql 中的操作首先进入本地内存中的行存储,然后 memsql 将从那里合并到末尾的列存储中。每次我推送更多数据导致瓶颈时,都会导致叶错误。减少批量大小并引入 Thread.sleep() 帮助我编写了 120,000 records/sec。使用此基准执行测试。

我认为 Thead.sleep 不是处理这种情况的好主意。通常我们不建议在 Scala 中这样做,并且我们不想在任何情况下阻塞线程。

一个建议是使用任何流媒体技术,例如 Akka.Stream、Monix.Observable。这些库之间有一些优点和缺点,我不想在上面花太多的段落。但是当消费者比生产者慢时,他们确实支持背压来控制生产率。例如,在您的情况下,您的消费者正在编写数据库,而您的生产者可能正在读取一些 json 文件并进行一些聚合。

以下代码说明了这个想法,您需要根据需要进行修改:

val sourceJson = Source(dataStream.map(line => readJsonFromString(line)))
val sinkDB = Sink(Events.jm.writeValueAsString) // you will need to figure out how to generate the Sink
val flowThrottle = Flow[String]
  .throttle(1, 1.second, 1, ThrottleMode.shaping)

val runnable = sourceJson.via[flowThrottle].toMat(sinkDB)(Keep.right)
val result = runnable.run()