gpt4 book ai didi

java - 依赖图的响应式(Reactive)执行

转载 作者:行者123 更新时间:2023-11-30 06:07:32 25 4
gpt4 key购买 nike

我需要创建一个服务(使用 Java)来接受任务图并并行执行它们,同时考虑这些任务之间的依赖关系。

例如,我们有 6 个任务:A、B、C、D、E、F。

依赖关系是:

A -> C

B -> C、D

C -> F

D -> E

E -> F

这将创建(可能的)并行执行组:A+B、C+D、E、F。

如果任务执行不成功(返回错误),则其依赖的任务将不会被执行。

另一个要求是对任务执行(或失败)产生副作用:通知其他一些服务(这意味着我们也必须将失败任务的依赖项视为失败?)。

处理所有任务后(直接成功或失败+依赖项失败),我想将此“批处理”标记为已完成(调用另一个服务)。

我考虑采用响应式(Reactive)方法来解决这个问题,并使用 RxJava,因为它具有异步性质。

我对这种方法还很陌生,虽然 zip/switchMap 与 doOnComplete/doOnError 相结合似乎是一个不错的方向,但我不太确定如何在这种情况下使用它们。

很高兴在这里得到一些建议:)

最佳答案

对于根据依赖关系图(严格来说是依赖非循环图)运行 Observables 的查询,您可以使用 concat()merge()运算符组成可观察量以按照给定图(DAG)运行。

对于您的示例,以下是如何构建 Observable DAG 来并行执行它们:

package rxtest;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;


public class ReactiveDagTest {
private static final Logger logger = LoggerFactory.getLogger(ReactiveDagTest.class);
private static Executor customExecutor = Executors.newFixedThreadPool(20);

@Test
public void stackOverflowTest() {
Observable<Character> a = createObservable('A', 100);
Observable<Character> b = createObservable('B', 150);
Observable<Character> c = createObservable('C', 500);
Observable<Character> d = createObservable('D', 200);
Observable<Character> e = createObservable('E', 300);
Observable<Character> f = createObservable('F', 400);

logger.info("BEGIN");

// As Observable for B is referred at two places in the graph, it needs to be cached to not to execute twice
Observable<Character> bCached = b.cache();

Observable.concat(
Observable.merge(
Observable.concat(
Observable.merge(a, bCached),
c),
Observable.concat(bCached, d, e)),
f)
.toBlocking()
.subscribe(i -> logger.info("Executed : " + i));

logger.info("END");
}

private Observable<Character> createObservable(char c, int sleepMs) {
Observable<Character> single = Observable.just(c)
.flatMap(i -> Observable.<Character> create(s -> {
logger.info("onSubscribe Start Executing : {}", i);
sleep(sleepMs);
s.onNext(Character.valueOf(i));
s.onCompleted();
}).subscribeOn(Schedulers.from(customExecutor)));
return single;
}

private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
}
}
}

输出将是

22:19:22.107 [main] INFO rxtest.ReactiveDagTest BEGIN
22:19:22.181 [pool-1-thread-1] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : A
22:19:22.181 [pool-1-thread-2] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : B
22:19:22.284 [main] INFO rxtest.ReactiveDagTest Executed : A
22:19:22.333 [main] INFO rxtest.ReactiveDagTest Executed : B
22:19:22.333 [main] INFO rxtest.ReactiveDagTest Executed : B
22:19:22.333 [pool-1-thread-3] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : C
22:19:22.334 [pool-1-thread-4] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : D
22:19:22.534 [main] INFO rxtest.ReactiveDagTest Executed : D
22:19:22.534 [pool-1-thread-5] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : E
22:19:22.833 [main] INFO rxtest.ReactiveDagTest Executed : C
22:19:22.835 [main] INFO rxtest.ReactiveDagTest Executed : E
22:19:22.835 [pool-1-thread-6] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : F
22:19:23.236 [main] INFO rxtest.ReactiveDagTest Executed : F
22:19:23.236 [main] INFO rxtest.ReactiveDagTest END

If a task execution was not successful (returned an error), its dependent tasks will not be executed.

这隐含在上述解决方案中,如果图中的任何节点失败,则图中的其他节点将不会执行。

关于java - 依赖图的响应式(Reactive)执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50990248/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com