
spring-reactor - Why does ConnectableFlux.connect() block?

Reposted. Author: 行者123. Updated: 2023-12-05 01:36:35

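I'm new to Spring Reactor. I've been trying to understand how the ConnectableFlux class works. I've read the documentation and looked at examples posted online, but I'm still stuck on one problem.

Can someone tell me why the connect() method blocks? I don't see anything in the documentation saying it should block, especially since it returns a Disposable for later use. With my sample code below, I never get past the connect() call.

I'm basically trying to emulate the old-style listener interface paradigm I've used many times in the past. I want to learn how to recreate a service class and listener architecture using Reactive Streams. I have a simple Service class with a method called addUpdateListener(Listener l); when the service's doStuff() method runs, it fires events that are passed to any registered listeners.

I should say that I'll be writing an API for others to use, so when I say service class I don't mean a @Service in Spring terms. It will be a plain Java singleton class.

I'm only using Spring Reactor for Reactive Streams. I'm also looking at RxJava, but wanted to see whether Spring Reactor Core would work.

I started with the test class below just to learn the library's syntax, and then got stuck on the blocking problem.

I think what I'm looking for is described here: Multiple Subscribers

Update: when I run my code through a debugger, the code in ConnectableFlux's connect method never returns. It gets stuck in the inner connect call and never returns from it.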

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
    Disposable[] out = new Disposable[]{null};
    this.connect((r) -> {
        out[0] = r;
    });
    return out[0];
}

Any help would be great!

Here is my Maven POM as well:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>SpringReactorTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
</excludes>
</artifactSet>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

And the test class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;

/**
 * Testing ConnectableFlux
 */
public class Main {

    private final static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        Main m = new Main();

        // Get the connectable
        ConnectableFlux<Object> flux = m.fluxPrintTime();

        // Subscribe some listeners
        // Tried using a new thread for the subscribers, but the connect call still blocks
        LOG.info("Subscribing");
        Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
        Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));

        LOG.info("Connecting...");
        Disposable connect = flux.connect(); // WHY does this block??
        LOG.info("Connected..");

        // Sleep 5 seconds
        TimeUnit.SECONDS.sleep(5);

        // Cleanup - Remove listeners
        LOG.info("Disposing");
        connect.dispose();
        disposable.dispose();
        disposable2.dispose();
        LOG.info("Disposed called");
    }

    // Just create a test flux
    public ConnectableFlux<Object> fluxPrintTime() {
        return Flux.create(fluxSink -> {
            while (true) {
                fluxSink.next(System.currentTimeMillis());
            }
        }).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
                .sample(ofSeconds(2))
                .publish();
    }
}

Running the code above produces the following output. It just keeps printing the time in milliseconds until I kill the process with Ctrl-C.

09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...

Best answer

I got a reply from the Spring Reactor team, and I'm posting it here in case anyone else runs into this problem...

The crux of the issue is that you're entering an infinite loop in Flux.create. The moment the flux gets subscribed, it will enter the loop and never exit it, producing data as fast as the CPU can. With Flux.create you should at least have a call to sink.complete() at some point.
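
A plain-Java sketch of the point above. It does not use Reactor, and the names EmitterThreadDemo, emitUntilCancelled and startOnWorker are made up for illustration. An emitter loop occupies whatever thread it runs on, so if it runs on the caller's thread the caller never gets control back. Here the loop checks a cancellation flag and runs on a worker thread, so the caller continues immediately. In Reactor the rough equivalents would presumably be checking sink.isCancelled() inside the Flux.create lambda and moving the subscription off the main thread with subscribeOn, but that mapping is an assumption of this sketch, not part of the answer.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class EmitterThreadDemo {

    // Hypothetical stand-in for the lambda passed to Flux.create: it loops
    // until the cancellation flag is set instead of using while (true).
    static void emitUntilCancelled(AtomicBoolean cancelled, AtomicLong counter) {
        while (!cancelled.get()) {
            counter.incrementAndGet(); // "emit" one value
        }
    }

    // Starts the emitter on a daemon worker thread and returns immediately.
    // The returned Runnable plays the role of a Disposable: running it
    // sets the flag and waits (up to one second) for the worker to finish.
    static Runnable startOnWorker(AtomicLong counter) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread worker = new Thread(() -> emitUntilCancelled(cancelled, counter), "emitter");
        worker.setDaemon(true);
        worker.start();
        return () -> {
            cancelled.set(true);
            try {
                worker.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong counter = new AtomicLong();
        Runnable dispose = startOnWorker(counter); // returns at once; the loop runs elsewhere
        System.out.println("Connected, caller thread is free");
        Thread.sleep(100);
        dispose.run();
        System.out.println("Emitted " + counter.get() + " values before disposal");
    }
}
```

Calling emitUntilCancelled directly from main with a flag that nobody ever sets would reproduce the symptom in the question: the line after the call is never reached.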

I suggest experimenting with e.g. Flux.interval as the source for your regular ticks. It gets rid of the extraneous complexity of Flux.create, which puts you in charge of lower-level concepts of Reactive Streams (the onNext/onComplete/onError signals, which you'll need to learn about, but maybe not just right now 😄).

As a side note, I would take into consideration that emulating a listener-based API with Reactor (or RxJava) is not doing justice to what reactive programming can do. It is a constrained use case that will probably drive your focus and expectations away from the real benefits of reactive programming.

From a higher perspective:

The broad idea of ConnectableFlux#connect() is that you have a "transient" source that you want to share between multiple subscribers, but it gets triggered the moment someone subscribes to it. So in order not to miss any event, you turn the source into a ConnectableFlux, perform some setup (subscribe several subscribers) and manually trigger the source (by calling connect()). It is not blocking, and returns a Disposable that represents the upstream connection (in case you also want to manually cancel/dispose the whole subscription).
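
The subscribe-first, connect-later sequence can be modelled in plain Java. This is a minimal sketch under stated assumptions, not Reactor's implementation: ConnectableTicker and its methods are hypothetical, the ticks come from a ScheduledExecutorService (playing the role a timed source such as Flux.interval would play in Reactor), and a Runnable stands in for the Disposable returned by connect().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical plain-Java model of a connectable source.
class ConnectableTicker {
    private final List<LongConsumer> subscribers = new CopyOnWriteArrayList<>();
    private final long periodMillis;

    ConnectableTicker(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    // Registering a subscriber does not start the source.
    void subscribe(LongConsumer subscriber) {
        subscribers.add(subscriber);
    }

    // Starts the shared source on a scheduler thread and returns immediately.
    // Running the returned handle stops the source for every subscriber.
    Runnable connect() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ticker");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            for (LongConsumer s : subscribers) {
                s.accept(now); // every subscriber sees the same tick
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler::shutdownNow;
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectableTicker ticker = new ConnectableTicker(200);
        ticker.subscribe(t -> System.out.println("Fast 1 - " + t));
        ticker.subscribe(t -> System.out.println("Fast 2 - " + t));

        Runnable disposable = ticker.connect(); // does not block
        System.out.println("Connected..");

        Thread.sleep(1000);
        disposable.run();
        System.out.println("Disposed");
    }
}
```

Because the ticks are produced on the scheduler's thread rather than inside connect(), the caller reaches the "Connected.." line right away, which is the behaviour the question expected.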

PS: Bismuth is now clearly outdated; prefer using the latest Dysprosium release train.

Regarding "spring-reactor - Why does ConnectableFlux.connect() block?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61871723/
