gpt4 book ai didi

java - ScheduledExecutorService 有点奇怪

转载 作者:行者123 更新时间:2023-12-01 10:06:47 24 4
gpt4 key购买 nike

抱歉标题含糊,但我不知道如何简洁地描述问题。

使用 ScheduledExecutorService,我安排一个 Runnable 每 5 秒运行一次,没有初始延迟。我有一个计划任务,它会在 60 秒后调用 ScheduledExecutorService 上的 shutdownw() 。当关闭发生时,主线程似乎停止了。它不退出。

在下面的代码中,每 5 秒就会从阻塞队列中正确拾取“Ping”。当 ScheduledExecutorService 关闭时,它会停止打印“Ping”,isDone() 检查下面的行不会执行,测试结束时的记录器也不会执行,但 Eclipse 显示测试仍在运行。

“测试”(不测试任何内容,只是使用它代替 main() )

package scheduledExecutorTest;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.paul.scheduledexecutortest.service.HeartBeatService;
import com.paul.scheduledexecutortest.service.MasterScheduler;

public class SchedulerTest {

private Logger LOGGER = LoggerFactory.getLogger(SchedulerTest.class);
@Test
public void testScheduleTasks() throws InterruptedException {

MasterScheduler.scheduleToRunOnceWithInitialDelay(new Runnable() {

@Override
public void run() {
MasterScheduler.shutDown();
}
}, 60L);

HeartBeatService heartbeatService = new HeartBeatService();
heartbeatService.doStart();
LOGGER.debug("doStart() returned"); //THIS NEVER GETS HIT
}
}

安排心跳的心跳服务:

package com.paul.scheduledexecutortest.service;

import java.util.concurrent.ScheduledFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.paul.scheduledexecutortest.HeartBeat;

public class HeartBeatService {

private Logger LOGGER = LoggerFactory.getLogger(HeartBeatService.class);

public void doStart() throws InterruptedException {

Scheduler<String> scheduler = new Scheduler<String>();
ScheduledFuture<String> taskStatus = scheduler.scheduleToRunPeriodically(new HeartBeat(), 5L);

try {
while (taskStatus.isDone() == false) {
LOGGER.debug(scheduler.getTaskOutput());
}
}
catch (Exception ex) {
LOGGER.error("Something happened");
}

LOGGER.debug("COMPLETE"); //THIS NEVER GETS HIT

}
}

调度程序:

package com.paul.scheduledexecutortest.service;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.paul.scheduledexecutortest.ScheduledTask;

public class Scheduler<T> {

private Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);
private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(1);

@SuppressWarnings("unchecked")
public ScheduledFuture<T> scheduleToRunPeriodically(ScheduledTask<T> scheduledTask,
long timeIntervalSeconds) {

Runnable task = transformIntoRunnable(scheduledTask);
return (ScheduledFuture<T>) MasterScheduler.scheduleToRunPeriodically(task,
timeIntervalSeconds);
}

@SuppressWarnings("unchecked")
public ScheduledFuture<T> scheduleToRunPeriodicallyWithInitialDelay(
ScheduledTask<T> scheduledTask, long repeatTimeIntervalSeconds,
long initalDelaySeconds) {

Runnable task = transformIntoRunnable(scheduledTask);
return (ScheduledFuture<T>) MasterScheduler.scheduleToRunPeriodicallyWithInitialDelay(task,
repeatTimeIntervalSeconds, initalDelaySeconds);
}

public void scheduleToRunOnceWithInitialDelay(ScheduledTask<T> scheduledTask,
long timeIntervalSeconds) {

Runnable task = transformIntoRunnable(scheduledTask);
MasterScheduler.scheduleToRunOnceWithInitialDelay(task, timeIntervalSeconds);
}

private Runnable transformIntoRunnable(final ScheduledTask<T> scheduledTask) {
LOGGER.debug("Converting ScheduledTask into Runnable");
return () -> queue.add(scheduledTask.invoke());
}

public T getTaskOutput() throws InterruptedException {
return queue.take();
}

}

单例主调度器

package com.paul.scheduledexecutortest.service;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MasterScheduler {

private static Logger LOGGER = LoggerFactory.getLogger(MasterScheduler.class);

public static final int THREAD_POOL_SIZE = 10;

private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREAD_POOL_SIZE);

public static ScheduledFuture<?> scheduleToRunPeriodically(Runnable task, long timeIntervalSeconds) {

LOGGER.debug("Scheduling task to run async every " + timeIntervalSeconds + " seconds without delay");
return scheduler.scheduleAtFixedRate(task, 0, timeIntervalSeconds, TimeUnit.SECONDS);
}

public static ScheduledFuture<?> scheduleToRunPeriodicallyWithInitialDelay(Runnable task,
long repeatTimeIntervalSeconds, long initalDelaySeconds) {

LOGGER.debug("Seceduling task to run every " + repeatTimeIntervalSeconds + " seconds after initial delay of "
+ initalDelaySeconds + " seconds");

return scheduler.scheduleAtFixedRate(task, initalDelaySeconds, repeatTimeIntervalSeconds, TimeUnit.SECONDS);
}

public static void scheduleToRunOnceWithInitialDelay(Runnable task, long timeIntervalSeconds) {

scheduler.schedule(task, timeIntervalSeconds, TimeUnit.SECONDS);
}

public static void shutDown() {
System.err.println("SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED");
scheduler.shutdown();
}
}

Heatbeat(计划的任务)

package com.paul.scheduledexecutortest;

public class HeartBeat implements ScheduledTask<String> {

@Override
public String invoke() {
return "Ping";
}

}

输出:

13:48:41.721 [main] DEBUG com.paul.scheduledexecutortest.service.Scheduler - Converting ScheduledTask into Runnable
13:48:41.780 [main] DEBUG com.paul.scheduledexecutortest.service.MasterScheduler - Scheduling task to run async every 5 without delay
13:48:41.781 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:48:46.807 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:48:51.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:48:56.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:01.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:06.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:11.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:16.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:21.784 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:26.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:31.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
13:49:36.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping
SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED

最佳答案

似乎 doStart() 方法正在阻塞,因为 taskStatus.isDone() 几乎总是 false,请参阅 this .

在您的 Scheduler.java 类中,当它只包含 1 个元素时,为什么需要一个阻塞队列?

我建议执行以下更改:

测试.java:

MasterScheduler.scheduleToRunOnceWithInitialDelay(() -> MasterScheduler.shutDown(), 60L);
HeartBeatService heartbeatService = new HeartBeatService();
heartbeatService.doStart();

HeartBeatService:

public void doStart() throws InterruptedException {
Scheduler<String> scheduler = new Scheduler<String>();
scheduler.scheduleToRunPeriodically(new HeartBeat(), 5L);
}

调度程序.java:

private Runnable transformIntoRunnable(final ScheduledTask<T> scheduledTask) {
LOGGER.debug("Converting ScheduledTask into Runnable");
return () -> LOGGER.debug((String)scheduledTask.invoke());
}

MasterScheduler.java:

public static void shutDown() {
System.err.println("SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED");
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
}
}

关于java - ScheduledExecutorService 有点奇怪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36388948/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com