gpt4 book ai didi

java - 来自无限 java 流的通量端点

转载 作者:行者123 更新时间:2023-12-02 10:22:26 26 4
gpt4 key购买 nike

我在处理由 Stream.generate 构建的助焊剂时遇到问题构造。

Java 流正在从远程源获取一些数据,因此我实现了一个嵌入了数据获取逻辑的自定义供应商,然后用它来填充流。

Stream.generate(new SearchSupplier(...))

我的想法是检测一个空列表并使用 takeWhile 的 Java9 功能->

Stream.generate(new SearchSupplier(this, queryBody))
.takeWhile(either -> either.isRight() && either.get().nonEmpty())

(使用 Vavr 的 Either 构造)

存储库层通量将执行以下操作:

return Flux.fromStream (
this.searchStream(...) //this is where the stream gets generated
)
.map(Either::get)
.flatMap(Flux::fromIterable);

“服务”层由通量上的一些转换步骤组成,但方法签名类似于 Flux<JsonObject> search(...)

最后,controller层有一个GetMapping:

@GetMapping(produces = "application/stream+json")
public Flux search(...) {
return searchService.search(...) //this is the Flux<JsonObject> parth
.subscriberContext(...) //stuff I need available during processing
.doOnComplete(() -> log.debug("DONE"));
}

我的问题是 Flux 似乎永远不会终止。例如,从 postman 调用电话只是在响应部分中拍摄“正在加载...”部分。当我从 IDE 终止该进程时,结果会刷新到 postman ,我会看到我所期望的结果。 doOnComplete lambda 也永远不会被调用

我注意到,如果我更改 Flux 的来源:

Flux.fromArray(...) //harcoded array of lists of jsons

doOnComplete lambda 被调用,http 连接也关闭,结果显示在 postman 中。

知道可能是什么问题吗?

谢谢。

最佳答案

您可以使用如下所示的代码直接创建 Flux。请注意,我添加了一些假定的方法,您需要根据您的 SearchSupplier 的工作方式来实现这些方法:

Flux<SearchResultType> flux = Flux.generate(
() -> new SearchSupplier(this, queryBody),
(supplier, sink) -> {
SearchResultType current = supplier.next();
if (isNotLast(current)) {
sink.next(current);
} else {
sink.complete();
}
return supplier;
},
supplier -> anyCleanupOperations(supplier)
);

关于java - 来自无限 java 流的通量端点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54239928/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com