gpt4 book ai didi

java - 终止事件总线并获取结果

转载 作者:行者123 更新时间:2023-11-30 06:46:36 24 4
gpt4 key购买 nike

假设我有两个 verticle 正在发现特殊文件名(例如,可以是任何名称)并将它们发布到事件总线,例如一个是从 REST api 读取名称,另一个是从文件系统读取名称:

ScanRestVerticle.java

/**
* Reads file names through an REST API
*/
public class ScanRestVerticle extends AbstractVerticle {

@Override
public void start() throws Exception {
HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");

req.toObservable()
.flatMap(HttpClientResponse::toObservable)
.lift(unmarshaller(Model.class))
.subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
req.exceptionHandler(Throwable::printStackTrace);
req.end();
}
}

ScanFsVerticle.java

/**
* Reads file names from a file
*/
public class ScanFsVerticle.java extends AbstractVerticle {

@Override
public void start() throws Exception {
StringObservable.byLine(vertx.fileSystem()
.rxReadFile("myFileNames.txt")
.map(Buffer::toString)
.toObservable())
.subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
}
}

这里一切都很好,但是,现在,我有一个 verticle,它将来自事件总线的这些名称组合起来并将它们打印在 STD.OUT 上:

PrintVerticle.java

public class PrintVerticle extends AbstractVerticle {

@Override
public void start() throws Exception {
vertx.eventBus()
.<JsonObject>consumer("address")
.bodyStream()
.toObservable()
.reduce(new JsonArray(), JsonArray::add)
.subscribe(j -> System.out.println(j.toString()));
}

问题是这里的reduce从未真正完成,因为事件总线正如我所想的那样正在产生无限流。

那么我如何实际完成此操作并打印两个 verticle 发布的名称?

注意:我对 vert.x 和 rx 非常陌生,我可能会遗漏一些部分或出现问题,所以请不要评判:)

提前致谢。

编辑:我可以在这里调用 scan() 而不是 reduce() 来获得中间结果,但是我如何获得 scan().last ()

最佳答案

您假设正确,reduce() 运算符依赖 onComplete() 事件来知道在哪里停止收集。
scan() 是累加器,它为源流的每次新发射发出累积值,因此 scan 将为您提供中间结果。
这里的问题是,您使用的是 EventBus,从概念上讲,EventBus 是无限流,因此永远不应该调用 onComplete()。从技术上讲,当您 close() `EventBus 时可能会调用它,但我想您不应该也不想依赖它。

  • 如果您确实想使用 EventBus,则应该以某种方式结束您的 PrintVerticle 流,这样您就会有 onComplete() 事件。例如,您可以使用 EventBus 来发布一些事件,这些事件会发布 2 个观察到的顶点中的每一个端点。然后您可以对其进行压缩,并在发出这 2 个事件后在 PrintVerticle 处结束流(使用 takeUntil())。
    像这样的事情:

    Observable vertice1EndStream = vertx.eventBus()
    .<JsonObject>consumer("vertice1_end")
    .bodyStream()
    .toObservable();

    Observable vertice2EndStream = vertx.eventBus()
    .<JsonObject>consumer("vertice2_end")
    .bodyStream()
    .toObservable()

    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object());

    vertx.eventBus()
    .<JsonObject>consumer("address")
    .bodyStream()
    .toObservable()
    .takeUntil(bothVerticesEndStream)
    .reduce(new JsonArray(), JsonArray::add)
    .subscribe(j -> System.out.println(j.toString()));
  • 另一个选项是直接使用 2 个流而不是 EventBus。将它们合并在一起,然后使用 reduce(),因为这 2 个流是有限的,所以不会有问题。

关于java - 终止事件总线并获取结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43582657/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com