gpt4 book ai didi

Java:停止可调用线程的设计思路

转载 作者:搜寻专家 更新时间:2023-11-01 03:44:01 25 4
gpt4 key购买 nike

我正在编写一个进行批处理的程序。批处理元素可以彼此独立处理,我们希望最大限度地减少整体处理时间。因此,我不是一次循环遍历批处理中的每个元素,而是使用 ExecutorService 并向其提交 Callable 对象:

    public void process(Batch batch)
{
ExecutorService execService = Executors.newCachedThreadPool();
CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>();

for (BatchElement element : batch.getElement())
{
Future<MtaMigrationStatus> future = execService.submit(new ElementProcessor(batch.getID(),
element));
futures.add(future);
}

boolean done = false;

while (!done)
{
for (Future<BatchElementStatus> future : futures)
{
try
{
if (future.isDone())
{
futures.remove(future);
}
}
catch (Exception e)
{
System.out.println(e.getMessage());
}

if (futures.size() == 0)
{
done = true;
}
}
}
}

我们希望能够取消批处理。因为我没有使用循环,所以我不能只在每个循环的顶部检查是否设置了取消标志。

我们正在使用一个 JMS 主题,BatchProcessor 和 ElementProcessor 都将监听该主题以通知它们批处理已被取消。

在 ElementProcess call() 中有许多步骤,其中一些步骤可以安全地停止处理,但没有返回点。该类具有以下基本设计:

public class ElementProcessor implements Callable, MessageListener
{
private cancelled = false;

public void onMessage(Message msg)
{
// get message object
cancelled = true;
}

public BatchElementStatus call()
{
String status = SUCCESS;

if (!cancelled)
{
doSomehingOne();
}
else
{
doRollback();
status = CANCELLED;
}

if (!cancelled)
{
doSomehingTwo();
}
else
{
doRollback();
status = CANCELLED;
}

if (!cancelled)
{
doSomehingThree();
}
else
{
doRollback();
status = CANCELLED;
}

if (!cancelled)
{
doSomehingFour();
}
else
{
doRollback();
status = CANCELLED;
}

// After this point, we cannot cancel or pause the processing

doSomehingFive();
doSomehingSix();

return new BatchElementStatus("SUCCESS");
}

}

除了在 if(!cancelled) 语句的调用方法中包装方法调用/代码块之外,我想知道是否有更好的方法来检查批处理/元素是否已被取消.

有什么建议吗?

最佳答案

我不认为你可以做得比你现在做的更好,但这里有一个替代方案:

public BatchElementStatus call() {
return callMethod(1);
}

private callMethod(int methodCounter) {
if (cancelled) {
doRollback();
return new BatchElementStatus("FAIL");
}
switch (methodCounter) {
case 1 : doSomethingOne(); break;
case 2 : doSomethingTwo(); break;
...
case 5 : doSomethingFive();
doSomethingSix();
return new BatchElementStatus("SUCCESS");
}
return callMethod(methodCounter + 1);
}

另外,你想制作 cancelled volatile ,因为onMessage将从另一个线程调用。但您可能不想使用 onMessagecancelled无论如何(见下文)。

其他小问题:1) CopyOnWriteArrayList<Future<BatchElementStatus>> futures应该只是一个 ArrayList .使用并发集合误导我们认为 futures在很多线程上。 2) while (!done)应替换为 while (!futures.isEmpty())done删除。 3)你可能应该调用future.cancel(true)而不是“消息”取消。然后你必须检查 if (Thread.interrupted())而不是 if (cancelled) .如果你想杀死所有 future ,那么只需调用 execService.shutdownNow() ;你的任务必须处理中断才能工作。

编辑:

而不是你的while(!done) { for (... futures) { ... }} , 你应该使用 ExecutorCompletionService .它可以完成您正在尝试做的事情,而且可能会做得更好。 API中有一个完整的例子。

关于Java:停止可调用线程的设计思路,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7209731/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com