- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 ReplyingKafkaTemplate,但间歇性地看到下面的消息。
No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic
它将源于下面的代码
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
if (this.sharedReplyTopic) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(missingCorrelationLogMessage(record, correlationId));
}
}
else if (this.logger.isErrorEnabled()) {
this.logger.error(missingCorrelationLogMessage(record, correlationId));
}
}
但只是间歇性地发生
我也将共享的 replyTopic 设置为 false,如下所示,并试图强制延长超时时间
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
我的容器如下
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setIdleEventInterval(10000L);
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
最佳答案
如果是断断续续的,很可能是回复时间过长。消息看起来很清楚
perhaps timed out, or using a shared reply topic
每个客户端实例必须使用它自己的回复主题或专用分区。
编辑
如果收到的消息的相关 ID 与当前在 this.futures 中的条目(待回复)不匹配,您将获得日志。这只会在以下情况下发生:
关于java - 没有待处理的回复 : ConsumerRecord,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53752526/
当我尝试运行我的 SupplierConsumer 时遇到这些错误 eclipse 中的类。这是我的代码: public class SupplierConsumer{ public stat
我正在尝试使用 ReplyingKafkaTemplate,但间歇性地看到下面的消息。 No pending reply: ConsumerRecord(topic = request-reply-t
我写了一个 python 脚本: #!/usr/bin/env python from kafka import KafkaConsumer consumer = KafkaConsumer('dim
我是kafka技术的新手..我正在研究POC,我需要发送ProducerRecord到 Kafka 主题,其中 Paymnt 是我的 POJO..我能够发布记录并且我可以看到消息被传递到 Kafka
我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll()返回 ConsumerRecords 的实例.我想初始化 ConsumerRecords并在模拟中使用它,但
我正在使用 Spark 2.0.2 和 Kafka 0.11.0,并且 我试图在 Spark 流中使用来自 kafka 的消息。以下是代码: val topics = "notes" val kafk
这个问题已经有答案了: 奥 git _a (3 个回答) 已关闭 3 年前。 我是 Apache Kafka 的初学者。以下代码示例适用于我的 Kafka 生产者和消费者。 Kafka 生产者代码:
我很确定我只推送数据字符串并反序列化为字符串。我推送的记录也显示错误。 但是为什么突然出现这种类型的错误,是不是我遗漏了什么? 下面是代码, import java.util.HashMap;
我正在使用直接方法(对于 kafka 0.1.0 或更高版本)运行 Spark Streaming 作业以从 Kafka 消费。使用maven-assembly-plugin构建POM文件并使用 ja
Spark 2.0.0 Apache 卡夫卡 0.10.1.0 斯卡拉 2.11.8 当我使用 spark streaming and kafka integration with kafka bro
我是一名优秀的程序员,十分优秀!