gpt4 book ai didi

org.springframework.web.reactive.socket.WebSocketSession类的使用及代码示例

转载 作者:知者 更新时间:2024-03-26 20:31:05 29 4
gpt4 key购买 nike

本文整理了Java中org.springframework.web.reactive.socket.WebSocketSession类的一些代码示例,展示了WebSocketSession类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。WebSocketSession类的具体详情如下:
包路径:org.springframework.web.reactive.socket.WebSocketSession
类名称:WebSocketSession

WebSocketSession介绍

[英]Represents a WebSocket session.

Use WebSocketSession#receive() to compose on the inbound message stream, and WebSocketSession#send(Publisher) to provide the outbound message stream.
[中]表示WebSocket会话。
使用WebSocketSession#receive()在入站消息流上进行合成,使用WebSocketSession#send(Publisher)提供出站消息流。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return session.send(Flux
        .error(new Throwable())
        .onErrorResume(ex -> session.close(CloseStatus.GOING_AWAY)) // SPR-17306 (nested close)
        .cast(WebSocketMessage.class));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    String protocol = session.getHandshakeInfo().getSubProtocol();
    WebSocketMessage message = session.textMessage(protocol != null ? protocol : "none");
    return session.send(Mono.just(message));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    infoRef.set(session.getHandshakeInfo());
    return session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .subscribeWith(output)
        .then();
  }
})

代码示例来源:origin: ch.rasc/wamp2spring-reactive

