
java - How to combine/chain multiple Mono/Flux containing different data types without nested subscriptions

Reposted. Author: 行者123. Updated: 2023-12-02 08:40:39

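We are using project-reactor to retrieve data from an external web service and generate a set of result objects.

First we fetch some master data that is needed to trigger the next web service calls. Once the master data is available, we retrieve further data based on it. Then we have to wait until all Monos have emitted their results. Finally we process all the data and build the result objects.

We don't have much experience with reactive streams. Our solution with nested subscriptions works, but we believe there may be a better way to achieve what we want.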

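Question 1

Masterdata_A and Masterdata_B can be fetched in parallel, but how can this be expressed reactively without nesting? Each result of getFluxMasterdata_B should be combined with the single result of getMonoMasterdata_A.

Question 2

The tuples containing both pieces of master data should be throttled somehow so that the web service is not flooded with data requests. The current delay of 1 second is only a guess that seems to work; it would be better to define a maximum number of parallel executions for the first inner flatMap, so that at most N web service calls are pending at any time.

Question 3

In the future we may have to fetch more data from the web service to build the ProcessingResult. Is there a best practice for defining reactive streams so that they stay readable and understandable? Is nesting reactive streams acceptable, or should it be avoided (keeping everything at the top level)?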

Domain model

    private static class Masterdata_A
    {
        private List<MasterdataRecord_A> records;
    }

    private static class MasterdataRecord_A { /* ... business relevant fields */ }
    private static class MasterdataRecord_B { /* ... business relevant fields */ }
    private static class Data_A { /* ... business relevant fields */ }
    private static class Data_B { /* ... business relevant fields */ }
    private static class Data_C { /* ... business relevant fields */ }

    private static class ProcessingResult { /* ... business relevant fields */ }

WebserviceImpl

    private static class Webservice
    {
        private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
        private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }

        // parameters match the way these methods are called in BusinessService
        private Mono<Data_A> getMonoData_A(Masterdata_A masterdataA) { /* fetch data from external webservice */ }
        private Mono<Data_B> getMonoData_B(MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
        private Mono<Data_C> getMonoData_C(Masterdata_A masterdataA, MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
    }

BusinessServiceImpl

    public class BusinessService
    {
        public void processData(...params...)
        {
            Webservice webservice = getWebservice();
            // As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result,
            // then the first inner flatMap should be executed
            // to fetch some extra data from the service based on the actual masterdata.
            // For building the ProcessingResult we need access to all data available in the actual context.
            webservice.getMonoMasterdata_A()
                .subscribe((Masterdata_A masterdataA) -> {
                    webservice.getFluxMasterdata_B()
                        .delayElements(Duration.ofSeconds(1))
                        .flatMap((MasterdataRecord_B masterdataB) -> {
                            Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                            Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                            Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                            // wait for result of all Monos
                            return Mono.zip(monoA, monoB, monoC);
                        })
                        .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                            Data_A dataA = data.getT1();
                            Data_B dataB = data.getT2();
                            Data_C dataC = data.getT3();

                            // create result from masterdataA, masterdataB, dataA, dataB, dataC
                            ProcessingResult result = ...;
                            return Mono.just(result);
                        })
                        .subscribe(processingResult -> {
                            // store result to db/filesystem
                        });
                });
        }
    }

Best answer

Question 1

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...].
    // The result is then [(A,1), (A,2), (A,3), ...], provided "A" arrives before the first B element:
    // combineLatest only remembers the latest value of each source.
    // monoMasterdata_a and masterdataRecordBFlux are subscribed together, so both fetches run in parallel.
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
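To make the ordering behaviour of combineLatest concrete, here is a minimal sketch with stand-in sources: plain strings and integers instead of the real master data, and a string-concatenating combinator instead of Tuples::of. The class and method names are hypothetical, and reactor-core is assumed to be on the classpath. The `bufferedPairs` method is not part of the answer; it is one possible lossless variant that still subscribes to both sources up front, at the cost of buffering all B records until A is available.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombineLatestDemo {

    // Pairs the value of monoA with the values of fluxB using combineLatest.
    static List<String> combine(Mono<String> monoA, Flux<Integer> fluxB) {
        return Flux.combineLatest(monoA, fluxB, (a, b) -> a + b)
                .collectList()
                .block(); // blocking only to keep the demo simple
    }

    // Assumed alternative: collect every B while A is being fetched, then pair them.
    static List<String> bufferedPairs(Mono<String> monoA, Flux<Integer> fluxB) {
        return Mono.zip(monoA, fluxB.collectList())
                .flatMapMany(t -> Flux.fromIterable(t.getT2()).map(b -> t.getT1() + b))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        Flux<Integer> fluxB = Flux.just(1, 2, 3);
        Mono<String> fastA = Mono.just("A");
        Mono<String> slowA = Mono.delay(Duration.ofMillis(100)).map(tick -> "A");

        System.out.println(combine(fastA, fluxB));       // [A1, A2, A3]
        System.out.println(combine(slowA, fluxB));       // [A3]: 1 and 2 were overwritten before A arrived
        System.out.println(bufferedPairs(slowA, fluxB)); // [A1, A2, A3]
    }
}
```

If the B records arrive slowly compared with A, combineLatest behaves as the answer describes; the buffered variant is mainly worth considering when A can be the slower call.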

Question 2

    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
        // Yes, a delay works fine to avoid overwhelming the web service.
        // 500 ms is an arbitrary value; test and tune it for the services involved.
        .delayElements(Duration.ofMillis(500))
        .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
            Masterdata_A masterdataA = tuple2.getT1();
            MasterdataRecord_B masterdataB = tuple2.getT2();
            Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
            Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
            Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
            // wait for the results of all three Monos
            return Mono.zip(monoA, monoB, monoC);
        },
        // flatMap accepts a maximum number of concurrent inner subscriptions.
        // 5 is also an arbitrary value; test to find the best one.
        5)
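The effect of the concurrency argument can be observed with a small counter. The sketch below is illustrative only: `fakeCall` is a hypothetical stand-in for a web service call that takes about 50 ms, and the two counters exist purely to measure how many calls are pending at once. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencyDemo {

    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for a web service call that takes about 50 ms.
    static Mono<Integer> fakeCall(int id) {
        return Mono.defer(() -> {
            // Runs on subscription, i.e. when flatMap actually starts the call.
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            return Mono.delay(Duration.ofMillis(50))
                    .map(tick -> {
                        inFlight.decrementAndGet(); // the call has finished
                        return id;
                    });
        });
    }

    // Runs the given number of calls with at most `concurrency` pending at a time.
    static List<Integer> run(int calls, int concurrency) {
        inFlight.set(0);
        maxInFlight.set(0);
        return Flux.range(1, calls)
                .flatMap(FlatMapConcurrencyDemo::fakeCall, concurrency)
                .collectList()
                .block(); // blocking only to keep the demo simple
    }

    public static void main(String[] args) {
        List<Integer> results = run(20, 5);
        System.out.println("completed calls: " + results.size());
        System.out.println("max concurrent calls: " + maxInFlight.get());
    }
}
```

With a concurrency of 5 the reported maximum should not exceed 5, because flatMap only subscribes to a new inner Mono after an earlier one has completed. With such a limit in place, the fixed delay becomes a separate tuning knob rather than the only protection for the web service.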

Question 3

Take a look at this: https://github.com/reactor/reactive-streams-commons/issues/21
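As a general illustration of keeping a pipeline readable (not something taken from the linked issue), one option is to move each nested step into a named method that returns a Mono, so the top-level chain stays flat while the inner step keeps access to everything it needs. The sketch below uses hypothetical stand-in methods and string results in place of the real web service and ProcessingResult, and assumes reactor-core is on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatPipelineDemo {

    // Hypothetical stand-ins for the web service calls.
    static Mono<String> fetchDataA(String a) { return Mono.just("dA(" + a + ")"); }
    static Mono<String> fetchDataB(int b) { return Mono.just("dB(" + b + ")"); }
    static Mono<String> fetchDataC(String a, int b) { return Mono.just("dC(" + a + "," + b + ")"); }

    // One named step: fetch all data for a single (a, b) pair and build the result.
    // The master data stays in scope as method parameters.
    static Mono<String> buildResult(String a, int b) {
        return Mono.zip(fetchDataA(a), fetchDataB(b), fetchDataC(a, b))
                .map(data -> a + b + " -> " + data.getT1() + " " + data.getT2() + " " + data.getT3());
    }

    // The top-level pipeline reads as a flat list of steps.
    static List<String> run() {
        return Flux.just(1, 2)
                .flatMap(b -> buildResult("A", b), 5)
                .collectList()
                .block(); // blocking only to keep the demo simple
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If more data has to be fetched later, only `buildResult` changes; the top-level chain stays the same.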

Complete example

    Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
    Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...].
    // The result is then [(A,1), (A,2), (A,3), ...], provided "A" arrives before the first B element.
    // monoMasterdata_a and masterdataRecordBFlux are subscribed together, so both fetches run in parallel.
    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
        // Yes, a delay works fine to avoid overwhelming the web service.
        // 500 ms is an arbitrary value; test and tune it for the services involved.
        .delayElements(Duration.ofMillis(500))
        .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
            Masterdata_A masterdataA = tuple2.getT1();
            MasterdataRecord_B masterdataB = tuple2.getT2();
            Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
            Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
            Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
            // wait for the results of all three Monos
            return Mono.zip(monoA, monoB, monoC)
                // A synchronous transformation only needs map; flatMap would add unnecessary overhead.
                // Mapping inside this lambda keeps masterdataA and masterdataB in scope.
                .map(data -> {
                    Data_A dataA = data.getT1();
                    Data_B dataB = data.getT2();
                    Data_C dataC = data.getT3();

                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                    ProcessingResult result = ???;
                    return result;
                });
        },
        // flatMap accepts a maximum number of concurrent inner subscriptions.
        // 5 is also an arbitrary value; test to find the best one.
        5)
        // Saving in batches is usually more efficient.
        // 100 is an arbitrary value; choose whatever suits your data source.
        .bufferTimeout(100, Duration.ofMillis(100))
        .concatMap(processingResults -> {
            return batchSave(processingResults)
                // batchSave is a blocking operation, so run it on a scheduler meant for blocking work
                .subscribeOn(Schedulers.boundedElastic());
        })
        .subscribe();
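The answer does not show `batchSave`. As a rough sketch of the batching step in isolation, the version below assumes a hypothetical blocking `blockingSave` method and wraps it with Mono.fromCallable so that subscribeOn can move it to the bounded elastic scheduler. Integers stand in for ProcessingResult objects, and reactor-core is assumed to be on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BatchSaveDemo {

    // Hypothetical blocking save; returns the number of stored items.
    static int blockingSave(List<Integer> batch) {
        try {
            Thread.sleep(10); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch.size();
    }

    // Wraps the blocking call so that it runs on a thread intended for blocking work.
    static Mono<Integer> batchSave(List<Integer> batch) {
        return Mono.fromCallable(() -> blockingSave(batch))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Groups items into batches of up to 100 (or whatever arrived within 100 ms)
    // and saves the batches one after another.
    static List<Integer> run(int items) {
        return Flux.range(1, items)
                .bufferTimeout(100, Duration.ofMillis(100))
                .concatMap(BatchSaveDemo::batchSave)
                .collectList()
                .block(); // blocking only to keep the demo simple
    }

    public static void main(String[] args) {
        System.out.println("saved batch sizes: " + run(250));
    }
}
```

concatMap keeps the saves sequential, so at most one batch is written at a time; replacing it with flatMap and a concurrency limit would allow parallel writes if the data source can handle them.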

Regarding java - How to combine/chain multiple Mono/Flux containing different data types without nested subscriptions, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61409185/
