gpt4 book ai didi

java - 追寻失踪的任务: potential discrepancy between reported task numbers and results

转载 作者:行者123 更新时间:2023-12-02 05:27:03 25 4
gpt4 key购买 nike

我有一个相当典型的生产者-消费者场景:1 个生产者线程执行查询并将结果放入 BlockingQueue,大约 7-8 个消费者线程从 BlockingQueue 中取出这些元素,并对它们进行相当耗时的分析。一旦分析完成,所得结果对象将以原始对象作为键放入一个 HashMap 中,即 HashMap<AnalyzedObject, AnalysisResult>。

由于底层数据模型中关系的性质,我得到了很多重复的任务,这些任务显然不需要重新处理。我当前的解决方案基本上如下:

/**
 * Runnable task that analyzes a single {@code Path} and stores the outcome in
 * a caller-supplied map keyed by that path.
 *
 * <p>The task first checks whether the path is already a key of the map. If it
 * is, the task only bumps the shared {@link #duplicates} counter. Otherwise it
 * runs the (elided) analysis and puts the result into the map.
 *
 * <p>NOTE(review): the {@code containsKey} check and the later {@code put} are
 * two separate map operations with long-running work in between. When several
 * tasks share the same map (as {@code ConcurrencyService.serve()} arranges),
 * two tasks holding equal paths can both pass the check before either one has
 * put its result. Both are then analyzed and timed, but the map keeps only one
 * entry and neither is counted as a duplicate.
 *
 * <p>NOTE(review): {@code t0}, {@code t_end} and {@code logger} are used in
 * {@code run()} but are not declared anywhere in this excerpt.
 */
public class AnalysisAction implements Runnable{

// Stored by the constructor; not read by any code visible in this excerpt.
private Dataset data;
private DbManager dbManager;
// The path this task analyzes; also the key used in analyzedPaths.
private Path path;
// Assigned in run() from getIdentifiedElements() (defined outside this excerpt).
private Set<Integer> identifiedElements;
// Result object created up front for this path; put into the map after analysis.
private AnalysisResult res;
// Map of already analyzed paths, supplied by the caller.
private Map<Path, AnalysisResult> analyzedPaths;

// Counter shared by all instances: number of tasks that found their path
// already present in analyzedPaths.
public static final AtomicInteger duplicates = new AtomicInteger(0);

/**
 * Creates a task for one path.
 *
 * @param p     the path to analyze
 * @param ds    the dataset, stored in {@code data}
 * @param dbm   the database manager, stored in {@code dbManager}
 * @param paths the map that is checked for {@code p} and receives the result
 */
public AnalysisAction(Path p, Dataset ds, DbManager dbm, Map<Path, AnalysisResult> paths){
this.data = ds;
this.path = p;
this.dbManager = dbm;
this.analyzedPaths = paths;
this.res = new AnalysisResult(path);
}

/**
 * Analyzes the path unless it is already a key of {@code analyzedPaths}, in
 * which case only the duplicate counter is incremented.
 */
@Override
public void run() {

// Check-then-act: this test is not atomic with the put() further down.
if(!analyzedPaths.containsKey(path)){
t0 = System.currentTimeMillis();
// 1. Check the coverage of the path
this.identifiedElements = getIdentifiedElements();
if(identifiedElements.size() != 0)
{
try{
// TIME CONSUMING STUFF...

// Reached only if the elided work above did not throw.
analyzedPaths.put(path, res);
}
catch(Exception e){
// Exception handling...
}
}

// A processing time is submitted for every non-duplicate task, including
// those with no identified elements and those whose analysis threw, i.e.
// also for tasks that never reached the put() above.
t_end = System.currentTimeMillis();
DebugToolbox.submitProcTime(t_end - t0);
}
else {
duplicates.incrementAndGet();
logger.finer("Duplicate path encountered..." + System.lineSeparator());
}
}
// PRIVATE METHODS THAT CARRY OUT THE TIME CONSUMING STUFF...
}

然后在控制多线程的类中,我有以下解决方案:

/**
 * Coordinates the producer-consumer pipeline: a producer thread fills a
 * bounded queue with paths, {@link #serve()} drains that queue and submits one
 * {@code AnalysisAction} per path to a fixed-size thread pool, and a logger
 * thread periodically reports queue sizes and counters.
 *
 * <p>NOTE(review): as posted, this listing is not well-formed. {@code isDone()}
 * appears before the brace that closes {@code serve()}, and {@code bq} is
 * declared as {@code BlockingQueue<PathwayImpl>} but constructed and used with
 * {@code Path} elements. {@code logger} is not declared in this excerpt.
 */
