- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
by emanjusaka from https://www.emanjusaka.top/archives/4 彼岸花开可奈何 本文欢迎分享与聚合,全文转载请留下原文地址.
Reactor 是一个响应式编程的基础类库,其中有两个很关键的类:Flux 和 Mono。掌握这两个类和相关概念有助于我们学习响应式编程.
Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者.
具有 rx 运算符的响应式流发布器,发出 0 到 N 个元素,然后完成(成功或有错误).
下图显示了 Flux 如何转换项目:
Flux 是一个标准的Publisher ,表示一个异步的0到N个发出的项目序列,可选择终止于完成信号或错误信号。根据Reactive Streams规范,这三种类型的信号转换为对下游Subscriber的onNext、onComplete和onError方法的调用。 由于可能出现的信号范围很大,Flux是通用的响应式类型。请注意,所有事件,包括终止事件,都是可选的:没有onNext事件但有onComplete事件表示一个空的有限序列,但如果去掉onComplete,则得到一个无限的空序列(除了用于取消测试之外,不是特别有用)。同样,无限序列不一定为空。例如,Flux.interval(Duration)会生成一个无限的Flux ,从时钟发出定期的滴答声。Flux 是标准的 Publisher ,它表示 0 到 N 个发出项的异步序列,可以选择由完成信号或错误终止。与 Reactive Streams 规范中一样,这三种类型的信号转换为对下游订阅者的 onNext、onComplete 和 onError 方法的调用.
凭借如此大范围的可能信号,Flux 是通用的无功类型。请注意,所有事件,甚至终止事件,都是可选的:没有 onNext 事件,但 onComplete 事件表示一个空的有限序列,但删除 onComplete 并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列不一定是空的。例如, Flux.interval(Duration) 生成无限的 Flux 并从时钟发出规则的滴答声.
具有基本 rx 运算符的 Reactive Streams Publisher 通过 onNext 信号最多发出一项,然后以 onComplete 信号终止(成功的 Mono,有或没有值),或者仅发出单个 onError 信号(失败的 Mono).
下图显示了 Mono 如何转换项目:
Mono 是一种特殊的Publisher ,通过onNext信号发出最多一个项目,然后通过onComplete信号终止(成功的Mono,有或没有值),或者只发出一个onError信号(失败的Mono)。 大多数Mono实现在调用onNext后立即调用其Subscriber的onComplete。Mono.never()是一个例外:它不发出任何信号,在技术上并不禁止,但在测试之外没有太大用处。另一方面,明确禁止使用onNext和onError的组合。 Mono只提供了Flux可用的操作符的子集,而某些操作符(特别是将Mono与另一个Publisher组合的操作符)会切换到Flux。例如,Mono#concatWith(Publisher)返回一个Flux,而Monothen(Mono)返回另一个Mono。 请注意,您可以使用Mono来表示只有完成概念的无值异步过程(类似于Runnable)。要创建一个,您可以使用一个空的Mono .
创建一个Flux,发出一系列字符串元素并订阅打印出来:
package top.emanjusaka;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.just("Hello", "emanjusaka", "!");
flux.subscribe(System.out::println);
}
}
// 输出
Hello
emanjusaka
!
创建一个Mono,发出一个字符串元素并订阅打印出来:
package top.emanjusaka;
import reactor.core.publisher.Mono;
public class Main {
public static void main(String[] args) {
Mono<String> mono = Mono.just("Hello");
mono.subscribe(System.out::println);
}
}
// 输出
Hello
使用Flux的操作符进行元素转换和过滤:
package top.emanjusaka;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10);
numbers.map(num -> num * 2)
.filter(num -> num % 3 == 0)
.subscribe(System.out::println);
}
}
// 输出
6
12
18
使用Mono的操作符进行元素转换和错误处理:
package top.emanjusaka;
import reactor.core.publisher.Mono;
public class Main {
public static void main(String[] args) {
Mono<Integer> number = Mono.just(5);
number.map(num -> num * 2)
.doOnError(Throwable::printStackTrace)
.subscribe(System.out::println);
}
}
// 输出
10
Flux 和 Mono 都是位于 reactor.core.publisher 包下的类.
Reactor中的Flux和Mono是用于实现响应式编程的两种基本类型:
这两种类型都是Publisher的实现,遵循Reactive Streams规范,并可以与其他响应式库和框架进行互操作.
Flux和Mono都可以表示无限序列,也可以表示空序列。它们提供了丰富的操作符来处理和转换序列,例如映射、过滤、合并、扁平化等。此外,它们还支持异步和并发处理,可以与其他操作符和操作进行组合使用.
总的来说,Flux适用于处理多个项目的情况,而Mono适用于处理单个项目的情况。它们是Reactor中用于实现响应式编程的基本类型,提供了丰富的操作符和功能来处理和转换异步序列.
本文原创,才疏学浅,如有纰漏,欢迎指正。尊贵的朋友,如果本文对您有所帮助,欢迎点赞,并期待您的反馈,以便于不断优化.
原文地址: https://www.emanjusaka.top/archives/4 。
微信公众号:emanjusaka的编程栈 。
最后此篇关于响应式编程——初识Flux和Mono的文章就讲到这里了,如果你想了解更多关于响应式编程——初识Flux和Mono的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在尝试构建一个应用程序 A(如适配器),它将: 1)接收带有某些 key (JSON格式)的POST请求 2) 它应该以某种方式修改该 key 并向另一个系统 B 创建 POST 请求。 3)应用
除了语义之外,是否有任何理由为 View 和服务器操作创建不同的调度方法?我见过的所有教程和示例(最值得注意的是 this )在监听已分派(dispatch)的有效负载时完全忽略源常量,以支持打开有效
我的密码 https://gist.github.com/ButuzGOL/707d1605f63eef55e4af 因此,当我收到登录成功回调时,我想进行重定向, 重定向也可以通过调度程序进行。 我
我试图找出使用 Flux 架构处理中型复杂应用程序中相当常见的情况的最佳方法是什么,当组成数据的模型之间存在依赖关系时如何从服务器检索数据。例如: 商店网络应用程序,具有以下模型: 购物车(用户可以拥
我有点坚持一项琐碎的任务:每当我使用响应式 spring WebClient 查询外部 API 或查询响应式 MongoDBRepository 时,我想记录有多少实体通过了我的通量,例如。记录消息,
我有这种情况。我有一个分页 API,它为我提供了过去 12 个月的数据。 API 的响应如下: public class PagedTransfersDto { private List cont
我有两个 Flux,一个用于成功元素,另一个用于保存错误元素 Flux success= Flux.just("Orange", "Apple", "Banana","Grape", "Strawbe
我现在正在使用 Flux .我想创建一个 Flux来自两个不同的对象 Flux .我知道我必须使用 BiFunction但我不知道怎么办。第一个对象对第一个对象有 PK,第二个 FK。我想压缩 PK=
我们正在重构一个大型 Backbone 应用程序以使用 Flux 来帮助解决一些紧密耦合和事件/数据流问题。但是,我们还没有弄清楚如何处理需要知道特定 ajax 请求状态的情况 当 Controlle
作为主题,Flux.concatMapIterable 和 Flux.flatMapIterable 都不会根据大理石图交错,这与 Flux.concatMap 和 Flux.flatMap 其中 f
我有一个flux这是由 Iterable 构建的8 个元素 ( Flux.fromIterable(..) )。对于每个通量排放,我想异步调用一个方法。我尝试了各种方法 dispatchOn和publ
Mono mono1 = repository.get(id); // data from reactive mongo Flux availabilityInfo = getAvailability
使用同构应用程序设置应用程序初始状态的一般做法是什么?如果没有 Flux,我会简单地使用类似的东西: var props = { }; // initial state var html = Reac
你好,我的代码是这样的: fun mapBatch(batch: List): Mono> ... fun myFun(stream: Flux): Flux { return stream
所以我从文档中了解到,并行 Flux 本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。因此,让我们考虑这样的情况。所有这些都将在通过 runOn() 方法提
我一直看到使用 flatMap 的例子对于 1 对 1 操作,例如: Flux.just("a", "b", "c") .flatMap(s -> Mono.just(s.toUpperCas
Flux.create 和有什么区别和 Flux.generate ?我正在寻找 - 最好是使用示例用例 - 了解我何时应该使用其中一个。 最佳答案 简而言之: Flux::create doesn'
我想发布 key 列表,但仅限于修改 key 时。 通过以下内容,即使没有任何更改,它也会以无限循环的方式发布值 RedisReactiveCommands commands = connec
我正在尝试从我从服务器获得的许多项目中实现无限滚动,但我找不到任何适当的方法来保持通量架构设计规则。 想法是:在第一次加载时,我从服务器获取完整的项目列表(只有 id),然后使用 ajax 每次获取
我有端点:/upstreams 它将返回以下格式的 Json: { "next" : "String", "data" : [ { "id" : "String",
我是一名优秀的程序员,十分优秀!