- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是 Spring Reactor 的新手。我一直在努力了解 ConnectableFlux 类的工作原理。我已经阅读了文档并看到了在线发布的示例,但仍被困在一个问题上。
有人能告诉我为什么 connect() 方法会阻塞吗?我在文档中没有看到任何说明它应该阻止的内容……特别是因为它返回一个 Disposable 供以后使用。给出我下面的示例代码,我永远不会通过 connect() 方法。
我试图基本上模拟我过去多次使用的旧式监听器接口(interface)范例。我想学习如何使用 react 流重新创建服务类和监听器架构。我有一个简单的 Service 类,它有一个名为“addUpdateListener(Listener l)”的方法,然后当我的服务类“doStuff()"方法,它会触发一些事件传递给任何监听器。
我应该说我将编写一个 API 供其他人使用,所以当我说服务类时,我并不是指 Spring 术语中的 @Service。它将是一个普通的 Java 单例类。
我只是将 Spring Reactor 用于 Reactive Streams。我也在看 RxJava.. 但想看看 Spring Reactor Core 是否可以工作。
我从下面的测试类开始,只是为了了解库语法,然后陷入了阻塞问题。
我想我要找的是这里描述的:Multiple Subscribers
更新:通过调试器运行我的代码,ConnectableFlux 连接方法中的代码永远不会返回。它卡在内部连接方法上,永远不会从该方法返回。
reactor.core.publisher.ConnectableFlux
public final Disposable connect() {
Disposable[] out = new Disposable[]{null};
this.connect((r) -> {
out[0] = r;
});
return out[0];
}
任何帮助都会很棒!
这也是我的maven pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SpringReactorTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
</excludes>
</artifactSet>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofSeconds;
/**
* Testing ConnectableFlux
*/
public class Main {
private final static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
Main m = new Main();
// Get the connectable
ConnectableFlux<Object> flux = m.fluxPrintTime();
// Subscribe some listeners
// Tried using a new thread for the subscribers, but the connect call still blocks
LOG.info("Subscribing");
Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));
LOG.info("Connecting...");
Disposable connect = flux.connect();// WHY does this block??
LOG.info("Connected..");
// Sleep 5 seconds
TimeUnit.SECONDS.sleep(5);
// Cleanup - Remove listeners
LOG.info("Disposing");
connect.dispose();
disposable.dispose();
disposable2.dispose();
LOG.info("Disposed called");
}
// Just create a test flux
public ConnectableFlux<Object> fluxPrintTime() {
return Flux.create(fluxSink -> {
while (true) {
fluxSink.next(System.currentTimeMillis());
}
}).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
.sample(ofSeconds(2))
.publish();
}
}
运行上面的代码会得到以下输出..它只是以毫秒为单位打印时间,直到我按 Ctrl-C 处理过程..
09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...
最佳答案
我收到了 Spring Reactor 团队的答复,我只是将其发布在这里以防其他人遇到此问题...
The crux of the issue is that you're entering an infinite loop in Flux.create. The moment the flux gets subscribed, it will enter the loop and never exit it, producing data as fast as the CPU can. With Flux.create you should at least have a call to sink.complete() at some point.
I suggest to experiment with eg. Flux.interval as a source for your regular ticks, it will get rid of that extraneous complexity of Flux.create, which puts you in charge of lower level concepts of Reactive Streams (the onNext/onComplete/onError signals, that you'll need to learn about, but maybe not just right now 😄 ).
As a side note, I would take into consideration that emulating a listener-based API with Reactor (or RxJava) is not doing justice to what reactive programming can do. It is a constrained use case that will probably drive your focus and expectations away from the real benefits of reactive programming
From a higher perspective:
The broad idea of ConnectableFlux#connect() is that you have a "transient" source that you want to share between multiple subscribers, but it gets triggered the moment someone subscribes to it. So in order not to miss any event, you turn the source into a ConnectableFlux, perform some set up (subscribe several subscribers) and manually trigger the source (by calling connect()). It is not blocking, and returns a Disposable` that represents the upstream connection (in case you also want to manually cancel/dispose the whole subscription).
PS: Bismuth is now clearly outdated, prefer using the latest Dysprosium release train
关于spring-reactor - 为什么 ConnectableFlux.connect() 会阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61871723/
尝试使用集成到 QTCreator 的表单编辑器,但即使我将插件放入 QtCreator.app/Contents/MacOS/designer 也不会显示。不过,相同的 dylib 文件确实适用于独
在此代码示例中。 “this.method2();”之后会读到什么?在返回returnedValue之前会跳转到method2()吗? public int method1(int returnedV
我的项目有通过gradle配置的依赖项。我想添加以下依赖项: compile group: 'org.restlet.jse', name: 'org.restlet.ext.apispark', v
我将把我们基于 Windows 的客户管理软件移植到基于 Web 的软件。我发现 polymer 可能是一种选择。 但是,对于我们的使用,我们找不到 polymer 组件具有表格 View 、下拉菜单
我的项目文件夹 Project 中有一个文件夹,比如 ED 文件夹,当我在 Eclipse 中指定在哪里查找我写入的文件时 File file = new File("ED/text.txt"); e
这是奇怪的事情,这个有效: $('#box').css({"backgroundPosition": "0px 250px"}); 但这不起作用,它只是不改变位置: $('#box').animate
这个问题在这里已经有了答案: Why does OR 0 round numbers in Javascript? (3 个答案) 关闭 5 年前。 Mozilla JavaScript Guide
这个问题在这里已经有了答案: Is the function strcmpi in the C standard libary of ISO? (3 个答案) 关闭 8 年前。 我有一个问题,为什么
我目前使用的是共享主机方案,我不确定它使用的是哪个版本的 MySQL,但它似乎不支持 DATETIMEOFFSET 类型。 是否存在支持 DATETIMEOFFSET 的 MySQL 版本?或者有计划
研究 Seam 3,我发现 Seam Solder 允许将 @Named 注释应用于包 - 在这种情况下,该包中的所有 bean 都将自动命名,就好像它们符合条件一样@Named 他们自己。我没有看到
我知道 .append 偶尔会增加数组的容量并形成数组的新副本,但 .removeLast 会逆转这种情况并减少容量通过复制到一个新的更小的数组来改变数组? 最佳答案 否(或者至少如果是,则它是一个错
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
noexcept 函数说明符是否旨在 boost 性能,因为生成的对象中可能没有记录异常的代码,因此应尽可能将其添加到函数声明和定义中?我首先想到了可调用对象的包装器,其中 noexcept 可能会产
我正在使用 Angularjs 1.3.7,刚刚发现 Promise.all 在成功响应后不会更新 angularjs View ,而 $q.all 会。由于 Promises 包含在 native
我最近发现了这段JavaScript代码: Math.random() * 0x1000000 10.12345 10.12345 >> 0 10 > 10.12345 >>> 0 10 我使用
我正在编写一个玩具(物理)矢量库,并且遇到了 GHC 坚持认为函数应该具有 Integer 的问题。是他们的类型。我希望向量乘以向量以及标量(仅使用 * ),虽然这可以通过仅使用 Vector 来实现
PHP 的 mail() 函数发送邮件正常,但 Swiftmailer 的 Swift_MailTransport 不起作用! 这有效: mail('user@example.com', 'test
我尝试通过 php 脚本转储我的数据,但没有命令行。所以我用 this script 创建了我的 .sql 文件然后我尝试使用我的脚本: $link = mysql_connect($host, $u
使用 python 2.6.4 中的 sqlite3 标准库,以下查询在 sqlite3 命令行上运行良好: select segmentid, node_t, start, number,title
我最近发现了这段JavaScript代码: Math.random() * 0x1000000 10.12345 10.12345 >> 0 10 > 10.12345 >>> 0 10 我使用
我是一名优秀的程序员,十分优秀!