- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要创建一个服务(使用 Java)来接受任务图并并行执行它们,同时考虑这些任务之间的依赖关系。
例如,我们有 6 个任务:A、B、C、D、E、F。
依赖关系是:
A -> C
B -> C、D
C -> F
D -> E
E -> F
这将创建(可能的)并行执行组:A+B、C+D、E、F。
如果任务执行不成功(返回错误),则其依赖的任务将不会被执行。
另一个要求是对任务执行(或失败)产生副作用:通知其他一些服务(这意味着我们也必须将失败任务的依赖项视为失败?)。
处理所有任务后(直接成功或失败+依赖项失败),我想将此“批处理”标记为已完成(调用另一个服务)。
我考虑采用响应式(Reactive)方法来解决这个问题,并使用 RxJava,因为它具有异步性质。
我对这种方法还很陌生,虽然 zip/switchMap 与 doOnComplete/doOnError 相结合似乎是一个不错的方向,但我不太确定如何在这种情况下使用它们。
很高兴在这里得到一些建议:)
最佳答案
对于根据依赖关系图(严格来说是依赖非循环图)运行 Observables 的查询,您可以使用 concat()和 merge()运算符组成可观察量以按照给定图(DAG)运行。
对于您的示例,以下是如何构建 Observable DAG 来并行执行它们:
package rxtest;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
public class ReactiveDagTest {
private static final Logger logger = LoggerFactory.getLogger(ReactiveDagTest.class);
private static Executor customExecutor = Executors.newFixedThreadPool(20);
@Test
public void stackOverflowTest() {
Observable<Character> a = createObservable('A', 100);
Observable<Character> b = createObservable('B', 150);
Observable<Character> c = createObservable('C', 500);
Observable<Character> d = createObservable('D', 200);
Observable<Character> e = createObservable('E', 300);
Observable<Character> f = createObservable('F', 400);
logger.info("BEGIN");
// As Observable for B is referred at two places in the graph, it needs to be cached to not to execute twice
Observable<Character> bCached = b.cache();
Observable.concat(
Observable.merge(
Observable.concat(
Observable.merge(a, bCached),
c),
Observable.concat(bCached, d, e)),
f)
.toBlocking()
.subscribe(i -> logger.info("Executed : " + i));
logger.info("END");
}
private Observable<Character> createObservable(char c, int sleepMs) {
Observable<Character> single = Observable.just(c)
.flatMap(i -> Observable.<Character> create(s -> {
logger.info("onSubscribe Start Executing : {}", i);
sleep(sleepMs);
s.onNext(Character.valueOf(i));
s.onCompleted();
}).subscribeOn(Schedulers.from(customExecutor)));
return single;
}
private void sleep(int ms) {
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
}
}
}
输出将是
22:19:22.107 [main] INFO rxtest.ReactiveDagTest BEGIN
22:19:22.181 [pool-1-thread-1] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : A
22:19:22.181 [pool-1-thread-2] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : B
22:19:22.284 [main] INFO rxtest.ReactiveDagTest Executed : A
22:19:22.333 [main] INFO rxtest.ReactiveDagTest Executed : B
22:19:22.333 [main] INFO rxtest.ReactiveDagTest Executed : B
22:19:22.333 [pool-1-thread-3] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : C
22:19:22.334 [pool-1-thread-4] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : D
22:19:22.534 [main] INFO rxtest.ReactiveDagTest Executed : D
22:19:22.534 [pool-1-thread-5] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : E
22:19:22.833 [main] INFO rxtest.ReactiveDagTest Executed : C
22:19:22.835 [main] INFO rxtest.ReactiveDagTest Executed : E
22:19:22.835 [pool-1-thread-6] INFO rxtest.ReactiveDagTest onSubscribe Start Executing : F
22:19:23.236 [main] INFO rxtest.ReactiveDagTest Executed : F
22:19:23.236 [main] INFO rxtest.ReactiveDagTest END
If a task execution was not successful (returned an error), its dependent tasks will not be executed.
这隐含在上述解决方案中,如果图中的任何节点失败,则图中的其他节点将不会执行。
关于java - 依赖图的响应式(Reactive)执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50990248/
我刚刚更新了 Ruby,现在我在尝试启动 compass 时遇到以下错误: Encoding::CompatibilityError on line ["28"] of /usr/local/Cell
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 6 年前。
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在尝试在我的 iOS 应用程序中开发可折叠/ Accordion 式的功能。这将是您可以在网站上找到的典型 FAQ 类型功能。我想点击标题,然后显示详细信息。 因为这是帮助部分,只有几个项目,我认
我正在尝试设计一个基于 REST 的 Web 服务来与我正在开发的农场动物管理系统进行交互。 为了详细说明问题,我收藏了动物 属于一个农场。每只动物都有自己的信息——例如姓名、身份证号、品种年龄等。因
我有 3 种不同的表单,其中复选框数量不同,每个部分基本上代表一个表单,因此当用户选择该部分中的复选框时,它会显示他们在该部分的总金额中 checkout 了多少 HTML
我有一份 32 页的 PDF 版家谱。与其将家谱全部放在一个非常大的 PDF 页面上(这是我想要的),不如将其格式化为一组 8 个单独的美国信纸大小的页面应该在整个宽度上缝合; 4 行这样就完成了树。
指SASS implementation for Java? : 在 Maven 目标编译包中自动编译 compass-style.org 样式表的最佳方法是什么? 我不想发送太多的自编译库,也不想通
鉴于以下 XAML... 我正在寻找一种绑定(bind) ComboBox、Button 和 Command 的方法,以便当 ComboBox 的值更改时,在 Command 上调用 CanExe
在玩具应用程序中,我有一个显示所有帖子标题的“帖子”模板。当您单击每个标题时,我不想直接进入“显示” View ,而是直接内联展开该帖子的其余内容。 我考虑过让 postRoute 重用 postsR
我需要一些使用 Twitter Bootstrap 或其他响应式框架的自定义 Swagger-UI 实现。需要在我的移动设备上使用这样的 UI 测试我的 API,但 swagger-ui 不能很好地扩
我正在做一个项目,我真的在尝试编写面向对象的 JavaScript 代码。我刚刚开始阅读Douglas Crockford's JavaScript: The Good Parts我很快开始意识到用
在 C# 中,我通过执行以下操作来加密文本数据(请注意我正在以 block ( block )的形式加密数据): public string EncryptData(string pu
我正在构建一个社交网站,该网站将向全世界公开 REST API (WCF WebAPI),以便任何开发人员都能够为该网站创建客户端应用程序、将其与其他服务集成等。 我想为 API 实现 Faceboo
我是一名优秀的程序员,十分优秀!