gpt4 book ai didi

java - 没有待处理的回复 : ConsumerRecord

转载 作者:行者123 更新时间:2023-12-01 14:17:33 27 4
gpt4 key购买 nike

我正在尝试使用 ReplyingKafkaTemplate,但间歇性地看到下面的消息。

No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic

它将源于下面的代码

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}

但只是间歇性地发生

我也将共享的 replyTopic 设置为 false,如下所示,并试图强制延长超时时间

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;

我的容器如下

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}

最佳答案

如果是断断续续的,很可能是回复时间过长。消息看起来很清楚

perhaps timed out, or using a shared reply topic

每个客户端实例必须使用它自己的回复主题或专用分区。

编辑

如果收到的消息的相关 ID 与当前在 this.futures 中的条目(待回复)不匹配,您将获得日志。这只会在以下情况下发生:

  1. 请求超时(在这种情况下会有相应的 WARN 日志)。
  2. 模板被停止()ped(在这种情况下 this.futures 被清除)。
  3. 出于某种原因(不应该发生)重新发送已处理的回复。
  4. 在将 key 添加到 this.futures 之前收到回复(不可能发生,因为它是在 send()ing 记录之前插入的)。
  5. 服务器端对同一个请求发送 2 个或更多个回复。
  6. 其他一些应用程序正在向同一回复主题发送数据。如果您可以使用 DEBUG 日志重现它,那将会有所帮助,因为这样我们也会在发送时记录相关 key 。

关于java - 没有待处理的回复 : ConsumerRecord,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53752526/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com