gpt4 book ai didi

java - Spring Integration 5.0 + Project Reactor : controlling threads

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:08:50 26 4
gpt4 key购买 nike

关于 https://stackoverflow.com/a/47136941/1776585 的后续问题

在使用 Flux + split() + FluxMessageChannel 时,我无法让我的集成处理程序在并行线程中运行。

考虑以下片段:

// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

所有日志在一个线程中输出:

[     parallel-1] d.a.Application    : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9

如何强制在多线程中处理?

我试过在 Flux 上使用 .parallel().runOn(),但这只会使获取数据并行化,但实际处理仍然在一个线程上运行.

我还在 Flux 上尝试了 .publishOn(Schedulers.parallel()) 但没有效果。

同时向处理程序添加 ExecutorChannel 或带有执行程序的 Poller 也没有帮助。

最佳答案

这有一些技巧:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

FluxMessageChannel 使用的那些消息将与那个额外的 ExecutorChannel 并行。

我认为您要问的是使上述 FluxMessageChannel 可配置的功能请求。并且可以在那里配置这样的 subscribeOn/publishOn 等。

欢迎提出 JIRA关于这件事!

关于java - Spring Integration 5.0 + Project Reactor : controlling threads,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49496099/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com