gpt4 book ai didi

spring-webflux - Spring WebFlux (Flux) : how to publish dynamically

转载 作者:行者123 更新时间:2023-12-04 02:37:28 24 4
gpt4 key购买 nike

我是响应式(Reactive)编程和 Spring WebFlux 的新手。我想让我的 App 1 通过 Flux 发布 Server Sent 事件,我的 App 2 持续监听它。

我希望 Flux 按需发布(例如,当某事发生时)。我发现的所有示例都是使用 Flux.interval 定期发布事件,并且一旦创建 Flux 似乎无法附加/修改内容。

我怎样才能实现我的目标?或者我在概念上完全错误。

最佳答案

使用 FluxProcessor“动态”发布和 FluxSink手动向 Flux 提供数据的技术之一正在使用 FluxProcessor#sink方法如下例

@SpringBootApplication
@RestController
public class DemoApplication {

final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);

}

public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}

@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}

@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
在这里,我创建了 DirectProcessor为了支持多个订阅者,即会监听数据流。另外,我提供了额外的 FluxProcessor#serialize为 multiproducer 提供安全支持(从不同线程调用而不违反 Reactive Streams 规范规则,尤其是 rule 1.3 )。最后,通过调用“http://localhost:8080/send”,我们将看到消息 Hello World #1 (当然,只有在您之前连接到“http://localhost:8080”的情况下)
Reactor 3.4 更新
在 Reactor 3.4 中,您有一个名为 reactor.core.publisher.Sinks 的新 API。 . Sinks API 为手动数据发送提供了一个流畅的构建器,它允许您指定诸如流中的元素数量和背压行为、支持的订阅者数量和重放功能等内容:
@SpringBootApplication
@RestController
public class DemoApplication {

final Sinks.Many sink;
final AtomicLong counter;

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);

}

public DemoApplication() {
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.counter = new AtomicLong();
}

@GetMapping("/send")
public void test() {
EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

if (result.isFailure()) {
// do something here, since emission failed
}
}

@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
}
}
注意,通过 Sinks 发送消息API引入了 emission的新概念及其结果。之所以有这样的API,是因为Reactor 扩展了Reactive-Streams,必须遵循背压控制。也就是说如果你 emit比请求的信号多,并且底层实现不支持缓冲,您的消息将不会被传递。因此, tryEmitNext 的结果返回 EmitResult这表明消息是否已发送。
另外,请注意,默认情况下 Sinsk API 给出了 Sink 的序列化版本,这意味着您不必关心并发性。但是,如果您事先知道消息的发射是串行的,则可以构建一个 Sinks.unsafe()不序列化给定消息的版本

关于spring-webflux - Spring WebFlux (Flux) : how to publish dynamically,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51370463/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com