- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试了解响应式编程的真正工作原理。为此,我准备了简单的演示:Spring Framework 中的响应式(Reactive) WebClient
将请求发送到简单的 REST API,并且该客户端在每个操作中打印线程的名称。
其余API:
@RestController
@SpringBootApplication
public class RestApiApplication {
public static void main(String[] args) {
SpringApplication.run(RestApiApplication.class, args);
}
@PostMapping("/resource")
public void consumeResource(@RequestBody Resource resource) {
System.out.println(String.format("consumed resource: %s", resource.toString()));
}
}
@Data
@AllArgsConstructor
class Resource {
private final Long id;
private final String name;
}
还有最重要的 - 响应式 Web 客户端:
@SpringBootApplication
public class ReactorWebclientApplication {
public static void main(String[] args) {
SpringApplication.run(ReactorWebclientApplication.class, args);
}
private final TcpClient tcpClient = TcpClient.create();
private final WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
.baseUrl("http://localhost:8080")
.build();
@PostConstruct
void doRequests() {
var longs = LongStream.range(1L, 10_000L)
.boxed()
.toArray(Long[]::new);
var longsStream = Stream.of(longs);
Flux.fromStream(longsStream)
.map(l -> {
System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
return new Resource(l, String.format("name %s", l));
})
.filter(res -> {
System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
return !res.getId().equals(11_000L);
})
.flatMap(res -> {
System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
return webClient.post()
.uri("/resource")
.syncBody(res)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.retrieve()
.bodyToMono(Resource.class)
.doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
.doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
})
.blockLast();
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
private final Long id;
private final String name;
@JsonCreator
Resource(@JsonProperty("id") Long id, @JsonProperty("name") String name) {
this.id = id;
this.name = name;
}
Long getId() {
return id;
}
String getName() {
return name;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Resource{");
sb.append("id=").append(id);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}
问题是行为与我的预测不同。
我预计 .map()
、.filter()
和 .flatMap()
的每次调用都将在 上执行>main
线程,每次调用 .doOnSuccess()
或 .doOnError
都将在 nio 线程池的线程上执行。所以我期望日志看起来像:
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)
但是我得到的日志是:
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------
并且.map()
、.filter()
和.flatMap()
中的每个下一个日志都是在reactor-的线程上完成的http-nio。
下一个令人费解的事实是主线程和reactor-http-nio上执行的操作之间的比例总是不同的。有时,所有操作 .map()
、.filter()
和 .flatMap()
都在主线程上执行。
最佳答案
Reactor 与 RxJava 一样,可以被认为是与并发无关的。也就是说,它不强制执行并发模型。相反,它让你(开发人员)来指挥。但是,这并不妨碍该库帮助您处理并发性。
获得Flux
或Mono
并不一定意味着它在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身在进行 subscribe()
调用的线程上运行。
Project Reactor相关文档可以找到here .
从您的代码中,得到以下代码片段:
webClient.post()
.uri("/resource")
.syncBody(res)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.retrieve()
.bodyToMono(Resource.class)
导致线程从main切换到netty的工作池。之后,以下所有操作均由netty工作线程执行。
如果您想控制此行为,您应该在代码中添加 publishOn(...)
语句,例如:
webClient.post()
.uri("/resource")
.syncBody(res)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.retrieve()
.bodyToMono(Resource.class)
.publishOn(Schedulers.elastic())
这样,接下来的任何操作都将由弹性调度程序线程池执行。
另一个例子是使用专用调度程序来处理 HTTP 请求执行后的繁重任务。
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;
@ExtendWith({
WiremockResolver.class,
WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {
private static int concurrency = 1;
private final WebClient webClient = WebClient.create();
@Test
public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {
String requestUri = "/slow-response";
server.stubFor(get(urlEqualTo(requestUri))
.willReturn(aResponse().withStatus(200)
.withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
);
Flux
.generate(() -> Integer.valueOf(1), (i, sink) -> {
System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
sink.next(i);
return i + 1;
})
.subscribeOn(Schedulers.single())
.flatMap(i ->
executeGet(uri + requestUri)
.publishOn(Schedulers.elastic())
.map(response -> {
heavyTask();
return true;
})
, concurrency)
.subscribe();
blockForever();
}
private void blockForever() {
Object monitor = new Object();
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException ex) {
}
}
}
private Mono<ClientResponse> executeGet(String path) {
System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
return webClient
.get()
.uri(path)
.exchange();
}
private void heavyTask() {
try {
System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
} catch (InterruptedException ex) {
}
}
}
关于java - Reactor - 了解 .flatMap() 中的线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57079395/
我有一个对象Foo,其中包含Bar列表。这些类的描述如下: class Foo { String name; List bars = new ArrayList(); Foo(
根据 Mozilla 开发者网站: flatMap() 方法首先使用映射函数映射每个元素,然后将结果展平到一个新数组中。它与 map 后跟深度为 1 的 flat 相同,但 flatMap 通常非常有
我对无法找到该问题的现有问题感到非常惊讶。这是为什么,鉴于: val p: Int => Option[Int] = Some(_) List(1, 2, 3).flatMap(p) 我得到: :14
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 5 年前。 Improve th
我想知道两种平面映射情况之间是否存在显着差异。 案例 1: someCollection .stream() .map(CollectionElement::getAnotherCol
以下是flatMap的定义取自 scala.util.Success。 final case class Success[+T](value: T) extends Try[T] { def fl
我正在寻找一个函数来展平列表数组。首先,我在 RDD 系统上使用 Apach Spark 函数 flatMap 实现了我的解决方案,但我想在本地执行此操作。但是,我无法找到 的等价物 samples
我想知道是否存在忽略 flatMap 中的结果的函数(在 scala 或 cat 中) .例如。 Some("ignore this").ignoreArgumentFlatMap(Some("res
我正在学习 Scala 并解决了 99 个 Scala 问题。对于以下练习: 展平嵌套列表结构。示例: scala> flatten(List(List(1, 1), 2, List(3, List(
当编译器进入无限循环时,是否有人遇到过使用此类 flatMap 链(或什至更长)的问题。 let what = Future.init { (promise) in promise(.succ
有没有更好的函数方式来写 flatMap ? def flatMap[A,B](list: List[A])(f: A => List[B]): List[B] = list.map(x =>
我试图从两个 中变出笛卡尔积潜在无限然后我通过 limit() 限制的流. 到目前为止,这(大约)是我的策略: @Test void flatMapIsLazy() { Stream.
为什么以下声明对 .map() 有效但不适用于 .flatMap() ? val tupled = input.map(x => (x*2, x*3)) //Compilation error:
我正在寻找可以同时映射和展平 Lists 和 Maybes 的代码。我在 this topic 中发现了这样一个 flatMap 函数: flatMap :: (t -> [a]) -> [t] ->
考虑在某些大小写匹配上编写的 flatMap。例如: list.flatMap( v => v match { case Cond1 => if(something) Some
我无法使用ListKOf平面映射T -> Option。 例如 listOf(1,2,3).k().flatMap { i -> if (i % 2 == 0) Some(i) else None
有人可以解释我如何在RxJava中通过flatMap运算符传递onComplete信号吗? 如果对flatMap运算符进行注释,则可以获取1到10的数字列表,这意味着toList将收到onComple
我正在做一个在线类(class)并误读了一个问题(这就是为什么我认为可以发布这个问题,因为答案与类(class)中的问题无关!)。 data class Trip( val drive
给定作为数据类的二维坐标列表 data class Point(val x: Int, val y:Int) val points: List 和 TornadoFX(Kotlin 中的 JavaFX
这个问题已经有答案了: What is the difference between .stream() and Stream.of? (5 个回答) 已关闭 3 年前。 我有以下代码: List p
我是一名优秀的程序员,十分优秀!