- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
Spring Boot
应用:
一个@RestController
接收以下有效负载:
{
"cartoon": "The Little Mermaid",
"characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}
我需要按以下方式处理:
转换 Controller 接收到的数据:使用在上一步中从“cartoon-characters”微服务接收到的适当 ID 替换角色名称。 {
"cartoon": "The Little Mermaid",
"characterIds": [1, 2, 3, 4]
}
使用转换后的数据向“cartoon-db”微服务发送 HTTP POST 请求。
我遇到的问题:
我需要使用 Reactive Programming
的范例来实现所有这些步骤(非阻塞\异步处理)与 Spring WebFlux
( Mono
| Flux
) 和 Spring Reactive WebClient
- 但我对该堆栈的经验为零,尽可能多地阅读它,加上大量谷歌搜索,但仍然有很多 Unresolved 问题,例如:
Q1。我已经配置了向“卡通人物”微服务发送请求的响应式 webClient:
public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
return WebClient.builder().baseUrl("http://cartoon-characters").build()
.get()
.uri("/character/{characterName}", characterName)
.retrieve()
.bodyToMono(Integer.class);
}
如您所见,我有一个卡通人物名称列表,我需要为每个人物调用 getCartoonCharacterIdbyName(String name)
方法,我不确定串行调用它的正确选项,相信正确的选项:并行执行。
写了下面的方法:
public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
.flatMap(this::getCartoonCharacterIdbyName);
return StreamSupport.stream(flux.toIterable().spliterator(), false)
.collect(Collectors.toList());
但我怀疑这段代码是否与 WebClient
并行执行以及代码调用 flux.toIterable()
会阻塞线程,所以在这个实现中我失去了非阻塞机制。
我的假设是否正确?
我需要如何将其重写为具有并行性和非阻塞性?
第二季度。在技术上是否有可能以响应式(Reactive)方式转换 Controller 接收到的输入数据(我的意思是用 ID 替换名称):当我们使用 Flux<Integer>
操作时characterIds,但不是 List<Integer>
characterIds?
Q3. 是否有可能在第 2 步 之后不仅获得转换后的 Data 对象,而且获得 Mono<> 可以在 Step 中被另一个 WebClient 使用3?
最佳答案
实际上,这是一个很好的问题,因为了解 WebFlux 或项目 react 器框架,在链接微服务时需要几个步骤。
首先是意识到一个WebClient
应该接受一个发布者并返回一个发布者。将此推断为 4 种不同的方法签名以帮助思考。
当然,在所有情况下,它只是 Publisher->Publisher,但在您更好地理解之前先保留它。前两个很明显,您只需使用 .map(...)
处理流程中的对象,但您需要学习如何处理后两个。如上所述,从 Flux->Mono 可以用 .collectList()
完成, 或者也用 .reduce(...)
.从 Mono->Flux 似乎通常用 .flatMapMany
完成或 .flatMapIterable
或者它的一些变体。可能还有其他技术。你不应该使用 .block()
在任何 WebFlux 代码中,如果您尝试这样做,通常会出现运行时错误。
在你的例子中你想去
如你所说,你想要
第二部分是了解链接流。你可以做
这会链接 p1->p2->p3,但我总是发现制作一个“服务层”更容易理解。
这段代码更易于阅读和维护,并且随着时间的推移,您会逐渐理解该声明的值(value)。
我在您的示例中遇到的唯一问题是执行 Flux<String>
与 WebClient
作为 @RequestBody
.不起作用。参见 WebClient bodyToFlux(String.class) for string list doesn't separate individual values .除此之外,它是一个非常简单的应用程序。当你调试它时,你会发现它到达了 .subscribe(System.out::println)
。在它到达 Flux<Integer> ids = mapNamesToIds(fn)
之前的行线。这是因为流程是在执行之前设置的。需要一段时间才能理解这一点,但这是项目 react 堆框架的重点。
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")}
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}
还有:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}
关于spring-boot - 响应式(Reactive)编程 : Spring WebFlux: How to build a chain of micro-service calls?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60858717/
类型‘AbstractControl’上不存在属性‘Controls’。
主要是我很好奇。 我们有一个名为 Unit 的对象在我们的代码库中 - 代表桥梁或道路的组件。在我们的例子中,看到带有 Unit 的 ReactiveUI 命令可能会模棱两可。作为声明中的泛型之一。
我一直听说六边形架构必须与任何框架无关,并使用接口(interface) (SPI) 来委托(delegate)不属于业务层的每个代码部分。 但是如何在不使用额外框架的情况下通过六边形架构创建一个响应
我读了 Reactive Manifesto . 但我无法理解 event driven architectures 之间的核心差异和 message driven architectures .结果
申请要求: 订阅两个事件流 A 和 B 对于每个 A 事件,一段时间后应该有相应的 B 事件 如果没有相应的 B 到达(及时),应用程序会监视 A 事件并发出警报 B 事件可以以与 A 事件不同的顺序
Closed. This question is opinion-based。它当前不接受答案。 想改善这个问题吗?更新问题,以便editing this post用事实和引用来回答。 4年前关闭。
我有一个 ViewModel,它在其初始化程序中有一个输入 init(sliderEvents: Reactive) { 在测试中我想做类似的事情 slider.send(.touchDownInsi
经典实时搜索示例: var searchResults = from input in textBoxChanged from results in GetDa
我有一个响应式(Reactive)管道来处理传入的请求。对于每个请求,我需要调用一个与业务相关的函数 ( doSomeRelevantProcessing )。 完成后,我需要将发生的事情通知一些外部
是否可以为响应式扩展实现基于硬件计时器的自定义调度程序?我该如何开始,有什么好的例子吗? 我有一个硬件可以每毫秒向我发送一个准确的中断。我想利用它来创建更精确的 RX 调度程序。 更新 感谢 Asti
我正在通过网络浏览 Rx 框架 Material ,我发现了很多。 现在,每当我为此在 google 上搜索时,我还会在 wikipedia 链接中找到“响应式(Reactive)编程”。 由于响应式
关闭。这个问题是opinion-based .它目前不接受答案。 想改进这个问题?更新问题,以便 editing this post 可以用事实和引用来回答它. 6年前关闭。 Improve this
SignalR 与响应式扩展是同一回事吗?你能解释一下为什么或为什么不吗? 最佳答案 不,它们绝对不是同一件事。 Reactive Extensions 是一个用于创建和组合可观察的数据流或事件流的库
我知道有一种简单的方法可以做到这一点 - 但今晚它打败了我...... 我想知道两个事件是否在 300 毫秒内发生,就像双击一样。 在 300 毫秒内单击两次左键鼠标 - 我知道这是构建响应式(Rea
我们的应用程序使用 Reactive Extensions (Rx)。这些通常通过 Microsoft 的可下载包安装。但是,当我们发布应用程序时,我们会提供 dll 的副本(即 System.Cor
我想了解 Reactive 和 ReactiveStreams 之间的区别,特别是在 RxJava 的上下文中? 我能想到的最多的是 Reactive Streams 在规范中有一些背压的概念,但它已
我想探索来自 Quarkus 的响应式 REST 客户端的慢速后端,并在他们建议的样本 (https://github.com/quarkusio/quarkus-quickstarts/tree/m
假设我有一个存储桶,我需要从中获取日期早于现在的文档。 该文档如下所示: { id: "1", date: "Some date", otherObjectKEY: "key1" } 对于每个文档,我
我有一个 RIA 服务数据服务,它有几个函数调用,如下所示: public InvokeOperation SomeFunc( SomeData data, Action> callb
我一直在使用 Rx 在单个应用程序中创建事件总线(想想 CQRS/ES),它似乎工作得很好。然而,在调查了一堆不同的事件溯源框架之后,我还没有看到使用过一次 Rx。与基于反射/容器的调度程序相比,它似
我是一名优秀的程序员,十分优秀!