gpt4 book ai didi

Spring Kafka 异步发送调用 block

转载 作者:IT老高 更新时间:2023-10-28 13:47:09 24 4
gpt4 key购买 nike

我使用的是 Spring-Kafka 版本 1.2.1,当 Kafka 服务器关闭/无法访问时,异步发送调用会阻塞一段时间。这似乎是 TCP 超时。代码是这样的:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> result) {
...
}

@Override
public void onFailure(Throwable ex) {
...
}
});

我快速浏览了 Spring-Kafka 代码,它似乎只是将任务传递给 kafka 客户端库,将回调交互转换为 future 的对象交互。查看kafka客户端库,代码变得更加复杂,我没有花时间去理解它,但我猜它可能是在同一个线程中进行远程调用(至少是元数据?)。

作为用户,我希望返回 future 的 Spring-Kafka 方法立即返回,即使远程 kafka 服务器无法访问。

如果我的理解是错误的或者这是一个错误,任何确认都将受到欢迎。我最终暂时将其设为异步。

另一个问题是 Spring-Kafka 文档一开始就说它提供了同步和异步发送方法。我找不到任何不返回 future 的方法,也许文档需要更新。

如果需要,我很乐意提供更多详细信息。谢谢。

最佳答案

除了配置类上的@EnableAsync注解外,如果您调用此代码,还需要在方法上使用@Async注解。

http://www.baeldung.com/spring-async

这里有一些代码片段。 Kafka 生产者配置:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

@Value("${kafka.brokers}")
private String servers;

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}

@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
}

@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
}

@Bean
public Producer producer() {
return new Producer();
}
}

还有制作人本身:

public class Producer {

public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;

@Async
public void send(String topic, GenericMessage message) {
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

@Override
public void onSuccess(final SendResult<String, GenericMessage> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}

@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}

关于Spring Kafka 异步发送调用 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45084688/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com