gpt4 book ai didi

org.springframework.web.reactive.socket.WebSocketSession.receive()方法的使用及代码示例

转载 作者:知者 更新时间:2024-03-26 20:15:05 32 4
gpt4 key购买 nike

本文整理了Java中org.springframework.web.reactive.socket.WebSocketSession.receive()方法的一些代码示例,展示了WebSocketSession.receive()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。WebSocketSession.receive()方法的具体详情如下:
包路径:org.springframework.web.reactive.socket.WebSocketSession
类名称:WebSocketSession
方法名:receive

WebSocketSession.receive介绍

[英]Provides access to the stream of inbound messages.

This stream receives a completion or error signal when the connection is closed. In a typical WebSocketHandler implementation this stream is composed into the overall processing flow, so that when the connection is closed, handling will end.

See the class-level doc of WebSocketHandler and the reference for more details and examples of how to handle the session.
[中]提供对入站消息流的访问。
当连接关闭时,该流接收完成或错误信号。在典型的WebSocketHandler实现中,该流被组成整个处理流,因此当连接关闭时,处理将结束。
有关如何处理会话的更多细节和示例,请参阅WebSocketHandler的类级文档和参考。

代码示例

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
  }
}

代码示例来源:origin: spring-projects/spring-framework

@Override
  public Mono<Void> handle(WebSocketSession session) {
    infoRef.set(session.getHandshakeInfo());
    return session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .subscribeWith(output)
        .then();
  }
})

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
public Mono<Void> handle(WebSocketSession proxySession) {
  // Use retain() for Reactor Netty
  Mono<Void> proxySessionSend = proxySession
      .send(session.receive().doOnNext(WebSocketMessage::retain));
      // .log("proxySessionSend", Level.FINE);
  Mono<Void> serverSessionSend = session
      .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
      // .log("sessionSend", Level.FINE);
  return Mono.zip(proxySessionSend, serverSessionSend).then();
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(TIMEOUT);
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void customHeader() throws Exception {
  HttpHeaders headers = new HttpHeaders();
  headers.add("my-header", "my-value");
  MonoProcessor<Object> output = MonoProcessor.create();
  this.client.execute(getUrl("/custom-header"), headers,
      session -> session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .subscribeWith(output)
          .then())
      .block(TIMEOUT);
  assertEquals("my-header:my-value", output.block(TIMEOUT));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
  }
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Override
  public Mono<Void> handle(WebSocketSession session) {
    infoRef.set(session.getHandshakeInfo());
    return session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .subscribeWith(output)
        .then();
  }
})

代码示例来源:origin: spring-projects/spring-framework

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  this.client.execute(getUrl("/echo"), session -> session
      .send(input.map(session::textMessage))
      .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
      .subscribeWith(output)
      .then())
      .block(TIMEOUT);
  assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void sessionClosing() throws Exception {
  this.client.execute(getUrl("/close"),
      session -> {
        logger.debug("Starting..");
        return session.receive()
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doFinally(signalType -> {
              logger.debug("Completed with: " + signalType);
            });
      })
      .block(Duration.ofMillis(5000));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void customHeader() throws Exception {
  HttpHeaders headers = new HttpHeaders();
  headers.add("my-header", "my-value");
  MonoProcessor<Object> output = MonoProcessor.create();
  client.execute(getUrl("/custom-header"), headers,
      session -> session.receive()
          .map(WebSocketMessage::getPayloadAsText)
          .subscribeWith(output)
          .then())
      .block(Duration.ofMillis(5000));
  assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void echo() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getUrl("/echo"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

代码示例来源:origin: spring-cloud/spring-cloud-gateway

@Test
public void echoForHttp() throws Exception {
  int count = 100;
  Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
  ReplayProcessor<Object> output = ReplayProcessor.create(count);
  client.execute(getHttpUrl("/echoForHttp"),
      session -> {
        logger.debug("Starting to send messages");
        return session
            .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(s -> session.textMessage(s)))
            .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
            .subscribeWith(output)
            .doOnNext(s -> logger.debug("inbound " + s))
            .then()
            .doOnSuccessOrError((aVoid, ex) ->
                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
      })
      .block(Duration.ofMillis(5000));
  assertEquals(input.collectList().block(Duration.ofMillis(5000)),
      output.collectList().block(Duration.ofMillis(5000)));
}

代码示例来源:origin: hantsy/spring-reactive-sample

@Override
  public Mono<Void> handle(WebSocketSession session) {
    // Use retain() for Reactor Netty
    return session.send(session.receive().doOnNext(WebSocketMessage::retain));
  }
}

代码示例来源:origin: bclozel/webflux-workshop

@Override
  public Mono<Void> handle(WebSocketSession session) {
    return session.send(session.receive()
        .doOnNext(WebSocketMessage::retain)
        .delayElements(Duration.ofSeconds(1)).log());
  }
}

代码示例来源:origin: ch.rasc/wamp2spring-reactive

@Override
public Mono<Void> handle(WebSocketSession session) {
  session.receive().doFinally(sig -> {
    Long wampSessionId = this.webSocketId2WampSessionId.get(session.getId());
    if (wampSessionId != null) {
      this.applicationEventPublisher.publishEvent(
          new WampDisconnectEvent(wampSessionId, session.getId(),
              session.getHandshakeInfo().getPrincipal().block()));
      this.webSocketId2WampSessionId.remove(session.getId());
    }
    session.close(); // ?
  }).subscribe(inMsg -> {
    handleIncomingMessage(inMsg, session);
  });
  Publisher<Message<Object>> publisher = MessageChannelReactiveUtils
      .toPublisher(this.clientOutboundChannel);
  return session.send(Flux.from(publisher)
      .filter(msg -> resolveSessionId(msg).equals(session.getId()))
      .map(msg -> handleOutgoingMessage(msg, session)));
}

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com