gpt4 book ai didi

java - 线程池处理 'duplicate' 任务

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:28:36 24 4
gpt4 key购买 nike

我想并行执行一些不同的任务,但有一个要求:如果某个任务已经在排队或正在处理,就不要再次入队。我阅读了一些 Java API 文档,写出了下面的代码,看起来可以正常工作。有人能指出我采用的方法是否最合适吗?是否存在隐患(例如线程安全问题),或者有没有更好的实现方式?代码如下:

import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demo task that is only queued when no equal task is already queued or running.
 *
 * <p>Two instances denote the same task when both {@code key1} and {@code key2}
 * are equal (see {@link #equals(Object)}). Submission goes through
 * {@link #execute(TestExecution)}, which remembers the {@link Future} of the most
 * recent submission per task and refuses a new one while that future is not done.
 */
public class TestExecution implements Runnable {
    // The keys define equals/hashCode and this object is used as a HashMap key:
    // do not modify them after construction.
    String key1;
    String key2;

    /** Latest Future per task; only touched inside the synchronized execute method. */
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
    // NOTE: per the ThreadPoolExecutor documentation, with an unbounded queue no more
    // than corePoolSize (2) threads are ever created; the maximum of 5 has no effect.
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

    /**
     * Submits a few duplicate and distinct tasks, then shuts the pool down so the
     * JVM can exit once the already accepted tasks have finished.
     */
    public static void main(String[] args) {
        try {
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Core threads are non-daemon and never time out; without this the JVM hangs.
            tpe.shutdown();
        }
    }

    /**
     * Queues {@code e} unless an equal task is still queued or running.
     *
     * <p>The method is synchronized so that the "is it already there?" check and the
     * registration of the new future happen atomically when several threads submit
     * the same task concurrently.
     *
     * @param e the task to run
     * @return {@code true} if the task was submitted, {@code false} if an equal task
     *         has not completed yet
     */
    static synchronized boolean execute(TestExecution e) {
        System.out.println("Handling "+e.key1+":"+e.key2);
        Future<?> previous = executions.get(e);
        if (previous == null) {
            System.out.println("No previous execution");
        } else if (previous.isDone()) {
            System.out.println("Previous execution has completed");
            executions.remove(e);
        } else {
            System.out.println("Previous execution still running");
            return false;
        }
        Future<?> submitted = tpe.submit(e);
        executions.put(e, submitted);
        return true;
    }

    /**
     * @param key1 first part of the task identity
     * @param key2 second part of the task identity
     */
    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    /** Two tasks are equal when both keys are equal. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof TestExecution)) {
            return false;
        }
        TestExecution other = (TestExecution) obj;
        return key1.equals(other.key1) && key2.equals(other.key2);
    }

    /** Order-sensitive hash so that ("A","B") and ("B","A") do not collide by construction. */
    @Override
    public int hashCode() {
        return 31 * key1.hashCode() + key2.hashCode();
    }

    /** Simulates four seconds of work; stops early and keeps the interrupt flag set when interrupted. */
    @Override
    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            System.err.println("Interrupted while processing "+key1+":"+key2);
            // Restore the flag so the pool (or any caller) can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

针对下方评论的补充说明:
计划是由 cron 调用 RESTful Web 服务来触发任务执行。例如,下面的配置中,一项任务每两分钟触发一次,另一项任务每天 9:30 触发。

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22

在这种情况下,如果任务 key11:key12 正在运行,或者已经排队等待运行,我不想让另一个实例排队。我知道我们还有其他计划安排选项,但我们倾向于将 cron 用于其他任务,所以我想尽量保留这一点。

第二次更新:根据目前收到的评论,我重写了代码。下面这个更新后的方案还存在什么问题吗?欢迎指正。

import java.util.concurrent.LinkedBlockingQueue;

/**
 * Demo task whose identity is the pair ({@code key1}, {@code key2}).
 *
 * <p>Instances are handed to a {@code TestThreadPoolExecutor}, which uses
 * {@link #equals(Object)} / {@link #hashCode()} to recognise a task that has
 * already been accepted.
 */
public class TestExecution implements Runnable {
    // The keys define equals/hashCode; do not modify them after construction.
    String key1;
    String key2;
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

    /**
     * Submits a few duplicate and distinct tasks, then shuts the pool down so the
     * JVM can exit once the already accepted tasks have finished.
     */
    public static void main(String[] args) {
        try {
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            tpe.execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Pool threads are non-daemon; without a shutdown the JVM never exits.
            tpe.shutdown();
        }
    }

    /**
     * @param key1 first part of the task identity
     * @param key2 second part of the task identity
     */
    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    /** Two tasks are equal when both keys are equal. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof TestExecution)) {
            return false;
        }
        TestExecution other = (TestExecution) obj;
        return key1.equals(other.key1) && key2.equals(other.key2);
    }

    /** Order-sensitive hash so that ("A","B") and ("B","A") do not collide by construction. */
    @Override
    public int hashCode() {
        return 31 * key1.hashCode() + key2.hashCode();
    }

    /** Simulates four seconds of work; stops early and keeps the interrupt flag set when interrupted. */
    @Override
    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            System.err.println("Interrupted while processing "+key1+":"+key2);
            // Restore the flag so the pool (or any caller) can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}


import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * Thread pool that silently ignores a command while an equal command (per
 * {@code equals}/{@code hashCode}) is already queued or running.
 *
 * <p>De-duplication applies to commands passed to {@link #execute(Runnable)}.
 * NOTE(review): {@code submit(...)} wraps its argument in a new future object before
 * calling {@code execute}, so tasks handed in through {@code submit} are presumably
 * never seen as duplicates of each other - confirm before relying on it.
 */
public class TestThreadPoolExecutor extends ThreadPoolExecutor {
    /** Commands that have been accepted and have not finished yet. */
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

    /**
     * Creates a pool with 2 core threads, a maximum of 5 threads and a one minute
     * keep-alive for threads above the core size.
     *
     * @param q work queue holding accepted commands until a thread is free
     */
    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {
        super(2, 5, 1, TimeUnit.MINUTES, q);
    }

    /**
     * Runs {@code command} unless an equal command is still queued or running.
     *
     * <p>The command is registered with a single atomic {@code add} <em>before</em> it
     * is handed to the pool. Registering afterwards would allow a fast task to finish
     * (and be removed in {@link #afterExecute}) before it is added, leaving a stale
     * entry that blocks every later submission of that command.
     *
     * @param command the task to run
     */
    @Override
    public void execute(Runnable command) {
        if (!executions.add(command)) {
            System.out.println("Previous execution still running");
            return;
        }
        System.out.println("No previous execution");

        boolean accepted = false;
        try {
            super.execute(command);
            accepted = true;
        } finally {
            if (!accepted) {
                // Hand-off failed (e.g. pool shut down): undo the registration so the
                // command is not blocked forever.
                executions.remove(command);
            }
        }
    }

    /** Forgets the finished command so that an equal one can be accepted again. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        executions.remove(r);
    }
}

最佳答案

下面是我处理并避免重复任务的做法:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.*;

/**
 * Self-submitting task that is ignored while an equal task (same {@code key1} and
 * {@code key2}) is already queued or running.
 *
 * <p>{@link #execute()} registers the task in {@code TE_SET} before submitting it;
 * {@link #call()} removes the registration only when the work has ended, so the
 * duplicate check covers the queued phase as well as the running phase.
 */
public class TestExecution implements Callable<Void> {
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    /** Concurrent set of tasks that are currently queued or running. */
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>());

    private final String key1;
    private final String key2;

    /** Submits duplicate and distinct tasks in two rounds, then shuts the pool down. */
    public static void main(String... args) throws InterruptedException {
        new TestExecution("A", "A").execute();
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        Thread.sleep(8000);
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        new TestExecution("B", "B").execute();
        TPE.shutdown();
    }

    /**
     * @param key1 first part of the task identity
     * @param key2 second part of the task identity
     */
    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    /**
     * Submits this task to the shared pool unless an equal task is queued or running.
     *
     * <p>If the pool refuses the task, the registration is rolled back and the pool's
     * exception propagates to the caller.
     */
    void execute() {
        // add() is the atomic "check and claim" step.
        if (!TE_SET.add(this)) {
            System.out.println("... ignoring duplicate " + this);
            return;
        }
        System.out.println("Handling " + this);

        boolean submitted = false;
        try {
            TPE.submit(this);
            submitted = true;
        } finally {
            if (!submitted) {
                // Submission failed (e.g. pool already shut down): release the claim so
                // this key is not blocked forever.
                TE_SET.remove(this);
            }
        }
    }

    /** Two tasks are equal when both keys are equal. */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof TestExecution)) {
            return false;
        }
        TestExecution other = (TestExecution) obj;
        return key1.equals(other.key1) && key2.equals(other.key2);
    }

    @Override
    public int hashCode() {
        return key1.hashCode() * 31 + key2.hashCode();
    }

    /**
     * Simulates four seconds of work.
     *
     * <p>The registration in {@code TE_SET} is released in {@code finally}, i.e. only
     * after the work has completed or failed, so equal tasks are rejected for the whole
     * time this one is running.
     *
     * @return always {@code null}
     * @throws InterruptedException if the worker thread is interrupted while sleeping
     */
    @Override
    public Void call() throws InterruptedException {
        try {
            System.out.println("Start processing " + this);
            Thread.sleep(4000);
            System.out.println("Finish processing " + this);
            return null;
        } finally {
            TE_SET.remove(this);
        }
    }

    /** Renders the task as {@code key1:key2}. */
    @Override
    public String toString() {
        return key1 + ':' + key2;
    }
}

打印

Handling A:A
... ignoring duplicate A:A
Handling B:B
Start processing A:A
Start processing B:B
Finish processing A:A
Finish processing B:B
Handling A:A
Handling B:B
Start processing A:A
Start processing B:B
... ignoring duplicate B:B
Finish processing B:B
Finish processing A:A

关于java - 线程池处理 'duplicate' 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8905780/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com