gpt4 book ai didi

java - Spring Cloud Stream Kafka - 以批处理模式消费消息并作为单个处理消息发送出去

转载 作者:行者123 更新时间:2023-12-01 21:50:35 25 4
gpt4 key购买 nike

我尝试为下一个 Spring Cloud Stream 版本准备我们的应用程序。 (当前使用3.0.0.RC1)。使用 Kafka Binder。

现在我们收到一条消息,对其进行处理并将其重新发送到另一个主题。单独处理每条消息会导致对我们的数据库产生大量单个请求。

在 3.0.0 版本中,我们希望批量处理消息,这样我们就可以批量更新保存数据。

当前版本我们使用@EnableBinding、@StreamListener

@StreamListener( ExchangeableItemProcessor.STOCK_INPUT )
public void processExchangeableStocks( final ExchangeableStock item ) {
publishItems( exchangeableItemProcessor.stocks(), articleService.updateStockInformation( Collections.singletonList( item ) ) );
}

void publishItems( final MessageChannel messageChannel, final List<? extends ExchangeableItem> item ) {
for ( final ExchangeableItem exchangeableItem : item ) {
final Message<ExchangeableItem> message = MessageBuilder.withPayload( exchangeableItem )
.setHeader( "partitionKey", exchangeableItem.getId() )
.build();
messageChannel.send( message )
}
}

我已将使用者属性设置为“批处理模式”并将签名更改为 List<> ,但这样做会导致收到 List<byte[]>而不是预期的 List<ExchangeableStock> 。Ofc 可以在之后进行转换,但这感觉就像“meh”,我认为这应该在调用监听器之前发生。

然后我尝试了(新的)功能版本,并且消费效果很好。我也喜欢这种简单的处理版本

@Bean
public Function<List<ExchangeableStock>, List<ExchangeableStock>> stocks() {
return articleService::updateStockInformation;
}

但是输出主题现在将对象列表作为一条消息接收,并且以下使用者无法正常工作。

我想我错过了一些东西......

我是否需要添加某种 MessageConverter(对于注释驱动版本)或者是否也有办法通过功能版本实现所需的行为?

最佳答案

IIRC,批处理模式仅支持函数。

你能不能不用Consumer<List< ExchangeableStock>>并将消息发送到 channel ,就像您当前在 StreamListener 中所做的那样?

关于java - Spring Cloud Stream Kafka - 以批处理模式消费消息并作为单个处理消息发送出去,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58765961/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com