
java - Executing some threads in parallel and some in serial with CompletableFuture

Reposted. Author: 行者123. Updated: 2023-12-04 14:53:33

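I need to execute a number of tasks. Some tasks are independent, and some depend on the successful execution of other tasks. Independent tasks can run in parallel for better performance. I call these tasks services. The link column tells which services execute in serial and which in parallel. The order column describes the execution order that a set of defined services follows. In the example below, services A and B should run in parallel. If both execute successfully, service C executes. Note that service C does not depend directly on the output of its preceding services, but it must run after they have executed successfully, because during its execution it needs some data produced by them. After service C executes successfully, the next service, D, executes, and so on until all services in the list have been consumed.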

Tasks       service   link     order
Service A   01        03       1
Service B   02        03       2
Service C   03        04       3
Service D   04        05       4
Service E   05        07       5
Service F   06        07       6
Service G   07        (null)   7
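
As a rough illustration of how the table reads, the sketch below turns the rows into stages: consecutive rows that share a link value land in the same stage, which is the same rule the question's loop applies. The class name StageGrouping and the helper questionTable() are hypothetical, and each row is reduced to a {service, link} pair, with rows assumed to be already sorted by the order column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class StageGrouping {

    /** Rows of the table above as {service, link}, already sorted by the order column. */
    static List<String[]> questionTable() {
        return Arrays.asList(
                new String[] {"01", "03"}, new String[] {"02", "03"},
                new String[] {"03", "04"}, new String[] {"04", "05"},
                new String[] {"05", "07"}, new String[] {"06", "07"},
                new String[] {"07", null});
    }

    /** Consecutive rows with the same link value form one stage of parallel services. */
    static List<List<String>> toStages(List<String[]> rows) {
        List<List<String>> stages = new ArrayList<>();
        String prevLink = null;
        for (String[] row : rows) {
            String service = row[0];
            String link = row[1];
            boolean sameStage = !stages.isEmpty() && Objects.equals(link, prevLink);
            if (!sameStage) {
                stages.add(new ArrayList<>()); // link changed: open a new stage
            }
            stages.get(stages.size() - 1).add(service);
            prevLink = link;
        }
        return stages;
    }

    public static void main(String[] args) {
        // prints [[01, 02], [03], [04], [05, 06], [07]]
        System.out.println(toStages(questionTable()));
    }
}
```

Each inner list is a group that may run in parallel; the outer list is the serial order of those groups.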

Below is my code.
    public void executeTransactionFlow(DataVo dataVo) throws Exception {

        List<Callable<Boolean>> threadList = new ArrayList<>();
        List<String> serviceIds = new ArrayList<>();
        List<Future<Boolean>> futureList;
        String validatedRespCode = null, joinTo, prevJoinTo = null, serviceId;

        // Iterate through the service flows map
        for (Map<String, String> map : serviceFlowsMap) {
            joinTo = map.get("link");
            serviceId = map.get("service");

            // A simple flag to differentiate which services should execute in parallel and which in serial.
            if (null == prevJoinTo) {
                prevJoinTo = joinTo;
            }

            // Check the join condition. If it is the same as the previous one, do not execute the
            // thread list yet; just add the current service to the list.
            // (Callable<String, DataVo> is as posted; presumably the asker's own Callable implementation.)
            if (null != joinTo && joinTo.equals(prevJoinTo)) {
                threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
            }

            /*
             * 1. Run the threads in the list
             * 2. Empty the thread list
             * 3. Empty the serviceIds list
             * 4. Set prevJoinTo
             */
            else {
                if (threadList.size() > 0) {
                    prevJoinTo = joinTo;

                    try {

                        // If the list contains only 1 service then call it, otherwise invokeAll
                        futureList = MyExecutor.executeServices(threadList, dataVo);

                        // Services cannot be interrupted while they execute, so we check here,
                        // after they return, and abort if the transaction has timed out.
                        if (dataVo.isTimedout()) {
                            throw new Exception("Transaction thread is Interrupted or Timed-out");
                        }

                        // Validate service response codes and get a decision in case of any failure
                        validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);

                        // If validatedRespCode is not 200 then do not process further
                        if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
                            break;
                        }
                    }
                    catch (Exception e) {
                        throw new Exception(e.getMessage(), e);
                    }
                    finally {
                        // Clear the thread list and serviceIds list. They will be populated for the next parallel set of threads
                        threadList.clear();
                        serviceIds.clear();
                    }
                }

                // Start preparing a new thread list:
                // add the current service_id to threadList after executing the previous services in parallel.
                threadList.add(new Callable<String, DataVo>(serviceId, dataVo));
            }
        }

        // Run the remaining services
        if (!threadList.isEmpty()) {

            try {
                futureList = MyExecutor.executeServices(threadList, dataVo);
                validatedRespCode = validateResponseOfExecutedServices(dataVo, futureList, serviceIds);
            }
            catch (Throwable e) {
                throw new Exception(e.getMessage(), e);
            }
        }

        // Check the validation response code
        if (null != validatedRespCode && !"200".equals(validatedRespCode)) {
            MyExecutor.callDeclineFlow(dataVo, validatedRespCode, null);
        }

    }


    /**
     * This method iterates through the thread list and checks for exceptions and service responses.
     * If a service response is not a success, or if any exception has occurred, an exception is thrown.
     */
    public String validateResponseOfExecutedServices(DataVo dataVo, List<Future<Boolean>> futureList, List<String> serviceIds) throws Exception {
        String finalResponse = "200", serviceResponse = null;

        /*
         * The future list will be null if a single service is executed (no other parallel transactions).
         * The reason is that we do not use invokeAll() on a single service.
         */

        if (null != futureList && futureList.size() > 0) {
            for (Future<Boolean> future : futureList) {
                try {
                    future.get();
                }
                catch (Exception e) {
                    throw new Exception(e.getMessage(), e);
                }
            }
        }

        // Iterate through serviceIds and check responses.
        for (String serviceId : serviceIds) {
            serviceResponse = dataVo.getServiceResponse(serviceId);

            /*
             * If one of the following responses is found, consider it an exception
             */
            if (null != serviceResponse && "400,401,402,403,404,500,501".contains(serviceResponse)) {
                throw new Exception("One of the service has been declined");
            }
        }

        return finalResponse;
    }

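If CompletableFuture could be beneficial here, how can I use it effectively?
future.get() is a blocking call. If I have 10 services executing in parallel, this future.get() will block the others, even if they finished before the one we are currently waiting on. How can I avoid this blocking?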
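
On the blocking concern, a small sketch may help. It assumes that MyExecutor.executeServices delegates to ExecutorService.invokeAll, as the comment in the code above suggests; the class name InvokeAllDemo and the sleep durations are made up for illustration. invokeAll returns only once every submitted task has completed, so each returned Future is already done, and a later get() merely fetches the result (or rethrows the task's exception). In any case, get() blocks only the calling thread; it does not stop the other tasks from running on the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllDemo {

    /** Returns true if every Future handed back by invokeAll is already done. */
    static boolean allDoneAfterInvokeAll() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (long millis : new long[] {300, 100, 200}) {
                // Hypothetical stand-in for a service call of varying duration.
                tasks.add(() -> {
                    Thread.sleep(millis);
                    return true;
                });
            }
            // Blocks the caller until all three tasks have finished; the tasks run concurrently.
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            for (Future<Boolean> future : futures) {
                if (!future.isDone()) {
                    return false;
                }
            }
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all done: " + allDoneAfterInvokeAll());
    }
}
```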

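I have added more detail to the problem statement, namely the order column. Services need to follow the defined order. Services A and B have order 1 and 2 respectively, but they will still execute in parallel because both have 03 as their link value. I think the dependency-graph-based approach suggested by @Thomas in the comments is no longer required.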

Best answer

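Great question. Although it is technically possible to do this purely with ExecutorService and Future, in my view the better way is to use reactive programming rather than depending solely on Future, CompletableFuture, CompletionService, and the like. The main reason is that it can quickly become difficult-to-read code.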
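
For comparison, here is a minimal sketch of what a CompletableFuture-only version might look like. It is not the approach taken in this answer. It assumes the services have already been grouped into stages (lists of service ids that may run in parallel), and StagedFlow, run and callService are hypothetical names, with callService standing in for the real service call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StagedFlow {

    /** Runs each stage's services in parallel; a stage starts only after the previous one succeeded. */
    static CompletableFuture<Void> run(List<List<String>> stages, List<String> log, ExecutorService pool) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (List<String> stage : stages) {
            // thenCompose is skipped if an earlier stage completed exceptionally,
            // so a failing service prevents all later stages from starting.
            chain = chain.thenCompose(ignored -> {
                CompletableFuture<?>[] running = stage.stream()
                        .map(id -> CompletableFuture.runAsync(() -> callService(id, log), pool))
                        .toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(running);
            });
        }
        return chain;
    }

    /** Hypothetical stand-in for the real service call; it only records that the service ran. */
    static void callService(String id, List<String> log) {
        log.add(id);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<List<String>> stages = Arrays.asList(
                Arrays.asList("01", "02"), Arrays.asList("03"), Arrays.asList("04"),
                Arrays.asList("05", "06"), Arrays.asList("07"));
        try {
            run(stages, log, pool).join(); // the only blocking call, at the very end
            System.out.println(log);
        } finally {
            pool.shutdown();
        }
    }
}
```

If any service throws, join() fails with a CompletionException and the remaining stages never run, which roughly mirrors the "stop on the first failed group" behaviour of the code in the question.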

Here is how I did it using RxJava 2.2.16 and ExecutorService:

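  1. Use ExecutorService to submit() the actions that do not depend on other actions, or whose dependencies have all completed.
  2. To know that an action has completed, use RxJava's BehaviorSubject. When an action completes, trigger step (1) for each of its dependents.
  3. When all actions have completed, shut down the ExecutorService. For this, use another BehaviorSubject.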

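Sorry, because of the new approach I have written the entire logic in my own way. But it still revolves around the main requirement you gave. It is best to look first at the Action model class and the createActions() method in AppRxJava. From there, you should be able to follow the code. To simulate some time consumption, I have used the famous Thread.sleep() technique.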
    import io.reactivex.subjects.Subject;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class AppRxJava{
        /* To listen to the completion of a task, so that the dependent tasks may be scheduled.
         * Serialized, because onNext() is called from several worker threads. */
        private Subject<Action> completionSub = io.reactivex.subjects.BehaviorSubject.<Action>create().toSerialized();

        /* To listen to the completion of all tasks, so that ExecutorService may shut down. */
        private Subject<Boolean> allActionCompletedSub = io.reactivex.subjects.BehaviorSubject.<Boolean>create().toSerialized();

        private ExecutorService SVC = Executors.newCachedThreadPool();
        private List<Action> allActions;

        public static void main( String[] args ){
            new AppRxJava().start();
        }

        private void start() {
            this.allActions = createActions();
            subscribeToActionCompletions();
            subscribeToSvcShutdown();

            startAllActions( this.allActions );
        }

        private void subscribeToSvcShutdown(){
            /* If all actions have been completed, shut down the ExecutorService. */
            this.allActionCompletedSub.subscribe( allScheduled -> {
                if( allScheduled ) {
                    SVC.shutdown();
                    try {
                        SVC.awaitTermination( 2, TimeUnit.SECONDS );
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }

        private void subscribeToActionCompletions(){
            this.completionSub.subscribe( complAction -> {
                /* Get the actions that are dependent on this recently completed action and "attempt" to start them. */
                List<Action> deps = getDeps( complAction, this.allActions );
                startAllActions( deps );

                /* If all actions have completed, raise the flag. */
                if( allActionsCompleted() ) this.allActionCompletedSub.onNext( true );
            });
        }

        /* Attempts to start all actions that are present in the passed list. */
        private void startAllActions( List<Action> actions ){
            for( Action action : actions ) {
                startAction( action, actions );
            }
        }

        /* Attempts to start an action. Only if it is still pending and all of its dependencies are completed. */
        private void startAction( Action a, List<Action> list ){
            if( !a.isPending() ) return;
            if( !allDepsCompleted( a, allActions ) ) return;

            if( a.isPending() ) {
                synchronized (a.LOCK ) {
                    if( a.isPending() ) {
                        a.setStatus( 1 ); //Set to running, so that it is not picked up twice.
                        SVC.submit( () -> {
                            try {
                                a.getAction().call();
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }

                            a.setStatus( 2 ); //Set to completed. (We may have to synchronize this.)
                            this.completionSub.onNext( a );
                        } );
                    }
                }
            }
        }

        private boolean allActionsCompleted(){
            for( Action a : this.allActions ) if( !a.isCompleted() ) return false;
            return true;
        }

        private static boolean allDepsCompleted( Action a, List<Action> allActions ){
            for( Action dep : allActions ) {
                if( a.getDependencies().contains( dep ) && !dep.isCompleted() ) return false;
            }

            return true;
        }

        /* Returns the actions that are dependent on Action <code>a</code>. */
        private List<Action> getDeps( Action a, List<Action> list ){
            List<Action> deps = new ArrayList<>();
            for( Action dep : list ) if( dep.getDependencies().contains( a ) ) deps.add( dep );
            return deps;
        }

        /* Creates the action list with respective dependencies. */
        private List<Action> createActions(){
            List<Action> actions = new ArrayList<>();

            Action a = createAction( 5000, "ServiceA" );
            Action b = createAction( 5000, "ServiceB" );
            Action c = createAction( 2000, "ServiceC", a, b );
            Action d = createAction( 2000, "ServiceD", c );
            Action e = createAction( 2000, "ServiceE", d );

            actions.add( a ); actions.add( b ); actions.add( c ); actions.add( d ); actions.add( e );
            return actions;
        }

        private Action createAction( final long sleepMillis, final String name, Action... dependencies ) {
            List<Action> deps = null;
            if( dependencies != null ) {
                deps = new ArrayList<>();
                for( Action a : dependencies ) deps.add( a );
            }
            return Action.of( () -> {
                System.out.println( "Service (" + name + ") started" );
                try {
                    Thread.sleep( sleepMillis );
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println( "Service (" + name + ") completed" );
                return true;
            }, name, deps );
        }

    }

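And the Action model class. It represents an action and the list of actions it depends on. (This is slightly different from your original representation, but I think either way works if you handle it appropriately.)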
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicInteger;

    public class Action{
        Callable<Boolean> action;
        String name;
        List<Action> dependencies = new ArrayList<>();
        AtomicInteger status = new AtomicInteger( 0 ); //0 = Pending, 1 = Scheduled, 2 = Completed
        public static final Object LOCK = new Object();

        private Action(Callable<Boolean> action, String name, List<Action> dependencies) {
            super();
            this.action = action;
            this.name = name;
            if( dependencies != null ) this.dependencies = dependencies;
        }

        public static Action of( Callable<Boolean> action, String name, List<Action> dependencies ){
            return new Action( action, name, dependencies );
        }

        public Callable<Boolean> getAction(){
            return action;
        }

        public String getName(){
            return name;
        }

        public List<Action> getDependencies(){
            return dependencies;
        }

        public boolean isCompleted(){
            return this.status.get() == 2;
        }

        public boolean isPending(){
            return this.status.get() == 0;
        }

        public boolean isScheduled(){
            return this.status.get() == 1;
        }

        public void setStatus( int status ){
            this.status.getAndSet( status );
        }

        @Override
        public int hashCode(){
            final int prime = 31;
            int result = 1;
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            return result;
        }

        @Override
        public boolean equals( Object obj ){
            if (this == obj) return true;
            if (obj == null) return false;
            if (getClass() != obj.getClass()) return false;
            Action other = (Action) obj;
            if (name == null) {
                if (other.name != null)
                    return false;
            } else if (!name.equalsIgnoreCase( other.name )) return false;
            return true;
        }

    }
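
To bridge the question's service/link table and the dependency-list model used by Action, one possible reading is that a service depends on every service whose link value points at it. The sketch below derives such a map; LinkDependencies and questionTable() are hypothetical names, and the rows are the {service, link} pairs from the question. Note that under this reading Service F (06) has no prerequisites, so a dependency-driven scheduler could start it immediately, whereas the order column in the question places it alongside Service E.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LinkDependencies {

    /** Rows of the question's table as {service, link}. */
    static List<String[]> questionTable() {
        return Arrays.asList(
                new String[] {"01", "03"}, new String[] {"02", "03"},
                new String[] {"03", "04"}, new String[] {"04", "05"},
                new String[] {"05", "07"}, new String[] {"06", "07"},
                new String[] {"07", null});
    }

    /** Maps each service id to the ids of the services that must complete before it. */
    static Map<String, List<String>> dependenciesOf(List<String[]> rows) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        for (String[] row : rows) {
            deps.put(row[0], new ArrayList<>()); // every service starts with no prerequisites
        }
        for (String[] row : rows) {
            String service = row[0];
            String link = row[1];
            // "service links to X" is read here as "X must wait for service".
            if (link != null && deps.containsKey(link)) {
                deps.get(link).add(service);
            }
        }
        return deps;
    }

    public static void main(String[] args) {
        // prints {01=[], 02=[], 03=[01, 02], 04=[03], 05=[04], 06=[], 07=[05, 06]}
        System.out.println(dependenciesOf(questionTable()));
    }
}
```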

For more on java - Executing some threads in parallel and some in serial with CompletableFuture, see the similar question we found on Stack Overflow: https://stackoverflow.com/questions/59716051/