public class ConcurrencyService {
private final ThreadPoolExecutor pool;
private final int poolSize;
// Capacity of the bounded path queue (1 << 7 = 128).
private final int qCapacity = 1 << 7;
// Poll timeout used in serve(), in minutes.
private final long timeout = 3;
// Sentinel path: the producer offers it last, and serve() stops on seeing it.
private final Path tainedPath =
new Path(Long.MIN_VALUE, "LAST_PATH_IN_QUEUE", "N/A", "N/A");

// Queue between the producer (QueryingAction) and serve().
private BlockingQueue<PathwayImpl> bq;
private DbManager dbMan;
private Dataset ds;
// Shared result map handed to every AnalysisAction.
private Map<Path,AnalysisResult> analyzedPaths;
// Set to true at the start of serve(); read by isDone().
private volatile boolean started;

/**
 * Creates the queue, the shared result map and a fixed thread pool with one
 * thread per available processor.
 *
 * @param data the dataset whose elements the producer iterates over
 * @param db   the database manager used to query paths
 */
public ConcurrencyService(Dataset data, DbManager db){
this.ds = data;
this.bq = new LinkedBlockingQueue<Path>(qCapacity);
this.dbMan = db;
this.analyzedPaths = new ConcurrentHashMap<Path,AnalysisResult>(1<<15);
this.started = false;

poolSize = Runtime.getRuntime().availableProcessors();
pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, new FThreadFactory(-1));
}

/**
 * Starts the producer and logger threads, then moves paths from the queue
 * into the pool until the sentinel is seen, or until the producer thread has
 * terminated and the queue is empty. Finally shuts the pool down, waits for
 * it to terminate and logs the processing-time summary.
 *
 * @throws InterruptedException declared by the method; the visible source is
 *         {@code pool.awaitTermination} in the {@code finally} block
 */
public void serve() throws InterruptedException {
try {
// NOTE(review): what Dataset.finalize() does is not visible here — confirm.
ds.finalize();
started = true;

Thread producerThread = new Thread(new QueryingAction(), "f-query-thread");
producerThread.start();

// Uses a one-argument constructor that is elided from this excerpt
// (see "Alternative constructors" in PeriodicLogAction).
Thread loggerThread = new Thread(new PeriodicLogAction(null), "f-logger-thread");
loggerThread.start();

while((producerThread.getState() != Thread.State.TERMINATED) || !bq.isEmpty()){

// Waits up to `timeout` minutes for the next path; null means timed out.
Path p = bq.poll(timeout, TimeUnit.MINUTES);
if(p != null){
// The sentinel marks the end of the producer's output.
if (p.equals(tainedPath)) break;
pool.submit(new AnalysisAction(p, ds, dbMan, analyzedPaths));
}else
logger.warning("Timed out while waiting for a path...");

}

} catch (Exception ex) {
// Exception handling...
} finally{

// Stop accepting new tasks; already submitted tasks are still executed.
pool.shutdown();

// Wait budget in seconds: 10 per remaining task, divided by the pool size
// (integer arithmetic). This local `timeout` shadows the field of the same
// name.
long totalTasks = pool.getTaskCount(),
compTasks = pool.getCompletedTaskCount(),
tasksRemaining = totalTasks - compTasks,
timeout = 10 * tasksRemaining / poolSize;

pool.awaitTermination(timeout, TimeUnit.SECONDS);

logger.info(
"A total of " + DebugToolbox.getNbrProcTimes()
+ " tasks analyzed. Mean process time is: "
+ DebugToolbox.getMeanProcTimeAsString()
+ " milliseconds." + System.lineSeparator());
}

/**
 * Reports whether the service has been started and its pool has terminated.
 *
 * @return {@code pool.isTerminated()} once {@code serve()} has set
 *         {@code started}; {@code false} before that
 */
public boolean isDone(){
if(this.started)
return pool.isTerminated();
else
return false;
}
}

/**
 * Producer: queries all paths for every element of the dataset and puts them
 * into the queue, then offers the sentinel path.
 */
protected class QueryingAction implements Runnable {

// Use this to limit the number of paths to be analyzed
// private final int debugLimiter = 1500;
private final int debugLimiter = Integer.MAX_VALUE;

/**
 * Puts each queried path into the queue and always offers the sentinel at
 * the end, including after one of the caught exceptions.
 */
public void run() {
try {
// Count of paths seen so far; also reported in the log line below.
int i = 0;
outer: for(String el : ds.getElements()){
inner: for(Path path : dbMan.getAllPathsWithElement(el)){
if(i++ > debugLimiter)
break outer;
else
// put() blocks while the bounded queue is full.
bq.put(path);
}
}

logger.info("Total number of queried paths: " + i);

} catch (SQLException e) {
// Exception handling...
} catch (InterruptedException e) {
// Exception handling...
}
// offer() does not block: if the bounded queue is full at this moment it
// returns false and the sentinel is not enqueued. The return value is
// ignored here.
bq.offer(tainedPath);
}
}

/**
 * Logger: periodically snapshots queue sizes and task counters and writes one
 * checkpoint line, either to the given {@code PrintStream} or to
 * {@code logger} when the stream is {@code null}.
 */
