gpt4 book ai didi

java - Flux 不等待 'then' 之前的元素完成

转载 作者:行者123 更新时间:2023-12-02 02:47:21 26 4
gpt4 key购买 nike

我不明白这个问题,我不确定我做错了什么。

我想等待Flux结束然后返回serverResponseMono

我附上了代码片段,doOnNext 将填充 categoryIdToPrintRepository

我查看了如何在 flux 结束后返回单声道并找到了“then”,但“then”方法仍然在处理 onNextSite 之前执行,这导致错误:

java.lang.IllegalArgumentException: 'producer' type is unknown to ReactiveAdapterRegistry

我做错了什么?

 public Mono<ServerResponse> retrieveCatalog(ServerRequest ignored) {
return Mono.just("start").flatMap(id ->
Flux.fromIterable(appSettings.getSites())
.subscribeOn(ForkJoinPoolScheduler.create("SiteCatalogScheduler"))
.doOnNext(this::onNextSite)
.then(Mono.from(ServerResponse.ok().body(categoryIdToPrintRepository.getSortedTreeValues(), String.class))));

}

private void onNextSite(Integer siteId) {
IntStream.range(1, appSettings.getCatalogMaxValue()).parallel().forEach(catalogId -> {
Optional<SiteCatalogCategoryDTO> cacheData =
siteCatalogCacheUseCaseService.getSiteCatalogResponseFromCache(siteId, catalogId);
cacheData.ifPresentOrElse(siteCatalogCategoryDTO -> {/*do nothing already exist in cache*/},
() -> {
Mono<SiteCatalogCategoryDTO> catalogCategoryDTOMono = WebClient.create(getUri(siteId, catalogId))
.get().retrieve().bodyToMono(SiteCatalogCategoryDTO.class);
catalogCategoryDTOMono.subscribe(siteCatalogCategoryDTO ->
handleSiteServerResponse(siteCatalogCategoryDTO, siteId, catalogId));
});
});
}


private void handleSiteServerResponse(SiteCatalogCategoryDTO siteCatalogCategoryDTO,
int siteId, int catalogId) {
if (siteCatalogCategoryDTO.getResponseStatus().equals(ResponseStatus.SUCCESS))
Flux.fromIterable(siteCatalogCategoryDTO.getMappingList())
.subscribe(mapSCC -> {
categoryIdToPrintRepository.insertIntoTree(mapSCC.getCategoryId(),
"Site " + siteId + " - Catalog " + catalogId + " is mapped to category " + "\"" +
mapSCC.getCategoryName() + "\" (" + mapSCC.getCategoryId() + ")");
siteCatalogCacheUseCaseService.insertIntoSiteCatalogCache(siteId, catalogId, siteCatalogCategoryDTO);
});
}

最佳答案

你做错了几件你不应该做的事subscribe在您的应用程序中,并且您有 void 方法,除非在特定位置,否则不应在响应式(Reactive)编程中使用这些方法。

这里是一些示例代码:


// Nothing will happen, we are not returning anything, we can't subscribe
private void doSomething() {
Mono.just("Foo");
}

// complier error
doSomething().subscribe( ... );

您的申请是 publisher调用客户端是订阅者,这就是为什么我们将 Mono 或 Flux 返回给调用客户端,他们 subscribe .

你是这样解决的:

private void doSomething() {
Mono.just("Foo").subscribe( ... );
}

doSomething();

现在您正在订阅自己以运行,这不是正确的方法,如前所述,调用客户端是订阅者,而不是您。

正确的做法:

private Mono<String> doSomething() {
return Mono.just("Foo");
}

// This is returned out to the calling client, they subscribe
return doSomething();

当 Mono/Flux 完成时,它会发出一个信号,这个信号将触发链中的下一个、下一个和下一个。

所以我认为您需要做的事情如下:

  • 删除所有 subscribes ,如果你想做一些事情,有类似的功能,flatmap , map , doOnSuccess等等。保持链条完好无损,一直到客户。
  • 删除所有void 函数,确保它们返回FluxMono如果您想返回,请返回 Mono<Void>通过使用 Mono.empty()功能,使链条完整。

一旦您使用了 Mono/Flux,您就需要处理返回,以便其他人可以继续使用。

更新:

为了then要触发,你必须返回一些东西,它会在前一个 mono/flux 完成时返回。

例子:

private Flux<String> doSomething() {
return Flux.just("Foo", "Bar", "FooBar")
.doOnNext(string -> {
return // return something
});
}

// Ignore what was return from doSomething and return something else when the flux has completed (so only trigger on the completed signal from the flux)
return doSomething().then( ... );

关于java - Flux 不等待 'then' 之前的元素完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62591575/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com