gpt4 book ai didi

java - Webflux websocketclient,如何在同一个 session 中发送多个请求[设计客户端库]

转载 作者:塔克拉玛干 更新时间:2023-11-03 02:54:40 26 4
gpt4 key购买 nike

TL; 博士;

我们正在尝试使用 spring webflux WebSocket 实现设计一个 WebSocket 服务器。服务器具有通常的 HTTP 服务器操作,例如create/fetch/update/fetchall .使用 WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为 WebSockets 就是为此目的而设计的。 webflux 和 WebSockets 的设计是否正确?

长版

我们正在启动一个项目,该项目将使用来自 spring-webflux 的响应式(Reactive) Web 套接字。 .我们需要构建一个响应式(Reactive)客户端库,消费者可以使用它来连接到服务器。

在服务器上,我们收到一个请求,读取一条消息,保存它并返回一个静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
Flux<WebSocketMessage> response = webSocketSession.receive()
.map(WebSocketMessage::retain)
.concatMap(webSocketMessage -> Mono.just(webSocketMessage)
.map(parseBinaryToEvent) //logic to get domain object
.flatMap(e -> service.save(e))
.thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
);

return webSocketSession.send(response);
}

在客户端 , 我们想在有人来电时拨打 save方法并返回来自 server 的响应.
public Mono<String> save(Event message) {
new ReactorNettyWebSocketClient().execute(uri, session -> {
session
.send(Mono.just(session.binaryMessage(formatEventToMessage)))
.then(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(System.out::println).then()); //how to return this to client
});
return null;
}

我们不确定,如何设计这个。理想情况下,我们认为应该有

1) client.execute应该只调用一次并以某种方式保持 session .应该使用相同的 session 在后续调用中发送数据。

2) 如何从服务器返回我们在 session.receive 中得到的响应?

3) 万一 fetch怎么办?当 session.receive 中的响应很大时(不仅仅是静态字符串,而是事件列表) ?

我们正在做一些研究,但我们无法在线找到 webflux-websocket-client 文档/实现的合适资源。关于如何前进的任何指示。

最佳答案

请!使用 RSocket !

这是绝对正确的设计,值得为所有可能的操作节省资源并为每个客户端仅使用一个连接。