protected class PeriodicLogAction implements Runnable {
// Output target; null selects logger.info in outputLogInfo().
private final PrintStream ps;
// Sleep time between checkpoints, passed to Thread.sleep (milliseconds).
private final long period;
// Not referenced in this excerpt; presumably used by the elided constructors.
private final static long DEF_PERIOD = 30000;
private final String nL = System.getProperty("line.separator");
// Loop flag for run(); cleared once the enclosing service reports done.
private volatile boolean loop;
// Checkpoint sequence number.
private int counter = 0;
private ConcurrencyService cs;
// Values captured by the most recent outputLogInfo() call.
private int inQueryQueue, inPoolQueue,
completedTasks, inProccessedSet,duplicates;

boolean sanityCheck;
StringBuffer sb;

/**
 * @param ps         stream to print checkpoints to, or {@code null} to use
 *                   {@code logger}
 * @param timePeriod sleep time between checkpoints, in milliseconds
 */
PeriodicLogAction(PrintStream ps, long timePeriod) {
this.ps = ps;
this.period = timePeriod;
this.loop = true;
this.cs = ConcurrencyService.this;
}

// Alternative constructors

/**
 * Writes a checkpoint, sleeps for {@code period}, and repeats until the
 * enclosing service reports done; one final checkpoint is written then.
 */
@SuppressWarnings("rawtypes")
public void run() {
logger.config("PeriodicLogAction started on thread: " +
Thread.currentThread().getName() +
System.lineSeparator());

while(loop){
// log # of paths created, analyzed and are in queue
outputLogInfo();

// wait designated time period
try {
Thread.sleep(period);
} catch (InterruptedException e) {}

if(cs.isDone()){
this.loop = false;
outputLogInfo();
}
}
}

/**
 * Captures the counters and emits one checkpoint line. The sanity flag is
 * true when completed tasks equal map entries plus duplicates.
 */
private void outputLogInfo(){

// NOTE(review): no other code in this excerpt locks on `pool`, so this
// block presumably does not stop worker tasks from changing the counters
// while they are read — confirm.
synchronized (pool) {
Queue queryQueue = cs.bq,
poolQueue = cs.pool.getQueue();
Map<PathwayImpl,AnalysisResult> processedSet = cs.analyzedPaths;

inQueryQueue = queryQueue.size();
inPoolQueue = poolQueue.size();
completedTasks = (int) pool.getCompletedTaskCount();
inProccessedSet = processedSet.size();
duplicates = AnalysisAction.duplicates.get();
// Holds only if every completed task either added exactly one new map
// entry or was counted as a duplicate.
sanityCheck = (completedTasks == inProccessedSet + duplicates);
}

sb = new StringBuffer();
sb.append("Checkpoint ").append(++counter).append(": ")
.append("QQ: ").append(inQueryQueue).append("\t")
.append("PQ: ").append(inPoolQueue).append("\t")
.append("CT: ").append(completedTasks).append("\t")
.append("AP: ").append(inProccessedSet).append("\t")
.append("DP: ").append(duplicates).append("\t")
.append("Sanity: ").append(sanityCheck);

if(ps == null)
logger.info(sb.toString() + nL);
else
ps.println(sb.toString());
}
}
}

这是我在日志中看到的内容:

Sep 17, 2014 5:30:00 PM main.ConcurrencyService$QueryingAction run
INFO: Total number of queried paths: 81128
Sep 17, 2014 5:30:00 PM main.ConcurrencyService serve
INFO: All paths are queried and queued...
Initiating a timely shutdown of the pool..
...
Sep 17, 2014 5:49:49 PM main.ConcurrencyService serve
INFO: A total of 8620 tasks analyzed. Mean process time is: 1108.208 milliseconds.
...
Sep 17, 2014 5:49:54 PM main.ConcurrencyService$PeriodicLogAction outputLogInfo
INFO: Checkpoint 41: QQ: 0 PQ: 0 CT: 81128 AP: 8565 DP: 72508 Sanity: false

...这表明:

  1. 已完成任务的数量与查询和排队的对象数量一致。所以什么都没有错过..

  2. 分析路径的数量(即结果)和重复的数量不等于已完成的任务数量:81128 - (8565 + 72508) = 55

  3. 累积的结果数与 AnalysisAction 类报告的处理时间条数不匹配:8565 与 8620(缺少 55 个结果)

不确定造成这种差异的原因是什么,或者从哪里开始调试。我显然无法单步执行 81128 个任务来调查缺少哪 55 个任务以及原因..

有什么建议吗?

编辑:以下是针对评论中问题的一些说明

  • DebugToolbox.submitProcTime(long t) 是一个同步的静态方法,它只是将 t 添加到一个 ArrayList 中

  • isDone() 是 ConcurrencyService 中的一个方法,我在尝试缩短此处发布的代码时不小心把它删掉了。我已编辑代码,以反映该方法的实现方式。

最佳答案

您检查映射中是否存在键,然后花时间生成值,然后将值放入映射中。

当您生成值时,另一个线程可以处理相同的键。由于尚未添加,因此现在有两个线程生成相同的值。因此生成的值的数量大于 map 的最终大小。

解决方案是在开始耗时的计算之前,先用 putIfAbsent() 以原子方式把结果(可以是占位符)放入映射,并根据其返回值判断该键是否已经存在:返回 null 表示当前线程是第一个处理该键的线程,否则说明这是一个重复任务。

关于java - 追寻失踪的任务: potential discrepancy between reported task numbers and results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25909311/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com