java - Synchronization issue when using ExecutorCompletionService

Reposted. Author: 行者123. Updated: 2023-12-01 13:32:08

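I have the following scenario:

  1. Text files are generated dynamically every day, anywhere from 0 to 8 per day. Each file can be small or large, depending on that day's data.
  2. A number of checks (business checks, rules) have to be run against each file.

I implemented it as shown below, but it does not behave as expected, so I seem to be doing something wrong.

To store results I have the following class. Each file gets one Result object: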

public class Result {

private String fileName;
private Map<RuleTypes, String> allResult = new HashMap<RuleTypes, String>();

// setter , getter , constructor .. POJO
}

A rule looks like this:

public class ValidateRule1 implements Rule {

private String fileName;

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
public void init() {
// TODO Auto-generated method stub

}

@Override
public void runRule() {
System.out.println("Start running ... Rule 1 for "+fileName);
try {
Random r = new Random();
int sleepRandomTime = r.nextInt(15-1) + 1;
Thread.sleep(sleepRandomTime) ; // simulate rule execution
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End running ... Rule 1 for "+fileName);

}

@Override
public RuleTypes getRuleName() {
return RuleTypes.Rule1;
}

}

The rule factory looks like this:

public static Rule getRule(RuleTypes ruleName) {
Rule result=null;

switch(ruleName) {

case Rule1 :
result = new ValidateRule1(); // todo singleton
break;

case Rule2 :
result = new ValidateRule2(); // todo singleton
break;

case Rule3 :
result = new ValidateRule3(); // todo singleton
break;
...
}
}

I invoke the rules as follows, using RuleFactory to create them (one singleton object per rule):

final ConcurrentLinkedQueue<Rule> rulesToExecuteForModel = new ConcurrentLinkedQueue<Rule>();
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule1));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule2));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule3));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule4));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule5));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule6));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule7));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule8));


// pick 1 file and run all rules for it , different threads can pick up different files concurrently ... dont think will need synchronization here
List<File> fileQueue = new LinkedList<File>();
fileQueue.add(new File("../test/files/File1.20140203"));
fileQueue.add(new File("../test/files/File2.20140203"));
fileQueue.add(new File("../test/files/File3.20140203"));
fileQueue.add(new File("../test/files/File4.20140203"));
fileQueue.add(new File("../test/files/File5.20140203"));
fileQueue.add(new File("../test/files/File6.20140203"));

// Results Display ... 1 Result obj for 1 File
ConcurrentLinkedQueue<Result> fileWiseResult = new ConcurrentLinkedQueue<Result>();
int maxNumOfFiles = fileQueue.size();

// TODO : how can i exploit the fact that this program runs on 8 core machine ? does 1 thread correspond to 1 CPU ? i kept 8 here because it will run on 8 core machine
final ExecutorService pool = Executors.newFixedThreadPool(8);
final ExecutorCompletionService<Result> completionService = new ExecutorCompletionService<Result>(pool);

for (final File file : fileQueue) {
System.out.println("picked file "+file.getName()+" running ALL rules for it");
final Future<Result> contentFuture = completionService.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
Result r = new Result(); // 1 file 1 Result object
r.setFileName(file.getName());
Iterator<Rule> itr=rulesToExecuteForModel.iterator();
// sequentially run different rules for same file
while (itr.hasNext()) {
Rule currentRule = itr.next();
currentRule.setFileName(file.getName());
currentRule.runRule();
// take fileName / File as parameter , String result for currentFile and currentRule
r.getFileResult().put(currentRule.getRuleName(), "result for "+currentRule.getRuleName().toString());
}
return r;
}
});
}

for(int i = 0; i <maxNumOfFiles; ++i) {
Future<Result> future;
try {
future = completionService.take();
Result currentResult=null;
try {
currentResult = future.get();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Result for file ["+currentResult.getFileName()+"] is ["+currentResult.getFileResult()+"]");
fileWiseResult.add(currentResult);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

The output looks like this:

picked file File1.20140203 running rules for it
Start running ... Rule 1 for File1.20140203
End running ... Rule 1 for File1.20140203

E
Start running ... Rule 2 for File1.20140203
End running ... Rule 2 for File1.20140203
End running ... Rule 2 for File1.20140203

Start running ... Rule 3 for File1.20140203
End running ... Rule 3 for File1.20140203

Start running ... Rule 4 for File1.20140203
End running ... Rule 4 for File1.20140203
End running ... Rule 4 for File1.20140203

Start running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203

Start running ... Rule 6 for File1.20140203
End running ... Rule 6 for File1.20140203
End running ... Rule 6 for File1.20140203

Start running ... Rule 7 for File1.20140203
End running ... Rule 7 for File1.20140203
End running ... Rule 7 for File1.20140203

Start running ... Rule 8 for File1.20140203
End running ... Rule 8 for File1.20140203
End running ... Rule 8 for File1.20140203
Result for file [File1.20140203] is [{Rule2=result for Rule2, Rule5=result for Rule5, Rule1=result for Rule1, Rule6=result for Rule6, Rule4=result for Rule4, Rule7=result for Rule7, Rule3=result for Rule3, Rule8=result for Rule8}]

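I expected exactly one statement like "Start running ... Rule 2 for File1.20140203" and exactly one like "End running ... Rule 2 for File1.20140203" for each rule and file.

But as the output shows, the number of "End" lines is greater than the number of "Start" lines.

I also observed: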

Start running ... Rule1 for File5.20140203
Start running ... Rule1 for File6.20140203
Start running ... Rule1 for File6.20140203
Start running ... Rule1 for File4.20140203
Start running ... Rule1 for File5.20140203
Start running ... Rule1 for File4.20140203

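I expected 6 unique file names in the log messages above.

First question: what am I doing wrong, and how can I correct it?

Second question (an optimization, not the actual problem): this program will run on an 8-core machine. If I keep the pool size at 8, does that mean 8 threads will run in parallel, one on each core? Is there any way to ensure this?

Best answer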

"But as seen in the output, the number of "End" lines > the number of "Start" lines"

Your mistake is in this line:

currentRule.setFileName(file.getName());

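Multiple threads are using the same collection of rules, so every call to setFileName() on a shared rule overwrites the value another thread has just set. Rules should therefore not hold any persistent state. Instead, pass the file name in each rule method call.

You should change your runRule() method to take a fileName argument, and not keep the file name as a field of the rule class.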
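
As an illustration of that advice, here is a minimal sketch, not the asker's actual code. The names StatelessRuleDemo, StatelessRule and runAll are hypothetical, the rule bodies are placeholders, and the lambda syntax assumes Java 8 or later. Each rule receives the file name as an argument and keeps no fields, so a single instance can safely be shared by every task submitted to the ExecutorCompletionService.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StatelessRuleDemo {

    // Hypothetical stateless variant of the question's Rule interface:
    // the file name is a method argument, not a field.
    interface StatelessRule {
        String runRule(String fileName);
    }

    // No mutable fields, so one shared instance is safe across threads.
    static final class ValidateRule1 implements StatelessRule {
        @Override
        public String runRule(String fileName) {
            return "Rule1 checked " + fileName; // placeholder for the real check
        }
    }

    static final class ValidateRule2 implements StatelessRule {
        @Override
        public String runRule(String fileName) {
            return "Rule2 checked " + fileName; // placeholder for the real check
        }
    }

    // Submits one task per file; each task runs all rules sequentially for its file.
    // Returns one message per (file, rule) pair, in completion order of the files.
    static List<String> runAll(List<String> fileNames, List<StatelessRule> rules)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            ExecutorCompletionService<List<String>> completionService =
                    new ExecutorCompletionService<>(pool);
            for (final String fileName : fileNames) {
                completionService.submit(() -> {
                    List<String> messages = new ArrayList<>();
                    for (StatelessRule rule : rules) {
                        // The file name travels with the call, so threads cannot
                        // overwrite each other's value.
                        messages.add(rule.runRule(fileName));
                    }
                    return messages;
                });
            }
            List<String> all = new ArrayList<>();
            for (int i = 0; i < fileNames.size(); i++) {
                all.addAll(completionService.take().get());
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> files = Arrays.asList("File1.20140203", "File2.20140203", "File3.20140203");
        List<StatelessRule> rules =
                Arrays.<StatelessRule>asList(new ValidateRule1(), new ValidateRule2());
        for (String line : runAll(files, rules)) {
            System.out.println(line);
        }
    }
}
```

With this shape, every message carries the file name of the task that produced it, regardless of how the threads interleave. An alternative would be to create a fresh set of rule objects inside each task, which also avoids sharing but allocates more objects.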

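"This program will run on an 8-core machine. If I keep a pool size of 8, does it mean 8 threads will run in parallel, one on each core? Is there a way I can ensure this?"

They should, but there is no way to ensure it. Other processes running on the operating system also need to be serviced. Whether all the threads actually run in parallel also depends on how much IO and other blocking work your application does. The right approach is to vary the number of threads in the pool until you get the best speed for your application.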
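
One way to run that experiment is sketched below, under stated assumptions: PoolSizeProbe and timeRun are hypothetical names, and Thread.sleep stands in for the real rule execution, so it models blocking work rather than CPU-bound work. Runtime.getRuntime().availableProcessors() reports how many processors the JVM can see, which is a reasonable starting point for the pool size, and the loop then times the same workload with a few different sizes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolSizeProbe {

    // Runs taskCount simulated tasks on a fixed pool of poolSize threads
    // and returns the elapsed wall-clock time in milliseconds.
    static long timeRun(int poolSize, int taskCount, long sleepMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                tasks.add(() -> {
                    Thread.sleep(sleepMillis); // stand-in for real rule execution
                    return id;
                });
            }
            long start = System.nanoTime();
            // invokeAll blocks until every task has completed.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            for (Future<Integer> future : futures) {
                future.get(); // surfaces any exception thrown by a task
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Processors visible to the JVM: " + cores);
        // Try a few candidate sizes and compare the timings on the target machine.
        int[] candidates = {1, cores, cores * 2};
        for (int size : candidates) {
            long elapsed = timeRun(size, 16, 50);
            System.out.println("pool size " + size + " -> " + elapsed + " ms");
        }
    }
}
```

The timings vary from run to run and from machine to machine, so they are only meaningful when measured with the real workload on the machine that will run the program.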

Regarding "java - Synchronization issue when using ExecutorCompletionService", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/21511978/
