如何将写入操作限制为 1k records/sec?
How do I limit write operations to 1k records/sec?
目前,我能够以500的batchsize写入数据库。但由于内存不足错误和子聚合器与数据库叶节点之间的延迟同步,有时我会运行进入叶节点内存错误。唯一的解决方案是,如果我将我的写入操作限制为每秒 1k 条记录,我就可以消除错误。
dataStream
.map(line => readJsonFromString(line))
.grouped(memsqlBatchSize)
.foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
1k 条记录的原因是,我尝试写入的一些数据可能包含大量 json 条记录,如果批量大小为 500,则可能导致每秒 30k 条记录。有什么办法可以保证无论记录数多少,批量只写入1000条记录?
代码块已被一个线程调用,并且有多个线程运行并行。我可以在此 Scala 代码中使用 Thread.sleep(1000)
或 delay(1.0)
。但是如果我使用 delay()
它将使用一个可能必须在函数外部调用的承诺。看起来 Thread.sleep()
是最佳选择,批量大小为 1000
。执行测试后,我可以毫无问题地对 120,000 records/thread/sec 进行基准测试。
根据 memsql 的体系结构,所有加载到 memsql 中的操作首先进入本地内存中的行存储,然后 memsql 将从那里合并到末尾的列存储中。每次我推送更多数据导致瓶颈时,都会导致叶错误。减少批量大小并引入 Thread.sleep() 帮助我编写了 120,000 records/sec。使用此基准执行测试。
我认为 Thead.sleep 不是处理这种情况的好主意。通常我们不建议在 Scala 中这样做,并且我们不想在任何情况下阻塞线程。
一个建议是使用任何流媒体技术,例如 Akka.Stream、Monix.Observable。这些库之间有一些优点和缺点,我不想在上面花太多的段落。但是当消费者比生产者慢时,他们确实支持背压来控制生产率。例如,在您的情况下,您的消费者正在编写数据库,而您的生产者可能正在读取一些 json 文件并进行一些聚合。
以下代码说明了这个想法,您需要根据需要进行修改:
val sourceJson = Source(dataStream.map(line => readJsonFromString(line)))
val sinkDB = Sink(Events.jm.writeValueAsString) // you will need to figure out how to generate the Sink
val flowThrottle = Flow[String]
.throttle(1, 1.second, 1, ThrottleMode.shaping)
val runnable = sourceJson.via[flowThrottle].toMat(sinkDB)(Keep.right)
val result = runnable.run()
目前,我能够以500的batchsize写入数据库。但由于内存不足错误和子聚合器与数据库叶节点之间的延迟同步,有时我会运行进入叶节点内存错误。唯一的解决方案是,如果我将我的写入操作限制为每秒 1k 条记录,我就可以消除错误。
dataStream
.map(line => readJsonFromString(line))
.grouped(memsqlBatchSize)
.foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
1k 条记录的原因是,我尝试写入的一些数据可能包含大量 json 条记录,如果批量大小为 500,则可能导致每秒 30k 条记录。有什么办法可以保证无论记录数多少,批量只写入1000条记录?
代码块已被一个线程调用,并且有多个线程运行并行。我可以在此 Scala 代码中使用 Thread.sleep(1000)
或 delay(1.0)
。但是如果我使用 delay()
它将使用一个可能必须在函数外部调用的承诺。看起来 Thread.sleep()
是最佳选择,批量大小为 1000
。执行测试后,我可以毫无问题地对 120,000 records/thread/sec 进行基准测试。
根据 memsql 的体系结构,所有加载到 memsql 中的操作首先进入本地内存中的行存储,然后 memsql 将从那里合并到末尾的列存储中。每次我推送更多数据导致瓶颈时,都会导致叶错误。减少批量大小并引入 Thread.sleep() 帮助我编写了 120,000 records/sec。使用此基准执行测试。
我认为 Thead.sleep 不是处理这种情况的好主意。通常我们不建议在 Scala 中这样做,并且我们不想在任何情况下阻塞线程。
一个建议是使用任何流媒体技术,例如 Akka.Stream、Monix.Observable。这些库之间有一些优点和缺点,我不想在上面花太多的段落。但是当消费者比生产者慢时,他们确实支持背压来控制生产率。例如,在您的情况下,您的消费者正在编写数据库,而您的生产者可能正在读取一些 json 文件并进行一些聚合。
以下代码说明了这个想法,您需要根据需要进行修改:
val sourceJson = Source(dataStream.map(line => readJsonFromString(line)))
val sinkDB = Sink(Events.jm.writeValueAsString) // you will need to figure out how to generate the Sink
val flowThrottle = Flow[String]
.throttle(1, 1.second, 1, ThrottleMode.shaping)
val runnable = sourceJson.via[flowThrottle].toMat(sinkDB)(Keep.right)
val result = runnable.run()