gpt4 book ai didi

java - Java 中的动态计划并发任务执行

转载 作者:行者123 更新时间:2023-12-01 13:00:06 25 4
gpt4 key购买 nike

我正在尝试实现一个根据某些用户输入对任务进行编程的应用程序。用户可以将多个IP与与它们关联的telnet命令(一对一关系)、执行频率和2个组(集群、对象类)。

用户应该能够在运行时添加/删除 IP、集群、命令等。他们还应该能够中断执行。

该应用程序应该能够向 IP 发送 telnet 命令,等待响应并根据频率将响应保存在数据库中。我遇到的问题是尝试使所有这些都是多线程的,因为至少有 60,000 个 IP 可以进行 telnet,并且在单个线程中执行此操作会花费太多时间。一个线程应该处理同一集群中具有相同 objectClass 的一组 IP。

我看过Quartz安排工作。使用 Quartz,我尝试创建一个动态作业,获取 IP 列表(带有命令),对其进行处理并将结果保存到数据库中。但后来我遇到了用户提供的不同计时器的问题。 Quartz 网页上的示例不完整,并且没有详细介绍。

然后我尝试用老式的方法,使用java线程,但我需要异常处理和参数传递,线程不这样做。然后我发现了 Callables 和 Executors,但我无法使用 Callables 安排任务。

现在我很困惑,我该怎么办?

最佳答案

好的,这里有一些想法。与必要的盐一起服用。

首先,创建一个您需要完成的所有工作的列表。我假设您在表中的某处有这个,您可以进行如下所示的连接:

cluster | objectClass | ip-address | command | frequency | last-run-time

这代表了您的系统需要完成的所有工作。为了便于解释,我会说频率可以采用“每天 1 次”、“每小时 1 次”、“每小时 4 次”、“每分钟”的形式。该表每个(集群、对象类、IP 地址、命令)一行。假设另一个表有运行历史记录,包含错误消息和其他内容。

现在您需要做的是阅读该表并安排工作。对于调度,请使用以下之一:

ScheduledExecutorService exec = Executors...

当您安排某些事情时,您需要告诉它运行的频率(使用我们给出的频率就足够简单了)和延迟。如果某件事每分钟运行一次,并且最后一次运行是在 4 分 30 秒前,则初始延迟为零。如果每小时运行一次,则初始延迟为(60 分钟 - 4.5 分钟 = 55.5 分钟)。

ScheduledFuture<?> handle = exec.scheduleAtFixedRate(...);

更复杂的调度类型是像 Quartz 这样的东西存在的原因,但基本上你只需要一种解决方法,给定(调度,上次运行)到下一次执行的耗时。如果您可以做到这一点,那么您可以使用schedule(...)而不是scheduleAtFixedRate(...),然后在该任务完成时安排该任务的下一次运行。

无论如何,当你安排某件事时,你会得到它的句柄

ScheduledFuture<?> handle = exec.scheduleAtFixedRate(...);

将此 handle 放在可触及的地方。为了便于讨论,我们假设它是 TaskKey 的映射。 TaskKey 是(cluster | objectClass | ip-address | command)一起作为一个对象。

Map<TaskKey,ScheduledFuture<?>> tasks = ...;

您可以使用该句柄来取消和安排新作业。

cancelForCustomer(CustomerId id) {
List<TaskKey> keys = db.findAllTasksOwnedByCustomer(id);
for(TaskKey key : keys) {
ScheduledFuture<?> f = tasks.get(key);
if(f!=null) f.cancel();
}
}

对于参数传递,创建一个对象来代表您的工作。使用您需要的所有参数创建其中之一。

class HostCheck implements Runnable {
private final Address host;
private final String command;
private final int something;
public HostCheck(Address host, String command; int something) {
this.host = host; this.command = command; this.something = something;
}
....
}

对于异常处理,将其全部本地化到您的对象中

class HostCheck implements Runnable {
...
public void run() {
try {
check();
scheduleNextRun(); // optionally, if fixed-rate doesn't work
} catch( Exception e ) {
db.markFailure(task); // or however.
// Point is tell somebody about the failure.
// You can use this to decide to stop scheduling checks for the host
// or whatever, but just record the info now and us it to influence
// future behavior in, er, the future.
}
}
}

好的,到目前为止,我认为我们的状态非常好。有很多细节需要填写,但感觉是可以管理的。现在我们遇到了一些复杂性,这就是“cluster/objectClass”对的执行是串行的要求。

有几种方法可以处理这个问题。

如果唯一对的数量较少,您可以直接制作 Map<ClusterObjectClassPair,ScheduledExecutorService> ,确保创建单线程执行程序服务(例如 Executors.newSingleThreadScheduledExecutor() )。因此,您拥有的不是单个调度服务(exec,上面)。足够简单。

如果您需要控制同时尝试的工作量,那么您可以让每个 HealthCheck 在执行前获得许可。拥有一些全局许可对象

public static final Semaphore permits = java.util.concurrent.Semaphore(30);

然后

class HostCheck implements Runnable {
...
public void run() {
permits.acquire()
try {
check();
scheduleNextRun();
} catch( Exception e ) {
// regular handling
} finally {
permits.release();
}
}
}

每个ClusterObjectClassPair只有一个线程,它序列化该工作,然后只允许限制数量 ClusterObjectClassPair您可以一次交谈。

我想这会是一个相当长的答案。祝你好运。

关于java - Java 中的动态计划并发任务执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23573312/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com