gpt4 book ai didi

Java Fork Join 池吃掉所有线程资源

转载 作者:太空宇宙 更新时间:2023-11-04 06:55:23 25 4
gpt4 key购买 nike

我有一个字符串解析器(解析大文本 blob),需要在 java fork join 池中运行。该池比其他线程更快,并且在使用正则表达式和 xpath 时将我的解析时间减少了 30 分钟以上。然而,创建的线程数量急剧增加,并且我需要能够终止它们,因为线程池被多次调用。如何减少线程的增加,而不将 4 核系统上的池限制为仅 1 个核?

我的线程数超过 40000,我需要它接近 5000,因为程序运行了 10 次,而我的用户执行限制为 50000 个线程。

Windows 和 Linux 上都会出现此问题。

我是:

  • 将最大处理器设置为可用处理器的数量*当前为 1 的可配置数量
  • 调用 get() 后取消任务
  • 在重新实例化之前拼命将 forkjoin 池设置为 null,因为我很绝望

任何帮助将不胜感激。谢谢。

这是我用来停止、获取和重新启动池的代码。我可能还应该注意到,我使用 fjp.submit(TASK) 提交每个任务,然后在关闭时调用它们。

while(pages.size()>0) { log.info("当前 Activity 线程:"+Thread.activeCount()); log.info("迭代中找到的页面 "+j+": "+pages.size());

        if(fjp.isShutdown())
{
fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
}

i=0;
//if asked to generate a hash, due this first
if(getHash==true){
log.info("Generating Hash");
int s=pages.size();
while(i<s){
String withhash=null;
String str=pages.get(0);

if(str != null){
jmap=Json.read(str).asJsonMap();
jmap.put("offenderhash",Json.read(genHash(jmap.get("offenderhash").asString()+i)));

for(String k:jmap.keySet()){
withhash=(withhash==null)?"{\""+k+"\":\""+jmap.get(k).asString()+"\"":withhash+",\""+k+"\":\""+jmap.get(k).asString()+"\"";
}

if(withhash != null){
withhash+=",}";
}

pages.remove(0);
pages.add((pages.size()-1), withhash);
i++;
}
}
i=0;
}

if(singlepats != null)
{

log.info("Found Singlepats");
for(String row:pages)
{

String str=row;
str=str.replaceAll("\t|\r|\r\n|\n","");
jmap=Json.read(str).asJsonMap();

if(singlepats.containsKey("table"))
{
if(fjp.isShutdown())
{
fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));
}

fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));

if(jmap.get(column)!=null)
{

if(test){
System.out.println("//////////////////////HTML////////////////////////\n"+jmap.get(column).asString()+"\n///////////////////////////////END///////////////////////////\n\n");
}

if(mustcontain != null)
{
if(jmap.get(column).asString().contains(mustcontain))
{
if(cannotcontain != null)
{
if(jmap.get(column).asString().contains(cannotcontain)==false)
results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
}
else
{
results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
}
}
}
else if(cannotcontain != null)
{
if(jmap.get(column).asString().contains(cannotcontain)==false)
{
results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
}
}
else
{
results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
}
}
}

i++;

if(((i%commit_size)==0 & i != 0) | i==pages.size() |pages.size()==1 & singlepats != null)
{
log.info("Getting Regex Results");

log.info("Shutdown");

try {
fjp.awaitTermination(termtime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

fjp.shutdown();
while(fjp.isTerminated()==false)
{
try{
Thread.sleep(5);
}catch(InterruptedException e)
{
e.printStackTrace();
}
}


for(Future<String> r:results)
{
try {
add=r.get();
if(add.contains("No Data")==false)
{
parsedrows.add(add);
}

add=null;
if(r.isDone()==false)
{
r.cancel(true);
}

if(fjp.getActiveThreadCount()>0 && fjp.getRunningThreadCount()>0)
{
fjp.shutdownNow();
}

fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

results=new ArrayList<ForkJoinTask<String>>();

if(parsedrows.size()>=commit_size)
{

if(parsedrows.size()>=SPLITSIZE)
{
sendToDb(parsedrows,true);
}
else
{
sendToDb(parsedrows,false);
}

parsedrows=new ArrayList<String>();
}


//hint to the gc in case it actually pays off (think if i were a gambling man)
System.gc();
Runtime.getRuntime().gc();
}


}
}
log.info("REMAINING ROWS TO COMMIT "+parsedrows.size());
log.info("Rows Left"+parsedrows.size());
if(parsedrows.size()>0)
{


if(parsedrows.size()>=SPLITSIZE)
{
sendToDb(parsedrows,true);
}
else
{
sendToDb(parsedrows,false);
}


parsedrows=new ArrayList<String>();
}

records+=i;
i=0;

//Query for more records to parse

最佳答案

看起来您正在为每个结果创建一个新的 ForkJoinPool。您真正想做的是创建一个供所有任务共享的 ForkJoinPool。额外的池不会提供额外的并行性,所以一个应该没问题。当您获得准备运行的任务时,获取您的 fjp 并调用 fjp.execute(ForkJoinTask)ForkJoinTask.fork()(如果您已经在任务中)。

创建多个池似乎是一场簿记噩梦。尝试只使用共享的一个。

关于Java Fork Join 池吃掉所有线程资源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22817672/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com