gpt4 book ai didi

java - Reactor 3 发射极/用户并联

转载 作者:行者123 更新时间:2023-11-30 10:06:09 24 4
gpt4 key购买 nike

我是 Reactive 编程的新手,有很多问题。我认为这不是缺少示例或文档,而是我的理解有误。

我正在尝试模拟慢速订阅者;

代码示例如下

Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(MILLIS);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next(it);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(MILLIS + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

系统输出是

Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]

我想如果订阅者很慢,我应该看到更多的线程,因为 Schedulers.elastic()

我还尝试制作 publishOn(),看起来我让它异步了,但仍然无法在多个线程中处理结果。

感谢评论和回答。

最佳答案

如果你想让它在不同的线程中运行,你需要像这样使用 .parallel() 并且发射将在不同的线程中进行

Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(100);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})

.parallel()
.runOn(Schedulers.elastic())

.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(100 + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
;
}

关于java - Reactor 3 发射极/用户并联,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54802472/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com