gpt4 book ai didi

kotlin - 结合入站 channel 适配器和流发射器

转载 作者:行者123 更新时间:2023-12-02 13:38:42 24 4
gpt4 key购买 nike

我正在玩 Spring Cloud Stream Reactive,我遇到了一个问题。

考虑以下代码:

@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "\${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()

我的目标是生成一种通量,该通量应由以下形式消耗:
 @StreamListener("list") @Output("download")
fun processList(center: Center): Flux<Product> = ...

但这似乎不起作用。 channel 适配器正确生成通量,但它显示以下内容:
 org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')

我想我应该添加一个 StreamEmitter沿入站 channel 适配器进行注释,但这似乎不起作用。

实现这种流程的正确方法是什么?

谢谢和问候,

费尔南多

最佳答案

异常(exception)很明显:您生成了 Flux对象作为 payload发送到 list 的消息要发送到消息传递中间件上的目标目的地的 channel 。并且它完全正确 Flux as is 与要序列化的 JSON 不兼容。

另一方面,我不确定 Kotlin 是什么并将您的代码编译为 Java,但我们有类似这样的开箱即用的东西(对于 Java):

@StreamEmitter
@Output("list")
public Flux<Center> timerMessageSource() {
return config.centers.toFlux();
}

并且通量中的每个发射项目都将被序列化并作为记录或消息发送到 Binder。如果您的 Center当然是 JSON 兼容的。为此,您需要一个 spring-cloud-stream-reactive依赖。

对, @InboundChannelAdapter在这里没有帮助甚至打扰。

如果你担心一些周期性的发射,如果应该考虑在 Project Reactor 中的调度支持。 .

关于kotlin - 结合入站 channel 适配器和流发射器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49320714/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com