gpt4 book ai didi

java - 是否可以使用 Project Reactor 等待事件而不阻塞线程?

转载 作者:行者123 更新时间:2023-11-30 05:32:59 27 4
gpt4 key购买 nike

Project Reactor 是否可以在单声道中等待事件/条件,而无需为每个单声道使用阻塞线程?与 CompletableFuture我可以完成这样的事情,但我不知道如何使用 Project Reactor 来做到这一点。

我的问题是我需要将请求与响应关联起来。响应时间差异很大,有些甚至永远不会得到回复和超时。在客户端,每个请求的阻塞线程不是问题,但由于这是一个服务器应用程序,我不想最终为每个请求生成一个阻塞等待响应的线程。

API 看起来像这样:

Mono<Response> doRequest(Mono<Request> request);

由于我不知道如何使用 Reactor 来做到这一点,我将解释如何使用 CompletableFuture 来做到这一点澄清我在寻找什么。 API 看起来像这样:

CompletableFuture<Response> doRequest(Request request);

当调用者调用时,会向服务器发出请求,其中包含由此方法生成的相关 ID。调用者返回 CompletableFuture并且该方法存储对此 CompletableFuture 的引用在以相关 ID 作为键的映射中。

还有一个线程(池),用于接收服务器的所有响应。当它收到响应时,它会从响应中获取相关 ID,并使用它在映射中查找原始请求(即 CompletableFuture )并调用 complete(response); 。就在上面。

在此实现中,您不需要每个请求都有一个阻塞线程。这基本上更像是一种 Vert.X/Netty 的思维方式?我想知道如何使用 Project Reactor 来实现这样的事情(如果可能的话)。

编辑 2019 年 7 月 25 日:

根据评论中的要求来澄清我所得到的内容,下面是我如何使用 CompleteableFuture 实现这一点的示例。的。

我还注意到我犯了一个可能相当令人困惑的错误:在 CompletableFuture 中例如我通过了 Mono作为论证。这应该只是一个“正常”的争论。我很抱歉,我希望我没有让人们对此感到太困惑。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

/**
* This example shows how to implement correlating requests with responses without needing a (sleeping)
* thread per request to wait for the response with the use of {@link CompletableFuture}'s.
*
* So the main feat of this example is that there is always a fixed (small) number of threads used even if one
* would fire a thousands requests.
*/
public static void main(String[] args) throws Exception {

RequestResponseService requestResponseService = new RequestResponseService();

Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";

CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

// The blocking call here is just so the application doesn't exit until the demo is completed.
responseFuture.get();
}

static class RequestResponseService {

/** The key in this map is the correlation ID. */
private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();

CompletableFuture<Response> doRequest(Request request) {
Response response = new Response();
response.correlationId = request.correlationId;
CompletableFuture<Response> reponseFuture = new CompletableFuture<>();
responses.put(response.correlationId, reponseFuture);

doNonBlockingFireAndForgetRequest(request);

return reponseFuture;
}

private void doNonBlockingFireAndForgetRequest(Request request) {
// In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
// Right now we will just make a call which will simulate a response message coming in after a while.
simulateResponses();
}

private void processResponse(Response response) {
// There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
// in a response topic and calls this method to handle those messages.
CompletableFuture<Response> responseFuture = responses.get(response.correlationId);
responseFuture.complete(response);
}

void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);

Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";

processResponse(response);

} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

static class Request {
long correlationId;
String question;
}

static class Response {
long correlationId;
String answer;
}

}

最佳答案

是的,这是可能的。您可以使用reactor.core.publisher.Mono#create方法来实现

举个例子:

public static void main(String[] args) throws Exception {
RequestResponseService requestResponseService = new RequestResponseService();

Request request = new Request();
request.correlationId = 1;
request.question = "Do you speak Spanish?";


Mono<Request> requestMono = Mono.just(request)
.doOnNext(rq -> System.out.println(rq.question));
requestResponseService.doRequest(requestMono)
.doOnNext(response -> System.out.println(response.answer))
// The blocking call here is just so the application doesn't exit until the demo is completed.
.block();
}

static class RequestResponseService {
private final ConcurrentHashMap<Long, Consumer<Response>> responses =
new ConcurrentHashMap<>();

Mono<Response> doRequest(Mono<Request> request) {
return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
.then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
}

private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
return Mono.fromRunnable(this::simulateResponses);
}

private void processResponse(Response response) {
responses.get(response.correlationId).accept(response);
}

void simulateResponses() {
// This is just to make the example work. Not part of the example.
new Thread(() -> {
try {
// Simulate a delay.
Thread.sleep(10_000);

Response response = new Response();
response.correlationId = 1;
response.answer = "Si!";

processResponse(response);

} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

关于java - 是否可以使用 Project Reactor 等待事件而不阻塞线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57178625/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com