- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
在面试这一篇我们介绍过 CountDownLatch和CyclicBarrier ,它们都是jdk1.5提供的多线程并发控制类,内部都是用AQS这个同步框架实现。 在我们的实际项目中,有很多场景是需要从数据库查询一批数据,多线池执行某些操作,并且要统计结果,我们对这个过程做了一些封装,由于要统计结果,所以需要等所有任务都处理完成,我们用到了CountDownLatch实现同步。伪代码如下:
ExecuteInstance ei = ExecuteInstance.build(myExecutor); //线程池
//循环
LoopShutdown.build("myTask").loop(() -> {
//不断从数据获取数据
List<Task> list = getFromDb();
//设置countdownlatch
ei.setCountDownSize(list.size());
list.forEach(item -> ei.execute(() -> {
//提交到线程池执行,并且统计
}));
//等待这一批做完
ei.await();
});
//内部使用了CountDownLatch await()
return ei.awaitResult();
代码很简单,容易理解。不过后来有同学提到每次都要setCountDownSize() + await() 这套组合太麻烦,能不能省略这两步呢。另外也不够灵活,有些场景不能提前知道要处理的数据总数,例如从迭代器遍历数据,Iterator接口并没有size方法可以获取到总数.
那怎么实现这个功能呢?就是本篇要介绍的Phaser.
Phaser类是jdk7提供的,可重用的,同步的,在功能上和CountDownLatch,CyclicBarrier类似,但更加灵活的类。 "phaser" google翻译一下是:"移相器"的意思,完全不知道是什么~,不过"phase"是阶段的意思,还是能从名字了解到一些信息.
Phaser运行机制:
Registration(注册) 跟其他barrier不同,在phaser上注册的parties会随着时间的变化而变化。任务可以随时注册(使用方法register,bulkRegister注册,或者由构造器确定初始parties),并且在任何抵达点可以随意地撤销注册(方法arriveAndDeregister)。就像大多数基本的同步结构一样,注册和撤销只影响内部计数;不会创建更深的内部记录,所以任务不能查询他们是否已经注册。(不过,可以通过继承来实现类似的记录) 可以动态的注册是它的特点之一,我们知道CountDownLatch之类的在开始就需要指定一个计数,并且不能更改,而Phaser可以开始指定,也可以运行时更改.
Synchronization(同步机制) 和CyclicBarrier一样,Phaser也可以重复await。方法arriveAndAwaitAdvance的效果类似CyclicBarrier.await。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制到达phaser和等待其他线程的动作,通过下面两种类型的方法
Arrival(到达机制) arrive和arriveAndDeregister方法记录到达状态。这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。重写此方法类似于为CyclicBarrier提供一个barrierAction,但比它更灵活.
Waiting(等待机制) awaitAdvance方法需要一个表示arrival phase number的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务.
Termination(终止机制) 可以用isTerminated方法检查phaser的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止.
Tiering(分层结构) Phaser支持分层结构(树状构造)来减少竞争。注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。 这也是它的主要亮点之一,这一点很像ConcurrentHashMap(对HashTable)和LongAdder(对AtomicLong),通过分散热点来降低资源竞争,提升并发效率.
Monitoring(状态监控) 由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用.
CountDownLatch和CyclicBarrier都非常简单,从Phaser提供的api数量就可以看出为什么说它更加灵活,show me the code,接下来我们通过几个例子感受一下.
例子1:子线程会等全部子线程达到后才开始执行,实现类似CyclicBarrier的效果.
@Test
public void test1() throws InterruptedException {
List<Runnable> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final int j = i;
list.add(() -> System.out.println(j));
}
final Phaser phaser = new Phaser(); // "1" to register self
// create and start threads
int i = 0;
for (final Runnable task : list) {
i++;
final int j = i;
phaser.register();
new Thread(() -> {
try {
Thread.sleep(j * 1000);
} catch (InterruptedException e) {
}
//全部子线程到达后才开始执行
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}).start();
}
Thread.sleep(15000);
}
例子2:task会循环做3次,通过重写onAdvance可以控制phaser结束的条件.
@Test
public void test2() throws InterruptedException {
//重复做3次
int iterations = 3;
List<Runnable> list = Lists.newArrayList();
for (int i = 0; i < 2; i++) {
final int j = i;
list.add(() -> System.out.println(j));
}
final Phaser phaser = new Phaser() {
//每做一次,phase+1,该方法返回true,就会结束
protected boolean onAdvance(int phase, int registeredParties) {
return phase > iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : list) {
phaser.register();
new Thread(() -> {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}).start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
Thread.sleep(5000);
}
例子3:创建多个phaser,并关联到父phaser上,就是上面提到的分层结构.
@Test
public void test3() {
Phaser parent = new Phaser(1);
Phaser phaser1 = new Phaser(parent);
Phaser phaser2 = new Phaser(parent);
for (int i = 0; i < 20; i++) {
final int j = i;
if (i < 10) {
phaser1.register();
new Thread(() -> {
try {
Thread.sleep(1000);
phaser1.arriveAndAwaitAdvance(); // await all creation
System.out.println(j);
} catch (InterruptedException e) {
}
}).start();
} else if (i < 20) {
phaser2.register();
new Thread(() -> {
try {
Thread.sleep(10000);
phaser2.arriveAndAwaitAdvance(); // await all creation
System.out.println(j);
} catch (InterruptedException e) {
}
}).start();
}
}
parent.arriveAndAwaitAdvance();
System.out.println("done");
}
例子4:使用Phaser改写我们的代码,如下:
//维护一个Phaser
public static ExecuteInstance buildWithPhaser(Executor executor) {
ExecuteInstance ei = new ExecuteInstance();
ei.executor = executor;
ei.phaser = new Phaser(1);
return ei;
}
//提交线程池前注册一下
public void executeRR(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
phaser.register();
executor.execute(() -> executeStatistics(task, exceptionHandler, batch));
}
//执行后deregister一下
private void executeStatistics(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
ReturnResult result = ReturnResult.NONE;
try {
//任务处理
result = task.call();
} catch (Exception e) {
if (statistics) {
counter.incrException(batch);
}
if (exceptionHandler != null) {
//自定义异常处理
try {
exceptionHandler.accept(e);
} catch (Exception he) {
}
}
} finally {
phaser.arriveAndDeregister(); //deregister
if (statistics) {
if (ReturnResult.SUCCESS.equals(result)) {
counter.incrSuccess(batch);
} else if (ReturnResult.FAIL.equals(result)) {
counter.incrFail(batch);
} else if (ReturnResult.FILTER.equals(result)) {
counter.incrFilter(batch);
}
}
}
}
//等待结果
public ExecuteResult awaitResult() {
phaser.arriveAndAwaitAdvance();
return getExecuteResult();
}
使用就非常简单了 。
ExecuteInstance ei = ExecuteInstance.buildWithPhaser(myExecutor); //线程池
//循环
LoopShutdown.build("myTask").loop(() -> {
//不断从数据获取数据
List<Task> list = getFromDb();
list.forEach(item -> ei.execute(() -> {
//提交到线程池执行,并且统计
}));
});
return ei.awaitResult();
Phaser是jkd7后提供的同步工具类,它底层并没有使用AQS同步工具。相比CountDownLatch等它提供了更丰富的功能,但也意味着它更复杂,需要更多的资源,一些简单的场景CountDownLatch等工具类能满足的就使用它们即可,考虑性能,还有灵活性时才考虑使用Phaser,如笔者的场景使用Phaser就更加适合.
更多分享,欢迎关注我的github: https://github.com/jmilktea/jtea 。
最后此篇关于并发工具类Phaser的文章就讲到这里了,如果你想了解更多关于并发工具类Phaser的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这个问题在这里已经有了答案: Android ADT version required 20.0.0 and above (10 个答案) 关闭 9 年前。 我刚刚安装了 Eclipse Juno
按照 This page from codeplex 上的指南进行操作后,我无法在我的工具/选项窗口中看到 Python 选项。我认为我与指南的唯一偏差是: 发行版:没有安装 activestate
我有一个非常大的 .sql 脚本。我将此脚本添加到 Visual Studio 2013 下的 SQL Server 项目中。当我尝试构建它时,我收到此错误消息 This T-SQL script e
当我在SpringBoot项目中想加个依赖,但是不确定现有依赖的依赖的依赖.....有没有添加过这个依赖,怎么办呢?如果添加过了但是不知道我需要的这个依赖属于哪个依赖的下面,怎么查呢? IDEA中提供
我正在做一个项目来减少 PDF 的大小,压缩它们。我想知道市场上是否有任何非常好的工具/库(.NET)。 我确实尝试了一些像 Onstream Compression 这样的工具,但结果并不令人满意。
我想从我的源代码编译一个安卓内核。 但我想使用工具或类似的东西。 所以我只需单击一个按钮并获得一个可闪存的 zip 文件... 有工具吗? 我可以用脚本来做吗? 谢谢! 最佳答案 这取决于您从哪里获得
我们生成 pdf 文件,其中包含有关数万名客户每月财务余额的数据。在高峰期(年底有 100.000 个文件),使用在 5 台服务器之间分配负载,该过程可能需要长达 5 天的时间才能完成。工作负载的分配
模块:xmllib xmllib 是一个非验证的低级语法分析器。应用程序员使用的 xmllib 可以覆盖 XMLParser 类,并提供处理文档元素(如特定或类属标记,或字符实体)的方法。从 Py
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 3 年前。
我在一家医疗保健公司工作,拥有有关患者位置(地址、城市、州、 zip )的信息。我试图确定有多少百分比的患者住在离 5 个特定位置最近的地方。我正在寻找的答案是“25% 的患者住在离#1 地点最近的地
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 4年前关闭。 我们不允许在 Stack Overflow 上提出有关通用计算硬件和软件的问题。您可以编辑问
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
请问我在哪里可以得到 SvcTraceViewer 工具? 我尝试下载并安装许多 SDK。 我查看了程序文件的垃圾箱。 我需要它来跟踪我的 WCF 调用出了什么问题。 最佳答案 您可以通过下载 Win
我正在尝试在我最喜欢的编辑器中设置适当的代码完成功能,我们将其称为AnEditor,以避免互联网上充斥着特定于程序的答案。 (您知道语言是ALanguage。)编辑器具有两个我喜欢的功能:它既可以在控
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the he
当 merge 的两个分支对同一文件有更改时,Mercurial 是否总是使用外部 merge 工具? 或者它是否首先查看它是否可以 merge 文件本身,如果不能,则仅转向外部工具? 我问的原因是我
我正在为我使用的编辑器编写 Scala 插件,该插件将突出显示所有未使用的代码路径(可能未使用 defs 、 vals 、 classes 和 implicits ),并为用户提供一个选项以将它们从.
我有 jquery 工具滚动器...我喜欢它只为 swipeLeft swipeRight 实现触摸选项。 当我使用 touch: true 时,它也会在向上/向下滑动时旋转.. 我按照此处的说明
我已经尝试了一些用于构建 UML(对象/依赖图)的 Eclipse 工具,但我真正需要的是一个工具来生成这样的代码外 UML。 (反之亦然) 我更喜欢一个简单的 UML 工具,它易于安装并且没有任何依
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,因为
我是一名优秀的程序员,十分优秀!