gpt4 book ai didi

Java支持嵌入式多线程

转载 作者:行者123 更新时间:2023-12-01 18:17:39 25 4
gpt4 key购买 nike

需要 Java 多线程方面的帮助

我有一个案例如下:

有很多记录。每条记录大约有 250 个字段。每个字段都需要根据预定义的规则进行验证。

所以我定义了一个类 FieldInfo 来表示每个字段:

public class FieldInfo {
private String name;
private String value;
private String error_code;
private String error_message;

// ignore getters and setters
}

代表一条记录的Record类:

public class Record {
List<FieldInfo> fields;

// omit getter and setter here
}

以及规则接口(interface)和类:

public interface BusinessRule {
// validating one field needs some other fields' value in the same record. So the list of all fields for a certain record passed in as parameter
public FieldInfo validate(List<FieldInfo> fields);
}

public class FieldName_Rule implements BusinessRule {

public FieldInfo validate(List<FieldInfo> fields) {
// will do
// 1. pickup those fields required for validating this target field, including this target field
// 2. performs validation logics A, B, C...

// note: all rules only read data from a database, no update/insert operations.
}
}

用户一次可以提交 5000 条或更多记录进行处理。性能要求高。我正在考虑使用多个线程来提交,例如 5000 条记录(意味着一个线程运行多条记录),并且在每个线程中,在每条记录上 fork 另一个多个线程来运行规则。

但不幸的是,这种嵌入式多线程在我的情况下总是死掉。

以下是上述解决方案的一些关键部分:

public class BusinessRuleService {

@Autowired
private ValidationHandler handler;

public String process(String xmlRequest) {
List<Record> records = XmlConverter.unmarshall(xmlRequest).toList();
ExecutorService es = Executors.newFixedThreadPool(100);
List<CompletableFuture<Integer> futures =
records.stream().map(r->CompletableFuture.supplyAsync(()-> handler.invoke(r), es)).collect(Collectors.toList());
List<Integer> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println("total records %d processed.", result.size());
es.shutdown();
return XmlConverter.marshallObject(records);
}
}

@Component
public class ValidationHandlerImpl implements ValidationHandler {

@Autowired
private List<BusinessRule> rules;

@Override
public int invoke(Record record) {

ExecutorService es = Executors.newFixedThreadPool(250);
List<CompletableFuture<FieldInfo> futures =
rules.stream().map(r->CompletableFuture.supplyAsync(()-> r.validate(record.getFields()), es)).collect(Collectors.toList());
List<FieldInfo> result = future.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println("total records %d processed.", result.size());
es.shutdown();
return 0;
}
}

工作流程是:用户以 xml 字符串格式提交记录列表。应用程序端点之一启动 BusinessRuleService 对象中的 process 方法。该流程使用 CompletableFuture 来组合任务,并将任务提交给 ExecutorService,该服务具有大小为 100 的线程池。然后 CompletableFuture 列表中的每个任务都会启动 ValidationHandler 对象。 ValidationHandler 对象组成另一个 CompletableFuture 任务,并将该任务提交给另一个 ExecutorService,该服务的池大小与规则列表大小相同。

上述解决方案是否正确?

注意:我目前的解决方案是:按顺序处理提交的记录。对于每条记录,并行处理 250 条规则。采用此方案,5000条记录需要2个多小时。如此糟糕的表现是企业无法接受的。

我对并发/多线程编程非常陌生。非常感谢各种帮助!

最佳答案

这是众所周知的“单一生产者 - 多个消费者”模式。经典的解决方案是创建一个 BlockingQueue<Record> queue ,并按照他们的阅读速度将记录放在那里。在队列的另一端,许多工作线程从queue读取记录。并处理它们(在我们的例子中,验证字段):

class ValidatingThread extends Tread {
BlockingQueue<Record> queue;
FieldName_Rule validator = new FieldName_Rule();

public Validator (BlockingQueue<Record> queue) {
this.queue = queue;
}

public void run() {
Record record = queue.take();
validator.validate(collectFields(record));
}
}

最佳线程数等于 Runtime.getRuntime().availableProcessors() 。一开始就全部启动,不要使用“嵌入式多线程”。如何在处理完所有记录后停止线程的任务留作学习作业。

关于Java支持嵌入式多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60336218/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com