gpt4 book ai didi

java - 如何根据用户定义的参数执行 throttle ?

转载 作者:行者123 更新时间:2023-11-30 10:08:33 26 4
gpt4 key购买 nike

我正在以用户在多线程环境中定义的批量大小写入内存分布式数据库。但是我想限制写入ex的行数。 1000 行/秒。这个要求的原因是我的生产者写得太快而消费者遇到叶内存错误。在批处理记录时是否有任何标准做法来执行 throttle 。

dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
val dbRecords = recordSet.map(m => (m, Events.transform(m)))
dbRecords.map { record =>
try {
Events.setValues(eventInsert, record._2)
eventInsert.addBatch
} catch {
case e: Exception =>
logger.error(s"error adding batch: ${e.getMessage}")
val error_event = Events.jm.writeValueAsString(mapAsJavaMap(record._1.asInstanceOf[Map[String, Object]]))
logger.error(s"event: $error_event")
}
}

// Bulk Commit Records
try {
eventInsert.executeBatch
} catch {
case e: java.sql.BatchUpdateException =>
val updates = e.getUpdateCounts
logger.error(s"failed commit: ${updates.toString}")
updates.zipWithIndex.filter { case (v, i) => v == Statement.EXECUTE_FAILED }.foreach { case (v, i) =>
val error = Events.jm.writeValueAsString(mapAsJavaMap(dbRecords(i)._1.asInstanceOf[Map[String, Object]]))
logger.error(s"insert error: $error")
logger.error(e.getMessage)
}
}
finally {
connection.commit
eventInsert.clearBatch
logger.debug(s"committed: ${dbRecords.length.toString}")
}
}

我希望如果我可以将用户定义的参数作为 throttleMax 传递,并且如果每个线程写入的总记录达到 throttleMax,thread.sleep() 将被调用 1 秒。但这会使整个过程变得非常缓慢。是否有任何其他有效方法可用于将数据加载限制为 1000 行/秒?

最佳答案

正如其他人所建议的那样(请参阅问题的评论),与在此处限制相比,您有更好的选择。但是,您可以使用如下一些简单的代码限制 Java 中的操作:

/**
* Given an Iterator `inner`, returns a new Iterator which will emit items upon
* request, but throttled to at most one item every `minDelayMs` milliseconds.
*/
public static <T> Iterator<T> throttledIterator(Iterator<T> inner, int minDelayMs) {
return new Iterator<T>() {
private long lastEmittedMillis = System.currentTimeMillis() - minDelayMs;

@Override
public boolean hasNext() {
return inner.hasNext();
}

@Override
public T next() {
long now = System.currentTimeMillis();
long requiredDelayMs = now - lastEmittedMillis;
if (requiredDelayMs > 0) {
try {
Thread.sleep(requiredDelayMs);
} catch (InterruptedException e) {
// resume
}
}
lastEmittedMillis = now;

return inner.next();
}
};
}

以上代码使用了Thread.sleep,所以不适合在Reactive系统中使用。在这种情况下,您可能希望使用该系统中提供的 Throttle 实现,例如throttle in Akka

关于java - 如何根据用户定义的参数执行 throttle ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53715890/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com