gpt4 book ai didi

java - 在 @KafkaListener 注释方法中使用响应式(Reactive) webflux 代码

转载 作者:行者123 更新时间:2023-12-05 04:48:29 25 4
gpt4 key购买 nike

我正在使用 spring-kafka 实现一个从特定主题读取消息的消费者。所有这些消息都由它们处理,并通过 REST API 导出到另一个系统。为此,代码使用了 Spring Webflux 项目中的 WebClient,这导致了响应式代码:

  @KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
public void listenToTopic(final ConsumerRecord<String, String> record) {

// minimal, non-reactive code here (logging, serizializing the string)

webClient.get().uri(...).retrieve().bodyToMono(String.class)
// long, reactive chain here
.subscribe();
}

现在我想知道这个设置是否合理,或者这是否会导致很多问题,因为 spring-kafka 的 KafkaListener 逻辑本身并不是 react 性的。我想知道是否有必要改用reactor-kafka。我对整个 react 世界以及 kafka 世界的理解非常有限,但这是我目前假设上述设置需要的内容:

  1. listenToTopic 函数几乎会立即返回,因为大部分工作是在 react 链中完成的,不会阻止函数返回。这意味着,据我所知,KafkaListener 逻辑将假定消息已在该处得到正确处理,因此它可能会确认它并在某个时候也提交它。如果我理解正确,那么这意味着消息的处理可能会出现问题。当 KafkaListener 已经获取下一条记录时,工作仍然可以在之前的 react 链中完成。这意味着如果应用程序依赖于按严格顺序完全处理的消息,那么上述设置将是错误的。但如果没有,那么上面的设置就可以了吗?
  2. 上述设置的另一个问题是,如果有大量消息传入,应用程序可能会重载工作。因为监听器函数几乎立即返回,大量消息可能正在 react 链内部处理同时。
  3. @KafkaListener 逻辑内置的重试逻辑在这里不会真正起作用,因为 react 链内部的异常不会触发它。任何重试逻辑都必须由监听器函数本身内部的响应式(Reactive)代码处理。
  4. 当使用 reactor-kafka 而不是 @KafkaListener 注释时,可以更改第 1 点中描述的行为。因为监听器现在将集成到 react 链中,所以只有当 react 链实际上已经完成了。这样,据我了解,只有在通过 react 链完全处理一条消息后,才会获取下一条消息。这也可能会解决第 2-4 点中描述的问题/行为。

问题:我对情况的理解是否正确?是否还有我遗漏的此设置可能导致的其他问题?

最佳答案

你的理解是正确的;切换到非响应式(Reactive)休息客户端(例如 RestTemplate)或为消费者使用 reactor-kafka

关于java - 在 @KafkaListener 注释方法中使用响应式(Reactive) webflux 代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68059163/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com