但是,不要实现轮子并使用为您提供所有这些类型通信的协议(protocol)。

  • RSocket 有一个请求-响应模型,它允许您进行当今最常见的客户端-服务器交互。
  • RSocket 具有请求流通信模型,因此您可以满足所有需求并异步重用相同的连接返回事件流。 RSocket 将所有逻辑流映射到物理连接并返回,因此您不会感到自己这样做的痛苦。
  • RSocket 有更多的交互模型,例如
    在以下情况下可能有用的即发即忘和流流
    以两种方式发送数据流。

  • 如何在 Spring 中使用 RSocket

    这样做的选项之一是使用 RSocket 协议(protocol)的 RSocket-Java 实现。 RSocket-Java 建立在 Project Reactor 之上,所以它自然适合 Spring WebFlux 生态系统。

    不幸的是,没有与 Spring 生态系统的特色集成。幸运的是,我花了几个小时提供了一个简单的 RSocket Spring Boot Starter将 Spring WebFlux 与 RSocket 集成在一起,并将 WebSocket RSocket 服务器与 WebFlux Http 服务器一起公开。

    为什么 RSocket 是更好的方法?

    基本上,RSocket 隐藏了自己实现相同方法的复杂性。使用 RSocket,我们不必关心作为自定义协议(protocol)和 Java 实现的交互模型定义。 RSocket 为我们将数据传送到特定的逻辑 channel 。它提供了一个内置客户端,可以将消息发送到同一个 WS 连接,因此我们不必为此发明自定义实现。

    使用 RSocket-RPC 让它变得更好

    由于 RSocket 只是一个协议(protocol),它不提供任何消息格式,所以这个挑战是针对业务逻辑的。然而,有一个 RSocket-RPC 项目提供了 Protocol Buffer 作为消息格式,并重用了与 GRPC 相同的代码生成技术。因此,使用 RSocket-RPC 我们可以轻松地为客户端和服务器构建 API,而无需关心传输和协议(protocol)抽象。

    同样的 RSocket Spring Boot 集成提供了一个 example RSocket-RPC 的使用也是如此。

    好吧,它没有说服我,我仍然想要一个自定义的 WebSocket 服务器

    所以,为了这个目的,你必须自己实现那个 hell 。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。
    不过,我可以分享一些代码示例,它们可以帮助您构建合适的客户端和服务器。

    服务器端

    处理程序和开放逻辑订阅者映射

    必须考虑的第一点是,一个物理连接中的所有逻辑流都应该存储在某个地方:
    class MyWebSocketRouter implements WebSocketHandler {

    final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;


    @Override
    public Mono<Void> handle(WebSocketSession session) {
    final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
    ...
    }
    }

    上面的示例中有两张 map 。第一个是你的路由映射,它允许你根据传入的消息参数来识别路由,等等。第二个是为请求流用例创建的(在我的例子中它是 Activity 订阅的映射),所以你可以发送一个消息帧来创建一个订阅,或者让你订阅一个特定的 Action 并保持这个订阅,所以一旦取消订阅执行操作,如果订阅存在,您将被取消订阅。

    使用 Processor 进行消息复用

    为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用 UnicastProcessor 来做到这一点。 :
    @Override
    public Mono<Void> handle(WebSocketSession session) {
    final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
    ...

    return Mono
    .subscriberContext()
    .flatMap(context -> Flux.merge(
    session
    .receive()
    ...
    .cast(ActionMessage.class)
    .publishOn(Schedulers.parallel())
    .doOnNext(am -> {
    switch (am.type) {
    case CREATE:
    case UPDATE:
    case CANCEL: {
    ...
    }
    case SUBSCRIBE: {
    Flux<ResponseMessage<?>> flux = Flux
    .from(
    channelsMapping.get(am.getChannelId())
    .get(ActionMessage.Type.SUBSCRIBE)
    .handle(am) // returns Publisher<>
    );

    if (flux != null) {
    channelsIdsToDisposableMap.compute(
    am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
    (cid, disposable) -> {
    ...

    return flux
    .subscriberContext(context)
    .subscribe(
    funIn::onNext, // send message to a Processor manually
    e -> {
    funIn.onNext(
    new ResponseMessage<>( // send errors as a messages to Processor here
    0,
    e.getMessage(),
    ...
    ResponseMessage.Type.ERROR
    )
    );
    }
    );
    }
    );
    }

    return;
    }
    case UNSABSCRIBE: {
    Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());

    if (disposable != null) {
    disposable.dispose();
    }
    }
    }
    })
    .then(Mono.empty()),

    funIn
    ...
    .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
    .as(session::send)
    ).then()
    );
    }

    正如我们从上面的示例中看到的那样,那里有很多东西:
  • 该消息应包含路由信息
  • 该消息应包含与其相关的唯一流 ID。
  • 用于消息多路复用的单独处理器,其中错误也应该是消息
  • 每个 channel 都应该存储在某个地方,在这种情况下,我们有一个简单的用例,其中每条消息都可以提供一个 Flux消息或只是 Mono (在单声道的情况下,它可以在服务器端更简单地实现,因此您不必保留唯一的流 ID)。
  • 此示例不包括消息编码-解码,因此这个挑战留给您。

  • 客户端

    客户端也不是那么简单:

    处理 session

    为了处理连接,我们必须分配两个处理器,以便我们可以进一步使用它们来复用和解复用消息:
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...
    (session) -> {
    return Flux.merge(
    session.receive()
    .subscribeWith(incoming)
    .then(Mono.empty()),
    session.send(outgoing)
    ).then();
    }

    将所有逻辑流保存在某处

    所有创建的流是否是 MonoFlux应该存储在某个地方,以便我们能够区分与哪个流消息相关:
    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    自 MonoSink 以来,我们必须保留两个映射,而 FluxSink 没有相同的父接口(interface)。

    消息路由

    在上面的示例中,我们只考虑了客户端的初始部分。现在我们要构建一个消息路由机制:
    ...
    .subscribeWith(incoming)
    .doOnNext(message -> {
    if (monoSinkMap.containsKey(message.getStreamId())) {
    MonoSink sink = monoSinkMap.get(message.getStreamId());
    monoSinkMap.remove(message.getStreamId());
    if (message.getType() == SUCCESS) {
    sink.success(message.getData());
    }
    else {
    sink.error(message.getCause());
    }
    } else if (fluxSinkMap.containsKey(message.getStreamId())) {
    FluxSink sink = fluxSinkMap.get(message.getStreamId());
    if (message.getType() == NEXT) {
    sink.next(message.getData());
    }
    else if (message.getType() == COMPLETE) {
    fluxSinkMap.remove(message.getStreamId());
    sink.next(message.getData());
    sink.complete();
    }
    else {
    fluxSinkMap.remove(message.getStreamId());
    sink.error(message.getCause());
    }
    }
    })

    上面的代码示例展示了我们如何路由传入的消息。

    多路请求

    最后一部分是消息复用。为此,我们将介绍可能的发送者类实现:
    class Sender {
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...

    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;

    public Sender () {

    //这里创建 websocket 连接并放前面提到的代码
    }
        Mono<R> sendForMono(T data) {
    //generate message with unique
    return Mono.<R>create(sink -> {
    monoSinksMap.put(streamId, sink);
    outgoing.onNext(message); // send message to server only when subscribed to Mono
    });
    }

    Flux<R> sendForFlux(T data) {
    return Flux.<R>create(sink -> {
    fluxSinksMap.put(streamId, sink);
    outgoing.onNext(message); // send message to server only when subscribed to Flux
    });
    }
    }

    自定义实现总结
  • 铁杆
  • 没有实现背压支持,这可能是另一个挑战
  • 轻松射中自己的脚

  • 外卖
  • 请使用 RSocket,不要自己发明协议(protocol),这很难!!!
  • 要从 Pivotal 人员那里了解有关 RSocket 的更多信息 - https://www.youtube.com/watch?v=WVnAbv65uCU
  • 从我的一次演讲中了解有关 RSocket 的更多信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
  • 有一个基于 RSocket 的特色框架,称为 Proteus - 您可能对此感兴趣 - https://www.netifi.com/
  • 从 RSocket 协议(protocol)核心开发者处了解更多关于 Proteus 的信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E
  • 关于java - Webflux websocketclient,如何在同一个 session 中发送多个请求[设计客户端库],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53812515/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com