gpt4 book ai didi

java - 服务器上生成的带有 MediaType.TEXT_EVENT_STREAM 的事件何时会传递给客户端上的订阅者

转载 作者:行者123 更新时间:2023-12-02 03:09:18 34 4
gpt4 key购买 nike

我创建了一个示例客户端/服务器应用程序来熟悉 Spring Webflux/Reactor Netty。现在,当响应包含 Flux 并且媒体类型为“文本/事件流”时,我对客户端的行为有点困惑。我可以看到服务器上生成的每个元素都会立即发送到客户端,但尚未传递给订阅者。第一次交付给订阅者发生在服务器端的生产者完成 Flux 之后。对我来说,这意味着所有元素首先会在客户端的reactor-netty 中的某个位置收集,直到获得完整/错误事件。

我的结论是正确的还是我可能做错了什么?如果属实,这种情况在不久的将来会改变吗?根据我目前观察到的行为,使用 Spring Webflux 的大部分好处都被否定了,因为与 Spring Mvc 一样,消费者必须等到整个元素集合创建并传输后才能开始处理元素。

我的服务器应用程序是:`

@SpringBootApplication
public class ServerApp {
public static void main(String[] args) {
new SpringApplicationBuilder().sources(ServerApp.class).run(args);
}

@RestController
public static class TestController {
@GetMapping(value = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> testFlux() {
class AsyncSink implements Consumer<SynchronousSink<String>> {
private List<String> allStrings = List.of(
"Hello Flux1!",
"Hello Flux2!",
"Hello Flux3!",
"Hello Flux4!",
"Hello Flux5!");
private int index = 0;

@Override
public void accept(SynchronousSink<String> sink) {
if (index == allStrings.size()) {
sink.complete();
}
else {
sink.next(allStrings.get(index++));
}
}

}

return Flux.generate(new AsyncSink());
}
}
}

我的客户端应用程序是:

@SpringBootApplication
public class ClientApp {
public static void main(String[] args) throws IOException {
ConfigurableApplicationContext aContext = new SpringApplicationBuilder().web(WebApplicationType.NONE).sources(ClientApp.class).run(args);

Flux<String> aTestFlux = aContext.getBean(TestProxy.class).getFlux();
aTestFlux.subscribe(new TestSubscriber());

System.out.println("Press ENTER to exit.");
System.in.read();
}

@Bean
public WebClient webClient() {
return WebClient.builder().baseUrl("http://localhost:8080").build();
}

@Component
public static class TestProxy {
@Autowired
private WebClient webClient;

public Flux<String> getFlux() {
return webClient.get().uri("/test").accept(MediaType.TEXT_EVENT_STREAM).exchange().flatMapMany(theResponse -> theResponse.bodyToFlux(String.class));
}
}

private static class TestSubscriber extends BaseSubscriber<String> {
@Override
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(Long.MAX_VALUE);
}

@Override
public void hookOnNext(String theValue) {
System.out.println(" - " + theValue);
request(1);
}

@Override
protected void hookOnComplete() {
System.out.println(" done");
}

@Override
protected void hookOnCancel() {
System.out.println(" cancelled");
}

@Override
protected void hookOnError(Throwable theThrowable) {
theThrowable.printStackTrace(System.err);
}
}
}

当我访问网址http://localhost:8080/test时使用 Chrome 浏览器我看到:

data:Hello Flux1!

data:Hello Flux2!

data:Hello Flux3!

data:Hello Flux4!

data:Hello Flux5!

对我来说,看起来已经发送了 5 个 http 事件。

最佳答案

取自 react 性文档并重写以满足您的需求。

我的猜测是,在您的示例中,您已经向生成函数传递了一个消费者,该消费者在完成后将被发出。

改为使用方法 Flux#generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)您提供一个状态,其中将包含您想要发出的项目,然后在提供的 BiFunction 中您逐一发出每一项。

Flux<String> flux = Flux.generate(
() -> List.of("1!", "2!", "3!", "4!", "5!"),
(state, sink) -> {
if (index == allStrings.size()) {
sink.complete();
} else {
sink.next(state.get(index++));
}
});

我尚未测试在移动设备上编写的代码。

关于java - 服务器上生成的带有 MediaType.TEXT_EVENT_STREAM 的事件何时会传递给客户端上的订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57004002/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com