@Override
public Mono<Void> handle(WebSocketSession session) {
  session.receive().doFinally(sig -> {
    Long wampSessionId = this.webSocketId2WampSessionId.get(session.getId());
    if (wampSessionId != null) {
      this.applicationEventPublisher.publishEvent(
          new WampDisconnectEvent(wampSessionId, session.getId(),
              session.getHandshakeInfo().getPrincipal().block()));
      this.webSocketId2WampSessionId.remove(session.getId());
    }
    session.close(); // ?
  }).subscribe(inMsg -> {
    handleIncomingMessage(inMsg, session);
  });
  Publisher<Message<Object>> publisher = MessageChannelReactiveUtils
      .toPublisher(this.clientOutboundChannel);
  return session.send(Flux.from(publisher)
      .filter(msg -> resolveSessionId(msg).equals(session.getId()))
      .map(msg -> handleOutgoingMessage(msg, session)));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getUrl("/echo"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

代码示例来源:origin: ch.rasc/wamp2spring-reactive

session.close(CloseStatus.GOING_AWAY);
  return null;
String acceptedProtocol = session.getHandshakeInfo().getSubProtocol();
if (acceptedProtocol != null) {
  if (acceptedProtocol.equals(WampWebSocketHandler.MSGPACK_PROTOCOL)) {
      cm.setHeader(WampMessageHeader.WEBSOCKET_SESSION_ID, session.getId());
      this.clientOutboundChannel.send(cm);
          .binaryMessage(factory -> factory.wrap(bos.toByteArray()));
    return session.textMessage(
        new String(bos.toByteArray(), StandardCharsets.UTF_8));
          + session.getId(), ex);
    session.close(CloseStatus.PROTOCOL_ERROR);
    logger.error(
        "Failed to send WebSocket message to client because no accepted protocol "
            + session.getId());

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    String protocol = session.getHandshakeInfo().getSubProtocol();
    if (!StringUtils.hasText(protocol)) {
      return Mono.error(new IllegalStateException("Missing protocol"));
    }
    List<String> protocols = session.getHandshakeInfo().getHeaders().get(SEC_WEBSOCKET_PROTOCOL);
    assertThat(protocols).contains("echo-v1,echo-v2");
    WebSocketMessage message = session.textMessage(protocol);
    return doSend(session, Mono.just(message));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(TIMEOUT);
}

代码示例来源:origin: ch.rasc/wamp2spring-reactive

String acceptedProtocol = session.getHandshakeInfo().getSubProtocol();
  if (acceptedProtocol != null) {
    if (acceptedProtocol.equals(WampWebSocketHandler.MSGPACK_PROTOCOL)) {
      logger.error(
          "Deserialization failed because no accepted protocol "
              + inMsg + " in session " + session.getId());
  if (logger.isErrorEnabled()) {
    logger.error("Deserialization failed for message " + inMsg
        + " in session " + session.getId());
    session.getId());
wampMessage.setHeader(WampMessageHeader.PRINCIPAL,
    session.getHandshakeInfo().getPrincipal().block());
wampMessage.setHeader(WampMessageHeader.WAMP_SESSION_ID,
    this.webSocketId2WampSessionId.get(session.getId()));
    session.close(CloseStatus.PROTOCOL_ERROR);
  this.webSocketId2WampSessionId.put(session.getId(), newWampSessionId);
  session.close(CloseStatus.GOING_AWAY);
      WampError.GOODBYE_AND_OUT);
  goodbyeMessage.setHeader(WampMessageHeader.WEBSOCKET_SESSION_ID,
      session.getId());
  this.clientOutboundChannel.send(goodbyeMessage);

代码示例来源:origin: spring-projects/spring-framework

/**
 * Close the WebSocket session with the given status.
 * @param status the close status
 */
Mono<Void> close(CloseStatus status);

代码示例来源:origin: spring-cloud/spring-cloud-gateway

private static Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
  return session.send(output);
  // workaround for suspected RxNetty WebSocket client issue
  // https://github.com/ReactiveX/RxNetty/issues/560
  // return session.send(Mono.delay(Duration.ofMillis(100)).thenMany(output));
}

代码示例来源:origin: spring-projects/spring-framework

private <T> WebSocketMessage toMessage(T message) {
  WebSocketSession session = this.delegateSession;
  Assert.state(session != null, "Cannot create message without a session");
  if (message instanceof String) {
    byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
    return new WebSocketMessage(Type.TEXT, session.bufferFactory().wrap(bytes));
  }
  else if (message instanceof ByteBuffer) {
    DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
    return new WebSocketMessage(Type.BINARY, buffer);
  }
  else if (message instanceof PongMessage) {
    DataBuffer buffer = session.bufferFactory().wrap(((PongMessage) message).getApplicationData());
    return new WebSocketMessage(Type.PONG, buffer);
  }
  else {
    throw new IllegalArgumentException("Unexpected message type: " + message);
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
  return getHttpClient()
      .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
      .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
      .uri(url.toString())
      .handle((inbound, outbound) -> {
        HttpHeaders responseHeaders = toHttpHeaders(inbound);
        String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
        HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
        NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
        WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory);
        if (logger.isDebugEnabled()) {
          logger.debug("Started session '" + session.getId() + "' for " + url);
        }
        return handler.handle(session);
      })
      .doOnRequest(n -> {
        if (logger.isDebugEnabled()) {
          logger.debug("Connecting to " + url);
        }
      })
      .next();
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void echoForHttp() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getHttpUrl("/echoForHttp"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    infoRef.set(session.getHandshakeInfo());
    return session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .subscribeWith(output)
        .then();
  }
})

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    HttpHeaders headers = session.getHandshakeInfo().getHeaders();
    if (!headers.containsKey("my-header")) {
      return Mono.error(new IllegalStateException("Missing my-header"));
    }
    String payload = "my-header:" + headers.getFirst("my-header");
    WebSocketMessage message = session.textMessage(payload);
    return doSend(session, Mono.just(message));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void customHeader() throws Exception {
  HttpHeaders headers = new HttpHeaders();
  headers.add("my-header", "my-value");
  MonoProcessor<Object> output = MonoProcessor.create();
  this.client.execute(getUrl("/custom-header"), headers,
      session -> session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .subscribeWith(output)
          .then())
      .block(TIMEOUT);
  assertEquals("my-header:my-value", output.block(TIMEOUT));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
  }
}

代码示例来源:origin: hantsy/spring-reactive-sample

private Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
  return session.send(Mono.delay(Duration.ofMillis(100)).thenMany(output));
}

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com