gpt4 book ai didi

org.springframework.web.reactive.socket.WebSocketSession.send()方法的使用及代码示例

转载 作者:知者 更新时间:2024-03-26 20:29:05 27 4
gpt4 key购买 nike

本文整理了Java中org.springframework.web.reactive.socket.WebSocketSession.send()方法的一些代码示例,展示了WebSocketSession.send()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。WebSocketSession.send()方法的具体详情如下:
包路径:org.springframework.web.reactive.socket.WebSocketSession
类名称:WebSocketSession
方法名:send

WebSocketSession.send介绍

[英]Give a source of outgoing messages, write the messages and return a Mono that completes when the source completes and writing is done.

See the class-level doc of WebSocketHandler and the reference for more details and examples of how to handle the session.
[中]给定一个传出消息的来源,写入这些消息并返回一个Mono;当该来源完成且写入结束时,该Mono随之完成。
有关如何处理会话的更多细节和示例,请参阅WebSocketHandler的类级文档和参考。

代码示例

代码示例来源:origin: spring-cloud/spring-cloud-gateway

/**
 * Writes the given outbound messages to the WebSocket session.
 *
 * @param session the session to write to
 * @param output the source of messages to send
 * @return the {@code Mono} produced by {@code session.send(output)}
 */
private static Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
  // A delayed variant was previously kept here as a workaround for a suspected
  // RxNetty WebSocket client issue (https://github.com/ReactiveX/RxNetty/issues/560):
  // return session.send(Mono.delay(Duration.ofMillis(100)).thenMany(output));
  Mono<Void> sendCompletion = session.send(output);
  return sendCompletion;
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Echoes every message received on the session back to the same session.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Each inbound message is retained before being re-sent (noted as needed for Reactor Netty).
    return session.send(
        session.receive().doOnNext(inbound -> inbound.retain()));
  }
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Sends a single text message containing the sub-protocol from the handshake info,
 * or the literal {@code "none"} when that value is {@code null}.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    String negotiated = session.getHandshakeInfo().getSubProtocol();
    String text;
    if (negotiated == null) {
      text = "none";
    }
    else {
      text = negotiated;
    }
    return session.send(Mono.just(session.textMessage(text)));
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

/**
 * Bridges two WebSocket sessions: messages received on the enclosing {@code session}
 * are sent to {@code proxySession}, and messages received on {@code proxySession}
 * are sent to {@code session}.
 *
 * @param proxySession the proxied WebSocket session
 * @return a {@code Mono} derived from zipping both send operations
 */
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
  // Messages are retained before forwarding (noted as needed for Reactor Netty).
  Mono<Void> towardsProxy = proxySession.send(
      session.receive().doOnNext(inbound -> inbound.retain()));
  // .log("proxySessionSend", Level.FINE);

  Mono<Void> towardsServer = session.send(
      proxySession.receive().doOnNext(inbound -> inbound.retain()));
  // .log("sessionSend", Level.FINE);

  return Mono.zip(towardsProxy, towardsServer).then();
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Sends a single text message of the form {@code "my-header:<value>"}, where the value
 * is the first {@code my-header} entry of the handshake headers.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    String headerValue = session.getHandshakeInfo().getHeaders().getFirst("my-header");
    return session.send(Mono.just(session.textMessage("my-header:" + headerValue)));
  }
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Sends an outbound stream that fails immediately and, on that failure, switches to
 * {@code session.close(CloseStatus.GOING_AWAY)}.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    // SPR-17306 (nested close): the close is issued from inside the stream being sent.
    Flux<WebSocketMessage> failingOutput = Flux
        .error(new Throwable())
        .onErrorResume(failure -> session.close(CloseStatus.GOING_AWAY))
        .cast(WebSocketMessage.class);
    return session.send(failingOutput);
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

/**
 * Sends back to the session whatever it receives from it.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    // retain() is applied to each message first (noted as needed for Reactor Netty).
    return session.send(
        session.receive().doOnNext(frame -> frame.retain()));
  }
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Sends {@code count} text messages to the "/echo" endpoint and asserts that the
 * payloads received back equal the payloads sent.
 */
@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(i -> "msg-" + i);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> {
        // After the send completes, capture the first `count` replies into the processor.
        return session
            .send(input.map(text -> session.textMessage(text)))
            .thenMany(session.receive().take(count).map(reply -> reply.getPayloadAsText()))
            .subscribeWith(output)
            .then();
      })
      .block(TIMEOUT);
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

/**
 * Sends {@code count} text messages to the "/echo" endpoint, logging outbound and
 * inbound payloads at debug level, and asserts that the payloads received back equal
 * the payloads sent.
 */
