gpt4 book ai didi

java - Spring TaskExecutor无界队列

转载 作者:行者123 更新时间:2023-12-02 11:09:58 25 4
gpt4 key购买 nike

我实现了 Spring-TaskExecutor(相当于 JDK 1.5 的 Executor)来处理从外部系统接收的通知。

只有一种方法的接口(interface):

 public interface AsynchronousService {
void executeAsynchronously(Runnable task);
}

以及相应的实现:

public class AsynchronousServiceImpl implements AsynchronousService {

private TaskExecutor taskExecutor;

@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}

@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}

任务执行器的 Xml 配置(旧应用程序):

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<!--<property name="queueCapacity" value="100"/>-->
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

为 corePoolSize 和 maxPoolSize 设置为 1,因为我希望任务按顺序执行(处理任务的池仅创建 1 个线程)。

我想根据收到通知的日期对我的任务进行排序,因此我需要重写此函数以允许优先排序:

public class NotificationPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}

这是通知任务类:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {

private final NotificationService notificationService;
private final Notification notification;

public NotificationService(NotificationService notificationService,
Notification notification) {
this.notificationService = notificationService;
this.notification = notification;
}

@Override
public int compareTo(NotificationTask task) {
return notification.getTimestamp().compareTo(task.getTimestamp());
}

@Override
public void run() {
notificationService.processNotification(notification);
}
}

这就是我执行它的方式:

asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));

这对于有界队列来说效果很好,但是我需要一个无界队列。正如您在 xml 配置中看到的,定义队列容量的行已被注释掉:

<!--<property name="queueCapacity" value="100"/>-->

但是,如果我这样做,则会收到 OutOfMemoryException。看起来它试图在应用程序启动时创建一个无界队列。但是,Executor-Service 允许我们使用无界队列 - 但我不知道如何在这里执行此操作。

最佳答案

根据java文档:

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError)

所以基本上,它是无界的,您不需要关心队列大小。

但是,队列由数组支持:

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
// Here is the initialization of backed array
this.queue = new Object[initialCapacity];
}

因此您需要提供合理的初始队列大小。之后,您就完成了。

但是,如上所述,随着越来越多的元素到来,如果内部数组已满,队列在尝试增长内部数组时可能会抛出 OutOfMemory 异常。

private void tryGrow(Object[] array, int oldCap)

但这不太可能,除非您的项目在短时间内产生数百万条通知

关于java - Spring TaskExecutor无界队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50674568/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com