如何根据用户定义的参数执行节流?
How to perform throttling based on user defined argument?
我正在以用户在多线程环境中定义的批量大小写入内存分布式数据库。但是我想限制写入ex的行数。 1000 rows/sec。这个要求的原因是我的生产者写得太快,消费者 运行 进入叶内存错误。在批处理记录时是否有任何标准做法来执行节流。
dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
我希望如果我可以将用户定义的参数作为 throttleMax 传递,并且如果每个线程写入的总记录达到 throttleMax,将调用 thread.sleep() 1 秒。但这会使整个过程变得非常缓慢。是否有任何其他有效方法可用于将数据加载限制到 1000 rows/sec?
正如其他人所建议的(请参阅问题的评论),您有比在此处限制更好的选择。但是,您可以使用如下一些简单的代码来限制 Java 中的操作:
/**
* Given an Iterator `inner`, returns a new Iterator which will emit items upon
* request, but throttled to at most one item every `minDelayMs` milliseconds.
*/
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
return new Iterator<T>() {
private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public T next() {
long now = System.currentTimeMillis();
long requiredDelayMs = now - lastEmittedMillis;
if (requiredDelayMs > 0) {
try {
Thread.sleep(requiredDelayMs);
} catch (InterruptedException e) {
// resume
}
}
lastEmittedMillis = now;
return inner.next();
}
};
}
以上代码使用了Thread.sleep
,所以不适合在Reactive系统中使用。在这种情况下,您可能希望使用该系统中提供的 Throttle 实现,例如throttle
in Akka
我正在以用户在多线程环境中定义的批量大小写入内存分布式数据库。但是我想限制写入ex的行数。 1000 rows/sec。这个要求的原因是我的生产者写得太快,消费者 运行 进入叶内存错误。在批处理记录时是否有任何标准做法来执行节流。
dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}
// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}
我希望如果我可以将用户定义的参数作为 throttleMax 传递,并且如果每个线程写入的总记录达到 throttleMax,将调用 thread.sleep() 1 秒。但这会使整个过程变得非常缓慢。是否有任何其他有效方法可用于将数据加载限制到 1000 rows/sec?
正如其他人所建议的(请参阅问题的评论),您有比在此处限制更好的选择。但是,您可以使用如下一些简单的代码来限制 Java 中的操作:
/**
* Given an Iterator `inner`, returns a new Iterator which will emit items upon
* request, but throttled to at most one item every `minDelayMs` milliseconds.
*/
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
return new Iterator<T>() {
private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;
@Override
public boolean hasNext() {
return inner.hasNext();
}
@Override
public T next() {
long now = System.currentTimeMillis();
long requiredDelayMs = now - lastEmittedMillis;
if (requiredDelayMs > 0) {
try {
Thread.sleep(requiredDelayMs);
} catch (InterruptedException e) {
// resume
}
}
lastEmittedMillis = now;
return inner.next();
}
};
}
以上代码使用了Thread.sleep
,所以不适合在Reactive系统中使用。在这种情况下,您可能希望使用该系统中提供的 Throttle 实现,例如throttle
in Akka