gpt4 book ai didi

java - Project Reactor,在创建 lambda 之外使用 Flux sink

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:44:53 27 4
gpt4 key购买 nike

  • 当我的服务启动时,我想构建一个简单的管道。
  • 我想隔离 Flux sink 或 Processor,以发出事件。
  • 事件将从多个线程传入,并应根据管道的 subscribeOn() 规范进行处理,但一切似乎都在 main 线程上运行。<
  • 什么是最好的方法?我在下面附上了我的尝试。
  • (我正在使用 reactor-core v3.2.8.RELEASE。)
import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
* I want to construct my React pipelines during creation,
* then emit events over the lifetime of my services.
*/
public class React1Test
{
/**
* Attempt 1 - use a DirectProcessor and send items to it.
* Doesn't work though - seems to always run on the main thread.
*/
@Test
public void testReact1() throws InterruptedException
{
// Create the flux and sink.
FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
FluxSink<String> sink = fluxProcessor.sink();

// Create the pipeline.
fluxProcessor
.doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
.subscribeOn(Schedulers.elastic())
.subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

// Give the multi-thread pipeline a second.
Thread.sleep(1000);

// Time passes ... things happen ...
// Pass a few messages to the sink, emulating events.
sink.next("a");
sink.next("b");
sink.next("c");

// It's multi-thread so wait a sec to receive.
Thread.sleep(1000);
}

// Used down below during Flux.create().
private FluxSink<String> sink2;

/**
* Attempt 2 - use Flux.create() and its FluxSink object.
* Also seems to always run on the main thread.
*/
@Test
public void testReact2() throws InterruptedException
{
// Create the flux and sink.
Flux.<String>create(sink -> sink2 = sink)
.doOnNext(str -> showDebugMsg(str)) // What thread do ops work on?
.subscribeOn(Schedulers.elastic())
.subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

// Give the multi-thread pipeline a second.
Thread.sleep(1000);

// Pass a few messages to the sink.
sink2.next("a");
sink2.next("b");
sink2.next("c");

// It's multi-thread so wait a sec to receive.
Thread.sleep(1000);
}

// Show us what thread we're on.
private static void showDebugMsg(String msg)
{
System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
}
}

输出总是:

a [main]
a [main]
b [main]
b [main]
c [main]
c [main]

但我期望的是:

a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]

提前致谢。

最佳答案

您看到 [main] 是因为您正在从主线程调用 onNext。您使用的 subscribeOn 仅用于订阅(当 create 的 lambda 被触发时)。如果您使用 publishOn 而不是 subscribeOn,您将看到记录的 elastic-* 线程。

另外,考虑使用 Processors , 不鼓励将 sinkFlux.create 和类似的操作符中获取作为字段存储。

关于java - Project Reactor,在创建 lambda 之外使用 Flux sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56063468/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com