gpt4 book ai didi

Spring Boot Webflux/Netty - 检测关闭的连接

转载 作者:行者123 更新时间:2023-12-04 13:56:47 28 4
gpt4 key购买 nike

我一直在使用 spring-boot 2.0.0.RC1使用 webflux 启动器 ( spring-boot-starter-webflux )。我创建了一个返回无限通量的简单 Controller 。我希望发布者只有在有客户(订阅者)的情况下才可以工作。假设我有一个像这样的 Controller :

@RestController
public class Demo {

@GetMapping(value = "/")
public Flux<String> getEvents(){
return Flux.create((FluxSink<String> sink) -> {

while(!sink.isCancelled()){

// TODO e.g. fetch data from somewhere

sink.next("DATA");
}
sink.complete();
}).doFinally(signal -> System.out.println("END"));
}

}

现在,当我尝试运行该代码并访问端点 http://localhost:8080/用 Chrome,然后我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 关闭浏览器后如何终止/取消流式传输?

从此 answer我引用:

Currently with HTTP, the exact backpressure information is not transmitted over the network, since the HTTP protocol doesn't support this. This can change if we use a different wire protocol.



我假设,由于 HTTP 协议(protocol)不支持背压,这意味着也不会发出取消请求。

进一步调查,通过分析网络流量,发现浏览器发送了一个 TCP FIN只要我关闭浏览器。有没有办法配置 Netty(或其他东西),以便半关闭的连接将触发发布者的取消事件,使 while 循环停止?

还是我必须自己编写类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter 的适配器?我在哪里实现自己的订阅者?

谢谢你的帮助。

编辑:
一个 IOException如果没有客户端,将在尝试将数据写入套接字时引发。正如您在 stack trace 中看到的那样.

但这还不够好,因为可能需要一段时间才能准备好发送下一个数据 block ,因此检测消失的客户端需要相同的时间。正如 Brian Clozel's answer 中指出的那样这是 Reactor Netty 中的一个已知问题。我尝试通过将依赖项添加到 POM.xml 来改用 Tomcat。 .像这样:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

尽管它取代了 Netty 并改用了 Tomcat,但由于浏览器不显示任何数据,它似乎没有反应。但是,控制台中没有警告/信息/异常。 spring-boot-starter-webflux从这个版本开始( 2.0.0.RC1 )应该与 Tomcat 一起工作吗?

最佳答案

由于这是一个已知问题(参见 Brian Clozel's answer ),我最终使用了一个 Flux获取我的真实数据并拥有另一个数据以实现某种 ping/heartbeat 机制。结果,我将两者与 Flux.merge() 合并在一起。 .

在这里您可以看到我的解决方案的简化版本:

@RestController
public class Demo {

public interface Notification{}

public static class MyData implements Notification{

public boolean isEmpty(){…}
}

@GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
return Flux.merge(getEventMessageStream(), getHeartbeatStream());
}

private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
return Flux.interval(Duration.ofSeconds(2))
.map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
.doFinally(signalType ->System.out.println("END"));
}

private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
return Flux.interval(Duration.ofSeconds(30))
.map(i -> {

// TODO e.g. fetch data from somewhere,
// if there is no data return an empty object

return data;
})
.filter(data -> !data.isEmpty())
.map(data -> ServerSentEvent
.builder(data)
.event("message").build());
}
}

我将所有内容都包装为 ServerSentEvent<? extends Notification> . Notification只是一个标记界面。我使用 event来自 ServerSentEvent 的字段类以区分数据和 ping 事件。自心跳 Flux以较短的间隔不断地发送事件,检测到客户端离开所花费的时间最多是该间隔的长度。请记住,我需要它,因为我可能需要一段时间才能获得一些可以发送的真实数据,因此,它也可能需要一段时间才能检测到客户端已消失。像这样,一旦无法发送 ping(或者可能是消息事件),它就会检测到客户端已经消失。

关于标记界面的最后一个注释,我称之为通知。这并不是真正必要的,但它提供了一些类型安全性。没有它,我们可以写 Flux<ServerSentEvent<?>>而不是 Flux<ServerSentEvent<? extends Notification>>作为 getNotificationStream() 方法的返回类型。或者也可以,让 getHeartbeatStream() 返回 Flux<ServerSentEvent<MyData>> .然而,像这样它会允许任何对象可以被发送,这是我不想要的。结果,我添加了接口(interface)。

关于Spring Boot Webflux/Netty - 检测关闭的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48806452/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com