gpt4 book ai didi

Spring Cloud 流 MessageChannel send() 总是返回 true

转载 作者:行者123 更新时间:2023-12-04 13:46:16 25 4
gpt4 key购买 nike

我正在使用 Spring 云流,我想保存消息并在 Kafka 服务器消失时重新尝试将它们发布到主题上,但即使 Kafka/Zookeeper 服务器停止,MessageChannel send() 方法也始终返回 true。

有人可以帮忙吗?

更新 application.yml 内容:

spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
zk-nodes: localhost
mode: raw
bindings:
output:
producer:
sync: true
bindings:
output:
destination: topic-notification
content-type: application/json

代码:
@Service
public class SendToKafka {
private Logger log = LoggerFactory.getLogger(SendToKafka.class);

@Autowired
Source source;

@Autowired
NotificationFileService notificationFileService;

public void send(NotificationToResendDTO notification){
try {
CompletableFuture.supplyAsync(() -> notification)
.thenAcceptAsync(notif -> {
boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
log.info(" ======== kafka server response === " + resp);

if (!resp){
log.info(" ======== failed to send the notification" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
}
}).get();
} catch (InterruptedException | ExecutionException e) {
log.info(" ======== failed to send the notification with exception" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
e.printStackTrace();
}
}
}

最佳答案

Kafka 默认是异步的;您需要设置 synctrue ;见 binder producer properties .

sync

Whether the producer is synchronous.

Default: false.

关于Spring Cloud 流 MessageChannel send() 总是返回 true,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47678318/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com