- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个相当典型的生产者-消费者场景,其中我有 1 个生产者线程执行查询并将结果放入 BlockingQueue
大约 7-8 位消费者从 BlockingQueue
中挑选这些元素并对它们进行相当持久的分析。一旦这些分析完成,所得对象将被放置在 HashMap
中。以原始对象作为键,即 HashMap<AnalyzedObject, AnalysisResult>
由于底层数据模型中关系的性质,我得到了很多重复的任务,这些任务显然不需要重新处理。我当前的解决方案基本上如下:
public class AnalysisAction implements Runnable{
private Dataset data;
private DbManager dbManager;
private Path path;
private Set<Integer> identifiedElements;
private AnalysisResult res;
private Map<Path, AnalysisResult> analyzedPaths;
public static final AtomicInteger duplicates = new AtomicInteger(0);
public AnalysisAction(Path p, Dataset ds, DbManager dbm, Map<Path, AnalysisResult> paths){
this.data = ds;
this.path = p;
this.dbManager = dbm;
this.analyzedPaths = paths;
this.res = new AnalysisResult(path);
}
@Override
public void run() {
if(!analyzedPaths.containsKey(path)){
t0 = System.currentTimeMillis();
// 1. Check the coverage of the path
this.identifiedElements = getIdentifiedElements();
if(identifiedElements.size() != 0)
{
try{
// TIME CONSUMING STUFF...
analyzedPaths.put(path, res);
}
catch(Exception e){
// Exception handling...
}
}
t_end = System.currentTimeMillis();
DebugToolbox.submitProcTime(t_end - t0);
}
else {
duplicates.incrementAndGet();
logger.finer("Duplicate path encountered..." + System.lineSeparator());
}
}
// PRIVATE METHODS THAT CARRY OUT THE TIME CONSUMING STUFF...
}
然后在控制多线程的类中,我有以下解决方案:
public class ConcurrencyService {
private final ThreadPoolExecutor pool;
private final int poolSize;
private final int qCapacity = 1 << 7;
private final long timeout = 3;
private final Path tainedPath =
new Path(Long.MIN_VALUE, "LAST_PATH_IN_QUEUE", "N/A", "N/A");
private BlockingQueue<PathwayImpl> bq;
private DbManager dbMan;
private Dataset ds;
private Map<Path,AnalysisResult> analyzedPaths;
private volatile boolean started;
public ConcurrencyService(Dataset data, DbManager db){
this.ds = data;
this.bq = new LinkedBlockingQueue<Path>(qCapacity);
this.dbMan = db;
this.analyzedPaths = new ConcurrentHashMap<Path,AnalysisResult>(1<<15);
this.started = false;
poolSize = Runtime.getRuntime().availableProcessors();
pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, new FThreadFactory(-1));
}
public void serve() throws InterruptedException {
try {
ds.finalize();
started = true;
Thread producerThread = new Thread(new QueryingAction(), "f-query-thread");
producerThread.start();
Thread loggerThread = new Thread(new PeriodicLogAction(null), "f-logger-thread");
loggerThread.start();
while((producerThread.getState() != Thread.State.TERMINATED) || !bq.isEmpty()){
Path p = bq.poll(timeout, TimeUnit.MINUTES);
if(p != null){
if (p.equals(tainedPath)) break;
pool.submit(new AnalysisAction(p, ds, dbMan, analyzedPaths));
}else
logger.warning("Timed out while waiting for a path...");
}
} catch (Exception ex) {
// Exception handling...
} finally{
pool.shutdown();
long totalTasks = pool.getTaskCount(),
compTasks = pool.getCompletedTaskCount(),
tasksRemaining = totalTasks - compTasks,
timeout = 10 * tasksRemaining / poolSize;
pool.awaitTermination(timeout, TimeUnit.SECONDS);
logger.info(
"A total of " + DebugToolbox.getNbrProcTimes()
+ " tasks analyzed. Mean process time is: "
+ DebugToolbox.getMeanProcTimeAsString()
+ " milliseconds." + System.lineSeparator());
}
public boolean isDone(){
if(this.started)
return pool.isTerminated();
else
return false;
}
}
protected class QueryingAction implements Runnable {
// Use this to limit the number of paths to be analyzed
// private final int debugLimiter = 1500;
private final int debugLimiter = Integer.MAX_VALUE;
public void run() {
try {
int i = 0;
outer: for(String el : ds.getElements()){
inner: for(Path path : dbMan.getAllPathsWithElement(el)){
if(i++ > debugLimiter)
break outer;
else
bq.put(path);
}
}
logger.info("Total number of queried paths: " + i);
} catch (SQLException e) {
// Exception handling...
} catch (InterruptedException e) {
// Exception handling...
}
bq.offer(tainedPath);
}
}
protected class PeriodicLogAction implements Runnable {
private final PrintStream ps;
private final long period;
private final static long DEF_PERIOD = 30000;
private final String nL = System.getProperty("line.separator");
private volatile boolean loop;
private int counter = 0;
private ConcurrencyService cs;
private int inQueryQueue, inPoolQueue,
completedTasks, inProccessedSet,duplicates;
boolean sanityCheck;
StringBuffer sb;
PeriodicLogAction(PrintStream ps, long timePeriod) {
this.ps = ps;
this.period = timePeriod;
this.loop = true;
this.cs = ConcurrencyService.this;
}
// Alternative constructors
@SuppressWarnings("rawtypes")
public void run() {
logger.config("PeriodicLogAction started on thread: " +
Thread.currentThread().getName() +
System.lineSeparator());
while(loop){
// log # of paths created, analyzed and are in queue
outputLogInfo();
// wait designated time period
try {
Thread.sleep(period);
} catch (InterruptedException e) {}
if(cs.isDone()){
this.loop = false;
outputLogInfo();
}
}
}
private void outputLogInfo(){
synchronized (pool) {
Queue queryQueue = cs.bq,
poolQueue = cs.pool.getQueue();
Map<PathwayImpl,AnalysisResult> processedSet = cs.analyzedPaths;
inQueryQueue = queryQueue.size();
inPoolQueue = poolQueue.size();
completedTasks = (int) pool.getCompletedTaskCount();
inProccessedSet = processedSet.size();
duplicates = AnalysisAction.duplicates.get();
sanityCheck = (completedTasks == inProccessedSet + duplicates);
}
sb = new StringBuffer();
sb.append("Checkpoint ").append(++counter).append(": ")
.append("QQ: ").append(inQueryQueue).append("\t")
.append("PQ: ").append(inPoolQueue).append("\t")
.append("CT: ").append(completedTasks).append("\t")
.append("AP: ").append(inProccessedSet).append("\t")
.append("DP: ").append(duplicates).append("\t")
.append("Sanity: ").append(sanityCheck);
if(ps == null)
logger.info(sb.toString() + nL);
else
ps.println(sb.toString());
}
}
}
这是我在日志中看到的内容:
Sep 17, 2014 5:30:00 PM main.ConcurrencyService$QueryingAction run
INFO: Total number of queried paths: 81128
Sep 17, 2014 5:30:00 PM main.ConcurrencyService serve
INFO: All paths are queried and queued...
Initiating a timely shutdown of the pool..
...
Sep 17, 2014 5:49:49 PM main.ConcurrencyService serve
INFO: A total of 8620 tasks analyzed. Mean process time is: 1108.208 milliseconds.
...
Sep 17, 2014 5:49:54 PM main.ConcurrencyService$PeriodicLogAction outputLogInfo
INFO: Checkpoint 41: QQ: 0 PQ: 0 CT: 81128 AP: 8565 DP: 72508 Sanity: false
...这表明:
已完成任务的数量与查询和排队的对象数量一致。所以什么都没有错过..
分析路径的数量(即结果)和重复的数量不等于已完成的任务数量:81128 - (8565 + 72508) = 55
累积的结果数与 AnalysisAction
报告的处理时间不匹配类别:8565 与 8620(即缺少 55 个结果)
不确定造成这种差异的原因是什么,或者从哪里开始调试。我显然无法单步执行 81128 个任务来调查缺少哪 55 个任务以及原因..
有什么建议吗?
编辑:以下是针对评论中问题的一些说明
DebugToolbox.submitProcTimes(long t)
是一个同步静态方法,它只是将 t 添加到 ArrayList
。
isDone()
是 ConcurrencyService 中的一个方法,我在尝试缩短此处发布的代码时不小心删除了它。我编辑了代码以反射(reflect)该方法的实现方式。
最佳答案
您检查映射中是否存在键,然后花时间生成值,然后将值放入映射中。
当您生成值时,另一个线程可以处理相同的键。由于尚未添加,因此现在有两个线程生成相同的值。因此生成的值的数量大于 map 的最终大小。
解决方案是添加结果(可能是占位符)并使用 putIfAbsent()
以原子方式检查 key 是否存在。
关于java - 追寻失踪的任务: potential discrepancy between reported task numbers and results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25909311/
从 angular 5.1 更新到 6.1 后,我开始从我的代码中收到一些错误,如下所示: Error: ngc compilation failed: components/forms/utils.
我正在学习 Typescript 并尝试了解类型和接口(interface)的最佳实践。我正在玩一个使用 GPS 坐标的示例,想知道一种方法是否比另一种更好。 let gps1 : number[];
type padding = [number, number, number, number] interface IPaddingProps { defaultValue?: padding
这两种格式在内存中保存结果的顺序上有什么区别吗? number = number + 10; number += 10; 我记得一种格式会立即保存结果,因此下一行代码可以使用新值,而对于另一种格式,
在 Python 匹配模式中,如何匹配像 1 这样的文字数字在按数字反向引用后 \1 ? 我尝试了 \g用于此目的的替换模式中可用的语法,但它在我的匹配模式中不起作用。 我有一个更大的问题,我想使用一
我的源文件here包含 HTML 代码,我想将电话号码更改为可在我的应用程序中单击。我正在寻找一个正则表达式来转换字符串 >numbernumber(\d+)$1numbernumber<",我们在S
我们有一个包含 2 个字段和一个按钮的表单。我们想要点击按钮来输出位于 int A 和 int B 之间的随机整数(比如 3、5 或 33)? (不需要使用 jQuery 或类似的东西) 最佳答案 你
我收到以下类型错误(TypeScript - 3.7.5)。 error TS2345: Argument of type '(priority1: number, priority2: number
只想创建简单的填充器以在其他功能中使用它: function fillLine(row, column, length, bgcolor) { var sheet = SpreadsheetApp
我有一个问题。当我保存程序输出的 *.txt 时,我得到以下信息:0.021111111111111112a118d0 以及更多的东西。 问题是: 这个数字中的“d0”和“a”是什么意思? 我不知道“
首先:抱歉标题太长了,但我发现很难用一句话来解释这个问题;)。是的,我也四处搜索(这里和谷歌),但找不到合适的答案。 所以,问题是这样的: 数字 1-15 将像这样放在金字塔中(由数组表示):
我想从字符串中提取血压。数据可能如下所示: text <- c("at 10.00 seated 132/69", "99/49", "176/109", "10.12 I 128/51, II 1
当尝试执行一个简单的 bash 脚本以将前面带有 0 的数字递增 1 时,原始数字被错误地解释。 #!/bin/bash number=0026 echo $number echo $((number
我有一个类型为 [number, number] 的字段,TypeScript 编译器(strict 设置为 true)出现问题,提示初始值值(value)。我尝试了以下方法: public shee
你能帮我表达数组吗:["232","2323","233"] 我试试这个:/^\[("\d{1,7}")|(,"\d{1,7}")\]$/ 但是这个表达式不能正常工作。 我使用 ruby(rail
这个问题在这里已经有了答案: meaning of (number) & (-number) (4 个回答) 关闭6年前. 例如: int get(int i) { int res = 0;
我正在考虑使用 Berkeley DB作为高度并发的移动应用程序后端的一部分。对于我的应用程序,使用 Queue对于他们的记录级别锁定将是理想的。但是,如标题中所述,我需要查询和更新概念建模的数据,如
我正在尝试解决涉及重复数字的特定 JavaScript 练习,为此我需要将重复数字处理到大量小数位。 目前我正在使用: function divide(numerator, denominator){
我有这个数组类型: interface Details { Name: string; URL: string; Year: number; } interface AppState {
我们正在使用 Spring 3.x.x 和 Quartz 2.x.x 实现 Web 应用程序。 Web 服务器是 Tomcat 7.x.x。我们有 3 台服务器。 Quartz 是集群式的,因此所有这
我是一名优秀的程序员,十分优秀!