gpt4 book ai didi

java - 将 Spring Integration executorChannel 与 Spring Cloud Function 结合使用

转载 作者:行者123 更新时间:2023-12-01 17:27:36 27 4
gpt4 key购买 nike

我正在使用 Spring Cloud 函数通过 Flux 处理来自 kafka 的数据。默认情况下,它在消费者线程(消费消息的地方)中处理数据。我将实现线程池来进行并行数据处理和限制,Spring Cloud Integration 中有一个很好的实现,称为 executorChannel ( https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html )

函数实现示例:

public static class FN1 implements Function<Flux<String>, Flux<String>> {
public Flux<String> apply(Flux<String> data) {
return data
.map(f -> doSomething() )
}
}

所以我发现没有简单的方法来连接通过 executorChannel 实现的函数。

M.b.有没有办法定义 inputChannel 类型?

UPD:阅读 Oleg 答案下的评论。它们非常有用。

最佳答案

你的意思是这样的吗?

@SpringBootApplication
public class SampleFunctoinAppApplication {

public static void main(String[] args) throws Exception {

ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
output.subscribe(System.out::println);

MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
channel.send(new GenericMessage<String>("hello"));
}

@Bean
public IntegrationFlow flow() {
return IntegrationFlows
.from("executorChannel")
.transform(echo())
.channel("output")
.get();
}

@Bean
public ExecutorChannel executorChannel() {
return new ExecutorChannel(Executors.newCachedThreadPool());
}

public Function<String, String> echo() {
return v -> v;
}
}

“定义 inputChannel 类型”是什么意思?

关于java - 将 Spring Integration executorChannel 与 Spring Cloud Function 结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61188585/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com