gpt4 book ai didi

java - 如何让 Flux 的多个订阅者在不同的执行上下文/线程上运行

转载 作者:行者123 更新时间:2023-11-30 05:24:28 28 4
gpt4 key购买 nike

我正在开发一个用于 IoT 实时数据可视化的 Spring Boot WebFlux 应用程序。

我有一个Flux,它可以模拟来自设备的数据,我希望在建立每个事件的Websocket连接时:

  • 必须通过 websocket 发送以实现实时可视化(使用响应式 WebSocketHandler)
  • 必须根据给定条件进行检查,以便通过 HTTP REST 调用 (RestTemplate) 发送通知

从我的日志来看,两个订阅者(websocket 处理程序和通知程序)似乎获得了两个具有完全不同值的不同流(在日志下方)。

我还尝试了在 MySource 类中的 map 之后链接 share 方法的变体,在这种情况下,看起来虽然我有只有一个 Flux,只有一个线程,因此一切都处于阻塞状态(我可以看到 REST 调用阻塞了通过 websocket 的发送)。

这里发生了什么?如何使两个订阅者在不同的执行上下文(不同的线程)中运行,从而完全相互独立?

下面是相关代码片段和日志。

先谢谢大家了!

更新:为了清楚起见,我必须指定 MyEvent 具有随机生成的值,因此我通过使用 @NikolaB 的回答解决了一个问题ConnectableFlux/share 保证具有相同的 Flux,但我仍然希望两个订阅者拥有单独的执行上下文。

public class MyWebSocketHandler implements WebSocketHandler {

@Autowired
public MySource mySource;

@Autowired
public Notifier notifier;

public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<MyEvent> events = mySource.events();
events.subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(events.map(this::toJson).map(webSocketSession::textMessage));
}

private String toJson(MyEvent event) {
log.info("websocket toJson " + event.getValue());
...
}
}
public class MySource {
public Flux<MyEvent> events() {
return Flux.interval(...).map(i -> new MyEvent(*Random Generate Value*);
}
}
public class Notifier {

public void sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
if (condition met)
restTemplate.exchange(...)
}
}
2019-11-19 11:58:55.375 INFO [     parallel-3] i.a.m.websocket.MyWebSocketHandler  : websocket toJson 4.09
2019-11-19 11:58:55.375 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.86
2019-11-19 11:58:57.366 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.24
2019-11-19 11:58:57.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.11
2019-11-19 11:58:59.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.61
2019-11-19 11:58:59.374 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.03
2019-11-19 11:59:01.365 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.88
2019-11-19 11:59:01.375 INFO [ parallel-3] i.a.m.websocket.MyWebSocketHandler : websocket toJson 4.29
2019-11-19 11:59:03.364 INFO [ parallel-1] i.a.m.notifier.Notifier : notifier sendNotification 4.37

最佳答案

这里有几个问题,首先是 RestTemplate是同步/阻塞 HTTP 客户端,因此您应该使用 WebClient这是 react 性的,也可以创建 ConnectableFlux ( Flux 可以有多个订阅者)您需要在 map 之前共享它运算符并创建新的 Flux -es 从连接的创建。

示例:

Flux<MyEvent> connectedFlux = mySource.events().share();
Flux.from(connectedFlux).subscribe(event -> notifier.sendNotification(event));
return webSocketSession.send(Flux.from(connectedFlux).map(this::toJson).map(webSocketSession::textMessage));

还有sendNotification方法应返回 Mono<Void>因为响应式方法应该始终返回 MonoFlux类型。

要启动独立执行,您可以 Zip那两个Mono s。

编辑

首先如上所述使用WebClient对于响应式 HTTP 客户端的传出 HTTP 调用,并重新设计 Notifier类:

public class Notifier {

public Mono<Void> sendNotification (MyEvent event) {
log.info("notifier sendNotification " + event.getValue());
return Mono.just(event)
.filter(e -> /* your condition */)
.flatMap(e -> WebClient.builder().baseUrl("XXX")...)
.then();
}

}

现在看看执行上下文是否不同。

关于java - 如何让 Flux 的多个订阅者在不同的执行上下文/线程上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58932752/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com