- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章JVM优先级线程池做任务队列的实现方法由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
前言 。
我们都知道 web 服务的工作大多是接受 http 请求,并返回处理后的结果。服务器接受的每一个请求又可以看是一个任务。一般而言这些请求任务会根据请求的先后有序处理,如果请求任务的处理比较耗时,往往就需要排队了。而同时不同的任务直接可能会存在一些优先级的变化,这时候就需要引入任务队列并进行管理了。可以做任务队列的东西有很多,Java 自带的线程池,以及其他的消息中间件都可以.
同步与异步 。
这个问题在之前已经提过很多次了,有些任务是需要请求后立即返回结果的,而有的则不需要。设想一下你下单购物的场景,付完钱后,系统只需要返回一个支付成功即可,后续的积分增加、优惠券发放、安排发货等等业务都不需要实时返回给用户的,这些就是异步的任务。大量的异步任务到达我们部署的服务上,由于处理效率的瓶颈,无法达到实时处理,因此与需要用队列将他们暂时保存起来,排队处理.
线程池 。
在 Java 中提到队列,我们除了想到基本的数据结构之外,应该还有线程池。线程池自带一套机制可以实现任务的排队和执行,可以满足单点环境下绝大多数异步化的场景。下面是典型的一个处理流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// 注入合适类型的线程池
@Autowired
private
final
ThreadPoolExecutor asyncPool;
@RequestMapping
(value =
"/async/someOperate"
, method = RequestMethod.POST)
public
RestResult someOperate(HttpServletRequest request, String params,String callbackUrl {
// 接受请求后 submit 到线程池排队处理
asyncPool.submit(
new
Task(params,callbackUrl);
return
new
RestResult(ResultCode.SUCCESS.getCode(),
null
) {{
setMsg(
"successful!"
+ prop.getShowMsg());
}};
}
// 异步任务处理
@Slf4j
public
class
Task
extends
Callable<RestResult> {
private
String params;
private
String callbackUrl;
private
final
IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.
class
);
private
final
ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.
class
);
public
ImageTask(String params,String callbackUrl) {
this
.params = params;
this
.callbackUrl = callbackUrl;
}
@Override
public
RestResult call() {
try
{
// 业务处理
CarDamageResult result = algorithmService.someOperate(
this
.params);
// 回调
return
serviceUtils.callback(
this
.callbackUrl,
this
.caseNum, ResultCode.SUCCESS.getCode(), result,
this
.isAsync);
}
catch
(ServiceException e) {
return
serviceUtils.callback(
this
.callbackUrl,
this
.caseNum, e.getCode(),
null
,
this
.isAsync);
}
}
}
|
对于线程池这里就不具体展开讲了,仅仅简单理了下具体的流程:
上面的例子描述了一个生产速度远远大于消费速度的模型,普通面向数据库开发的企业级应用,由于数据库的连接池开发的连接数较大,一般不需要这样通过线程池来处理,而一些 GPU 密集型的应用场景,由于显存的瓶颈导致消费速度慢时,就需要队列来作出调整了.
带优先级的线程池 。
更复杂的,例如考虑到任务的优先级,还需要对线程池进行重写,通过 PriorityBlockingQueue 来替换默认的阻塞队列。直接上代码.
1
2
3
4
5
6
7
8
9
10
11
12
|
import
lombok.Data;
import
java.util.concurrent.Callable;
/**
* @author Fururur
* @create 2020-01-14-10:37
*/
@Data
public
abstract
class
PriorityCallable<T>
implements
Callable<T> {
private
int
priority;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import
lombok.Getter;
import
lombok.Setter;
import
java.util.concurrent.*;
import
java.util.concurrent.atomic.AtomicLong;
/**
* 优先级线程池的实现
*
* @author Fururur
* @create 2019-07-23-10:19
*/
public
class
PriorityThreadPoolExecutor
extends
ThreadPoolExecutor {
private
ThreadLocal<Integer> local = ThreadLocal.withInitial(() ->
0
);
public
PriorityThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime, TimeUnit unit) {
super
(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
}
public
PriorityThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super
(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
}
public
PriorityThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
super
(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
}
public
PriorityThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super
(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
}
private
static
PriorityBlockingQueue getWorkQueue() {
return
new
PriorityBlockingQueue();
}
@Override
public
void
execute(Runnable command) {
int
priority = local.get();
try
{
this
.execute(command, priority);
}
finally
{
local.set(
0
);
}
}
public
void
execute(Runnable command,
int
priority) {
super
.execute(
new
PriorityRunnable(command, priority));
}
public
<T> Future<T> submit(PriorityCallable<T> task) {
local.set(task.getPriority());
return
super
.submit(task);
}
public
<T> Future<T> submit(Runnable task, T result,
int
priority) {
local.set(priority);
return
super
.submit(task, result);
}
public
Future<?> submit(Runnable task,
int
priority) {
local.set(priority);
return
super
.submit(task);
}
@Getter
@Setter
protected
static
class
PriorityRunnable
implements
Runnable, Comparable<PriorityRunnable> {
private
final
static
AtomicLong seq =
new
AtomicLong();
private
final
long
seqNum;
private
Runnable run;
private
int
priority;
PriorityRunnable(Runnable run,
int
priority) {
seqNum = seq.getAndIncrement();
this
.run = run;
this
.priority = priority;
}
@Override
public
void
run() {
this
.run.run();
}
@Override
public
int
compareTo(PriorityRunnable other) {
int
res =
0
;
if
(
this
.priority == other.priority) {
if
(other.run !=
this
.run) {
// ASC
res = (seqNum < other.seqNum ? -
1
:
1
);
}
}
else
{
// DESC
res =
this
.priority > other.priority ? -
1
:
1
;
}
return
res;
}
}
}
|
要点如下:
总结 。
JVM 线程池是实现异步任务队列最简单最原生的一种方式,本文介绍了基本的使用流程和带有优先队列需求的用法。这种方法可有满足到一些简单的业务场景,但也存在一定的局限性:
显然简单的 JVM 线程池是无法 handle 到负载的业务场景的,这就需要引入其他中间件了,在接下来的文章中我们会继续探讨.
参考文献 。
ThreadPoolExecutor 优先级的线程池 。
implementing PriorityQueue on ThreadPoolExecutor 。
ThreadPoolExecutor 的 PriorityBlockingQueue 类型转化问题 。
大搜车异步任务队列中间件的建设实践 。
到此这篇关于JVM优先级线程池做任务队列的实现方法的文章就介绍到这了,更多相关java线程池优先级内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://juejin.im/post/6857383794035556365 。
最后此篇关于JVM优先级线程池做任务队列的实现方法的文章就讲到这里了,如果你想了解更多关于JVM优先级线程池做任务队列的实现方法的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一个关于 JavaScript 语法的问题。实际上,我在自学 MEAN 堆栈教程时想出了编码(https://thinkster.io/mean-stack-tutorial#adding-aut
在我的书中它使用了这样的东西: for($ARGV[0]) { Expression && do { print "..."; last; }; ... } for 循环不完整吗?另外,do 的意义何
我已经编写了读取开关状态的代码,如果按 3 次 # 则退出。 void allkeypadTest(void) { static uint8_t modeKeyCount=0; do
因此,对于上周我必须做的作业,我必须使用 4 个 do-while 循环和 if 语句在 Java 中制作一个猜谜游戏。我无法成功完成它,类(class)已经继续,没有为我提供任何帮助。如果有人可以查
int i=1,j=0,n=10,k; do{ j+=i; i<<1; printf("%d\n",i); // printf("%d\n",12<<1); }while
此代码用于基本杂货计算器的按钮。当我按下按钮时,一个输入对话框会显示您输入商品价格的位置。我遇到的问题是我无法弄清楚如何获得 do ... while 循环以使输入对话框在输入后弹出。 我希望它始终恢
当我在循环中修改字符串或另一个变量时,它的条件是否每次都重新计算?或者在循环开始前一次 std::string a("aa"); do { a = "aaaa"; } while(a.size<10)
我刚刚写了这个,但我找不到问题。我使用代码块并编写了这个问题 error: expected 'while' before '{' token === Build finished: 1 errors
do { printf("Enter number (0-6): ", ""); scanf("%d", &Num); }while(Num >= 0 && Num 表示“超过”,<表
我有一个包含 10 个项目的 vector (为简单起见,所有项目都属于同一类,称其为“a”)。我想要做的是检查“A”不是 a) 隐藏墙壁或 b) 隐藏另一个“A”。我有一个碰撞函数可以做到这一点。
嗨,这是我的第二个问题。我有下表 |-----|-------|------|------| |._id.|..INFO.|.DONE.|.LAST.| |..1..|...A...|...N..|.
这个问题在这里已经有了答案: 关闭 12 年前。 Possible Duplicates: Why are there sometimes meaningless do/while and if/e
来自 wikibook在 F# 上有一小部分它说: What does let! do?# let! runs an async object on its own thread, then it i
我在 Real World Haskell 书中遇到了以下函数: namesMatching pat | not (isPattern pat) = do exists do
我有一个类似于下面的用例,我创建了多个图并使用 gridExtra 将它们排列到一些页面布局中,最后使用 ggsave 将其保存为 PDF : p1 % mutate(label2
当我使用具有 for 循环的嵌套 let 语句时,如果没有 (do (html5 ..)),我将无法运行内部 [:tr]。 (defpartial column-settings-layout [&
执行 vagrant up 时出现此错误: anr@anr-Lenovo-G505s ~ $ vagrant up Bringing machine 'default' up with 'virtua
# ################################################# # Subroutine to add data to the table Blas
我想创建一个检查特定日期格式的读取主机。此外,目标是检查用户输入是否正确,如果不正确,则提示应再次弹出。 当我刚接触编程时,发现了这段代码,这似乎很合适。我仍然在努力“直到” do {
我关注这个tutorial在谷歌云机器学习引擎上进行培训。我一步一步地跟着它,但是在将 ml 作业提交到云时我遇到了错误。我运行了这个命令。 sam@sam-VirtualBox:~/models/r
我是一名优秀的程序员,十分优秀!