@Test
public void echo() throws Exception {
  int count = 100;
  Duration timeout = Duration.ofMillis(5000);
  Flux<String> input = Flux.range(1, count).map(i -> "msg-" + i);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getUrl("/echo"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(text -> logger.debug("outbound " + text)).map(session::textMessage))
            .thenMany(session.receive().take(count).map(reply -> reply.getPayloadAsText()))
            .subscribeWith(output)
            .doOnNext(received -> logger.debug("inbound " + received))
            .then()
            .doOnSuccessOrError((ignored, failure) -> {
              // Log either the failure message or the literal "success".
              String outcome = failure != null ? failure.getMessage() : "success";
              logger.debug("Done with " + outcome);
            });
      })
      .block(timeout);
  assertEquals(input.collectList().block(timeout),
      output.collectList().block(timeout));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

/**
 * Same echo round-trip as the WebSocket-URL test, but the client connects to the URL
 * returned by {@code getHttpUrl("/echoForHttp")}. Asserts that the payloads received
 * back equal the payloads sent.
 */
@Test
public void echoForHttp() throws Exception {
  int count = 100;
  Duration timeout = Duration.ofMillis(5000);
  Flux<String> input = Flux.range(1, count).map(i -> "msg-" + i);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getHttpUrl("/echoForHttp"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(text -> logger.debug("outbound " + text)).map(session::textMessage))
            .thenMany(session.receive().take(count).map(reply -> reply.getPayloadAsText()))
            .subscribeWith(output)
            .doOnNext(received -> logger.debug("inbound " + received))
            .then()
            .doOnSuccessOrError((ignored, failure) -> {
              // Log either the failure message or the literal "success".
              String outcome = failure != null ? failure.getMessage() : "success";
              logger.debug("Done with " + outcome);
            });
      })
      .block(timeout);
  assertEquals(input.collectList().block(timeout),
      output.collectList().block(timeout));
}

代码示例来源:origin: hantsy/spring-reactive-sample

/**
 * Writes the given outbound messages to the session after an initial 100 ms delay.
 *
 * @param session the session to write to
 * @param output the source of messages to send
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
private Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
  // The delay Mono runs first; the real output is only played once it completes.
  Publisher<WebSocketMessage> delayedOutput =
      Mono.delay(Duration.ofMillis(100)).thenMany(output);
  return session.send(delayedOutput);
}

代码示例来源:origin: hantsy/spring-reactive-sample

/**
 * Echo handler: the session's inbound stream is used directly as its outbound stream.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Messages are retained before being written back (noted as needed for Reactor Netty).
    return session.send(
        session.receive().doOnNext(msg -> msg.retain()));
  }
}

代码示例来源:origin: bclozel/webflux-workshop

/**
 * Echoes inbound messages back to the session, delaying each element by one second
 * and logging the resulting stream.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
  public Mono<Void> handle(WebSocketSession session) {
    Duration pause = Duration.ofSeconds(1);
    return session.send(
        session.receive()
            .doOnNext(inbound -> inbound.retain())
            .delayElements(pause)
            .log());
  }
}

代码示例来源:origin: ch.rasc/wamp2spring-reactive

/**
 * Handles one WebSocket session.
 *
 * <p>Inbound messages are consumed by a separate subscription that passes each one to
 * {@code handleIncomingMessage}. When that inbound stream terminates, a
 * {@code WampDisconnectEvent} is published if a WAMP session id is registered for this
 * WebSocket session, the registration is removed, and the session is closed.
 *
 * <p>Outbound messages come from {@code clientOutboundChannel}; only messages whose
 * resolved session id equals this session's id are converted with
 * {@code handleOutgoingMessage} and sent.
 *
 * @param session the WebSocket session being handled
 * @return the {@code Mono} produced by {@code session.send(...)}
 */
@Override
public Mono<Void> handle(WebSocketSession session) {
  session.receive().doFinally(sig -> {
    Long wampSessionId = this.webSocketId2WampSessionId.get(session.getId());
    if (wampSessionId != null) {
      // NOTE(review): block() is called from inside a reactive callback; confirm the
      // principal Mono can be resolved here without blocking the calling thread.
      this.applicationEventPublisher.publishEvent(
          new WampDisconnectEvent(wampSessionId, session.getId(),
              session.getHandshakeInfo().getPrincipal().block()));
      this.webSocketId2WampSessionId.remove(session.getId());
    }
    // close() only returns a Mono; without a subscription the close may never be
    // requested. The inbound stream has already terminated at this point, so a
    // failure to close is treated as best-effort and not propagated.
    session.close()
        .onErrorResume(closeFailure -> Mono.empty())
        .subscribe();
  }).subscribe(inMsg -> {
    handleIncomingMessage(inMsg, session);
  });
  Publisher<Message<Object>> publisher = MessageChannelReactiveUtils
      .toPublisher(this.clientOutboundChannel);
  return session.send(Flux.from(publisher)
      // Compare from the session id side so a null resolved id is filtered out
      // instead of throwing a NullPointerException inside the stream.
      .filter(msg -> session.getId().equals(resolveSessionId(msg)))
      .map(msg -> handleOutgoingMessage(msg, session)));
}

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com