gpt4 book ai didi

java - 如何将通量链接到另一个通量/单声道并施加另一个背压?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:07:15 25 4
gpt4 key购买 nike

我有以下在 react 器核心中使用通量的响应式(Reactive)代码:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();

如您所见,我对流程的外部源进行了背压处理 (FluxSink.OverflowStrategy.LATEST)。但是,我还想为我的进程配置背压到 redis (redisHashReactiveCommands.hmset(key, map)),因为它可能是比我的进程的外部源更大的瓶颈。我预计我需要为 redis 部分创建另一个 flux 并将其与该 flux 链接,但我该如何实现这一点,因为 .flatMap 适用于单个项目而不是项目流?

此外,我也想将相同的发射项存储到 Kafka 中,但是链接 flapMap 似乎不起作用.. 有没有一种简单的方法可以在一组函数调用中将所有这些链接在一起(外部源 -> 我的进程,我的进程 -> redis,我的进程 -> kafka)?

最佳答案

如果您对主序列中的结果对象不感兴趣,您可以将flatMap 中的两个保存结合起来。您必须移动 subscribeOn 并登录 flatMap 以及将它们放在内部保存发布者上:

Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> Mono.when(
redisHashReactiveCommands.hmset(key, map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),

kafkaReactiveCommand.something(map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
))
//... this results in a Mono<Void>
.doOnComplete(() -> log.debug("Both redis and kafka completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();

或者,如果您确定两个进程都会发出一个结果元素或一个错误,您可以通过替换 when< 将这两个结果合并到一个 Tuple2zip

关于java - 如何将通量链接到另一个通量/单声道并施加另一个背压?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53138423/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com