- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要执行一些任务。有些任务是独立的,有些则依赖于其他任务的成功执行。可以并行运行独立任务以提高性能。我称这些任务为服务。link
列告诉您哪些服务将串行执行,哪些将并行执行。 order
列描述了执行顺序,随后将执行一组已定义的服务。对于下面的示例,服务A和B应该并行运行。如果它们已成功执行,则服务C将执行。请注意,服务C并不直接依赖于其先前服务的输出,但是它必须在成功执行其先前服务之后运行,因为服务C在其执行期间将需要由其先前服务产生的一些数据。在成功执行服务C之后,将执行下一个服务D,依此类推,直到该列表中的所有服务都被消耗为止。
Tasks service link order
Service A 01 03 1
Service B 02 03 2
Service C 03 04 3
Service D 04 05 4
Service E 05 07 5
Service F 06 07 6
Service G 07 (null) 7
public void executeTransactionFlow(DataVo dataVo) throws Exception {
List<Callable<Boolean>> threadList = new ArrayList<>();
List<String> serviceIds = new ArrayList<>();
List<Future<Boolean>> futureList;
String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;
// Iterating through service flows map
for (Map<String, String> map : serviceFlowsMap) {
joinTo = map.get("link");
serviceId = map.get("service");
// A simple flag to differentiate which services should execute parallel and which in serial.
if (null == prevJoinTo) {
prevJoinTo = joinTo;
}
// Check for join condition. If join condition is same as previous then do not execute the thread list yet add current service in list
if (null != joinTo && joinTo.equals(prevJoinTo)) {
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
/*
* 1. Run the threads in the list
* 2. Empty the thread list
* 3. Empty serviceIds list
* 4. Set prevJoinTo
*/
else {
if (threadList.size() > 0) {
prevJoinTo = joinTo;
try {
// If list contain only 1 service then call, otherwise invokeAll
futureList = MyExecutor.executeServices(threadList, dataVo);
// During execution we cannot interrupt services, so we check here after they get back to here and interrupt if it has been timedout.
if (dataVo.isTimedout()) {
throw new Exception("Transaction thread is Interrupted or Timed-out");
}
// Validate service response codes and get decision in case of any failure
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
// If validationRespCode is non 00 then do not process further
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
break;
}
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
finally {
// clear thread list and serviceIds list. It will be populated for next parallel set of threads
threadList.clear();
serviceIds.clear();
}
}
// Start preparing new thread list
// Adding current service_id into threadList after executing previous services in parallel.
threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
}
}
// Run remaining services
if (!threadList.isEmpty()) {
try {
futureList = MyExecutor.executeServices(threadList, dataVo);
validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
}
catch (Throwable e) {
throw new Exception(e.getMessage(), e);
}
}
// Check validation response code
if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
}
}
/**
* This method iterates through the thread list and checks for exceptions and service responses.
* If service response is not success or if any exception has occurred then exception is thrown
*/
public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
String finalResponse = "200", serviceResponse = null;
/*
* future list will be null if single service is executed (no other parallel transactions). The reason is that we do
* not use invokeAll() on single service.
*/
if (null != futureList && futureList.size() > 0) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
throw new Exception(e.getMessage(), e);
}
}
}
// Iterate through serviceIds and check responses.
for (String serviceId : serviceIds) {
serviceResponse = dataVo.getServiceResponse(serviceId);
/*
* if one of following response is found then consider it exception
*/
if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
throw new Exception("One of the service has been declined");
}
}
return finalResponse;
}
CompletableFuture
在这里可能是有益的,那么我如何有效地使用它呢?
future.get()
是一个阻止调用。如果我有10个并行执行的服务,则此
future.get()
将阻止其他服务,即使它们在我们正在等待的当前服务之前执行也是如此。如何避免这种阻塞?
03
中都具有
link
值,因此它们仍将并行执行。我认为现在不再需要@Thomas在评论中建议的基于依赖关系图的方法。
最佳答案
很棒的问题。尽管从技术上讲,确实可以纯粹使用ExecutorService
和Future
来做到这一点,但对我而言,更好的方法是使用响应式(Reactive)编程,而不是仅依赖于Future
或CompletableFuture
或CompletionService
等。主要原因是它可能很快成为难以阅读的代码。
这是我使用RxJava 2.2.16
和ExecutorService
的方法:
ExecutorService
到submit()
Action 已完成了不依赖于其他 Action 或它们的所有依存关系的执行 Action 。 BehaviorSubject
。操作完成后,针对其每个依赖项触发步骤(1)。 ExecutorService
。为此,请使用另一个BehaviorSubject
。 Action
中的
createActions()
模型类和
AppRxjava
方法。从那里,您应该能够遵循该代码。为了模拟一些时间消耗,我使用了著名的
Thread.sleep()
技术。
public class AppRxJava{
/* To listen to the completion of a task, so that the dependent tasks may be scheduled. */
private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.create();
/* To listen to the completion of all tasks, so that ExecutorService may shut down. */
private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.create();
private ExecutorService SVC = Executors.newCachedThreadPool();
private List<Action> allActions;
public static void main( String[] args ){
new AppRxJava().start();
}
private void start() {
this.allActions = createActions();
subscribeToActionCompletions();
subscribeToSvcShutdown();
startAllActions( this.allActions );
}
private void subscribeToSvcShutdown(){
/* If all actions have been completed, shut down the ExecutorService. */
this.allActionCompletedSub.subscribe( allScheduled -> {
if( allScheduled ) {
SVC.shutdown();
try {
SVC.awaitTermination( 2, TimeUnit.SECONDS );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
private void subscribeToActionCompletions(){
this.completionSub.subscribe( complAction -> {
/* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
List<Action> deps = getDeps( complAction, this.allActions );
startAllActions( deps );
/* If all actions have got completed, raise the flag. */
if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
});
}
/* Attempts to start all actions that are present in the passed list. */
private void startAllActions( List<Action> actions ){
for( Action action : actions ) {
startAction( action, actions );
}
}
/* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
private void startAction( Action a, List<Action> list ){
if( !a.isPending() ) return;
if( !allDepsCompleted( a, allActions ) ) return;
if( a.isPending() ) {
synchronized (a.LOCK ) {
if( a.isPending() ) {
a.setStatus( 1 ); //Set to running, so that it is not picked up twice.
SVC.submit( () -> {
try {
a.getAction().call();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
this.completionSub.onNext( a );
} );
}
}
}
}
private boolean allActionsCompleted(){
for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
return true;
}
private static boolean allDepsCompleted( Action a, List<Action> allActions ){
for( Action dep : allActions ) {
if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
}
return true;
}
/* Returns the actions that are dependent on Action <code>a</code>. */
private List<Action> getDeps( Action a, List<Action> list ){
List<Action> deps = new ArrayList<>();
for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
return deps;
}
/* Creates the action list with respective dependencies. */
private List<Action> createActions(){
List<Action> actions = new ArrayList<>();
Action a = createAction( 5000, "ServiceA", null );
Action b = createAction( 5000, "ServiceB", null );
Action c = createAction( 2000, "ServiceC", a, b );
Action d = createAction( 2000, "ServiceD", c );
Action e = createAction( 2000, "ServiceE", d );
actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
return actions;
}
private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
List<Action> deps = null;
if( dependencies != null ) {
deps = new ArrayList<>();
for( Action a : dependencies ) deps.add( a );
}
return Action.of( () -> {
System.out.println( "Service (" + name + ") started" );
try {
Thread.sleep( sleepMillis );
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println( "Service (" + name + ") completed" );
return true;
}, name, deps );
}
}
public class Action{
Callable<Boolean> action;
String name;
List<Action> dependencies = new ArrayList<>();
AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
public static final Object LOCK = new Object();
private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
super();
this.action = action;
this.name = name;
if( dependencies != null ) this.dependencies = dependencies;
}
public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
return new Action( action, name, dependencies );
}
public Callable<Boolean> getAction(){
return action;
}
public String getName(){
return name;
}
public List<Action> getDependencies(){
return dependencies;
}
public boolean isCompleted(){
return this.status.get() == 2;
}
public boolean isPending(){
return this.status.get() == 0;
}
public boolean isScheduled(){
return this.status.get() == 1;
}
public void setStatus( int status ){
this.status.getAndSet( status );
}
@Override
public int hashCode(){
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@Override
public boolean equals( Object obj ){
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
Action other = (Action) obj;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equalsIgnoreCase( other.name )) return false;
return true;
}
}
关于java - 通过CompletableFuture并行执行一些线程,并串行执行一些线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59716051/
有没有办法同时运行 2 个不同的代码块。我一直在研究 R 中的并行包,它们似乎都基于在循环中运行相同的函数。我正在寻找一种同时运行不同函数的方法(循环的 1 次迭代)。例如,我想在某个数据对象上创建一
无论如何增加 Parallel.For 启动后的循环次数?示例如下: var start = 0; var end = 5; Parallel.For(start, end, i => { C
我是 Golang 的新手,正在尝试了解并发和并行。我阅读了下面提到的关于并发和并行的文章。我执行了相同的程序。但没有得到相同的(混合字母和字符)输出。首先获取所有字母,然后获取字符。似乎并发不工作,
我正在寻找同时迭代 R 中两个或多个字符向量/列表的方法,例如。有没有办法做这样的事情: foo <- c('a','c','d') bar <- c('aa','cc','dd') for(i in
我对 Raku 很陌生,我对函数式方法有疑问,尤其是 reduce。 我最初有这样的方法: sub standardab{ my $mittel = mittel(@_); my $foo =
我最近花了很多时间来学习实时音频处理的细节,我发现的大多数库/工具都是c / c++代码或脚本/图形语言的形式,并在其中编译了c / c++代码。引擎盖。 使用基于回调的API,与GUI或App中的其
我正在使用 JMeter 进行图像负载测试。我有一个图像名称数组并遍历该数组,我通过 HTTP 请求获取所有图像。 -> loop_over_image - for loop controller
我整个晚上都在困惑这个问题...... makeflags = ['--prefix=/usr','--libdir=/usr/lib'] rootdir='/tmp/project' ps = se
我正在尝试提高计算图像平均值的方法的性能。 为此,我使用了两个 For 语句来迭代所有图像,因此我尝试使用一个 Parallel For 来改进它,但结果并不相同。 我做错了吗?或者是什么导致了差异?
假设您有一个并行 for 循环实现,例如ConcRT parallel_for,将所有工作放在一个 for 循环体内总是最好的吗? 举个例子: for(size_t i = 0; i < size()
我想并行运行一部分代码。目前我正在使用 Parallel.For 如何让10、20或40个线程同时运行 我当前的代码是: Parallel.For(1, total, (ii) =>
我使用 PAY API 进行了 PayPal 自适应并行支付,其中无论用户(买家)购买什么,都假设用户购买了总计 100 美元的商品。在我的自适应并行支付中,有 2 个接收方:Receiver1 和
我正在考虑让玩家加入游戏的高效算法。由于会有大量玩家,因此算法应该是异步的(即可扩展到集群中任意数量的机器)。有细节:想象有一个无向图(每个节点都是一个玩家)。玩家之间的每条边意味着玩家可以参加同一场
我有一个全局变量 volatile i = 0; 和两个线程。每个都执行以下操作: i++; System.out.print(i); 我收到以下组合。 12、21 和 22。 我理解为什么我没有得到
我有以下称为 pgain 的方法,它调用我试图并行化的方法 dist: /***************************************************************
我有一个 ruby 脚本读取一个巨大的表(约 2000 万行),进行一些处理并将其提供给 Solr 用于索引目的。这一直是我们流程中的一大瓶颈。我打算在这里加快速度,我想实现某种并行性。我对 Ru
我正在研究 Golang 并遇到一个问题,我已经研究了几天,我似乎无法理解 go routines 的概念以及它们的使用方式。 基本上我是在尝试生成数百万条随机记录。我有生成随机数据的函数,并将创建一
我希望 for 循环使用 go 例程并行。我尝试使用 channel ,但没有用。我的主要问题是,我想在继续之前等待所有迭代完成。这就是为什么在它不起作用之前简单地编写 go 的原因。我尝试使用 ch
我正在使用 import Control.Concurrent.ParallelIO.Global main = parallel_ (map processI [1..(sdNumber runPa
我正在尝试通过 makePSOCKcluster 连接到另一台计算机: library(parallel) cl ... doTryCatch -> recvData -> makeSOCKm
我是一名优秀的程序员,十分优秀!