- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我收到以下错误 - 原因为:org.apache.kafka.common.errors.TimeoutException:过期 1 记录如下,
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
record(s) for pipeline-demo-0: 60125 ms has passed since last append
2020-04-26 16:11:14.927 ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and
payload='KafkaMessage(message={grx_projectCode=Value(v=demo,
dataType=STRING), grx_gid=Value(v=5e5207a8-881d-...' to topic
pipeline-demo and partition 0:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for pipeline-demo-0: 60125 ms has passed since last append
2020-04-26 16:11:14.927 ERROR i.t.g.c.c.s.i.DumpToKafkaServiceImpl - Dump to kafka exception
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for pipeline-demo-0: 60125 ms has passed since last append
已经尝试了更大的超时和更小的批量大小的多种组合,并且 0 linger ms 仍然出现此错误。
消费者配置:
event.topic=events
consumer.threads=1
max.poll.records=1000
max.poll.interval.ms=120000
max.partition.fetch.bytes=1048576
fetch.max.bytes=524288000
fetch.min.bytes=1
fetch.max.wait.ms=500
生产者配置:
retries=2
batch.size=100
linger.ms=0
buffer.memory=17179869184
acks=all
生产者代码
@Override
public void send(String topic, KafkaMessage kafkaMessage, String partitionBy, String correlationId) {
Integer partition = null;
if (!StringUtils.isEmpty(partitionBy)) {
try {
int numPartitions = template.partitionsFor(topic).size();
partition = Utils.abs(Utils.murmur2(partitionBy.getBytes())) % numPartitions;
} catch (Exception e) {
log.error("Unable to get partitions for topic", e);
}
}
ProducerRecord<Integer, KafkaMessage> record = new ProducerRecord<Integer, KafkaMessage>(topic, partition, null,
kafkaMessage, null);
ListenableFuture<SendResult<Integer, KafkaMessage>> future = template.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<Integer,KafkaMessage>>() {
@Override
public void onSuccess(SendResult<Integer, KafkaMessage> result) {
MeterFactory.getEventsSavedMeter().mark();
}
@Override
public void onFailure(Throwable ex) {
log.error("Dump to kafka exception ", ex);
MeterFactory.getEventsSaveFailedMeter().mark();
}
});
}
配置代码,KafkaProducerConfig.java,
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;
@Value("${retries}")
private String retries;
@Value("${batch.size}")
private String batchSize;
@Value("${linger.ms}")
private String lingerMilliSeconds;
@Value("${buffer.memory}")
private String bufferMemory;
@Value("${acks}")
private String acks;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMilliSeconds);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<Integer, KafkaMessage> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<Integer, KafkaMessage> kafkaTemplate() {
return new KafkaTemplate<Integer, KafkaMessage>(producerFactory());
}
}
最佳答案
Kafka 不会立即发送记录。它对它们进行批处理,并定期发送配置大小的批处理 (batchSize & lingerMilliSconds) 。
根据只有少数记录过期的消息,您发送的数据太少而刷新生产者。
关于java - 间歇性出现 KafkaProducerException : Failed to send org. apache.kafka.common.errors.TimeoutException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61439665/
我在Web应用程序中使用WebRTC进行音频播放。因为我是WebRTC的新手,所以我使用@https://webrtc.github.io/samples/src/content/peerconnec
上下文 我的 VBA 代码经常替换工作簿中的工作表。因此,我无法直接在工作表模块中使用代码,因为它最终会在此过程中被删除。 我使用用户定义的类来处理我的事件(强烈受到 Chip Pearson's w
我已经搜索过这个问题,如果这个问题已经得到解答,我深表歉意(我很高兴被重定向),但具体来说,我们的问题是间歇性的。 我们的客户提示当事件从我们的软件发送到他们的手机时,通知音频间歇性地没有“响起”。它
背景故事优先: 我们有一个正在运行的部署在尝试使用 JMeter 等工具对其进行负载测试时遇到间歇性 502。它是一个将 POST 数据记录到另一个容器上的 mysql 数据库的容器。它每秒处理大约
在向我托管的 https://网站发出简单的 GET 请求时,我不断收到间歇性 SecureChannelFailure 错误。没有错误进入服务器日志文件。每 100 次调用的频率小于 1 个错误,但
我正在通过 eval 运行一些 JavaScript(我知道,开枪吧),它基本上枚举了文档对象上的所有属性。我的问题是,虽然它在 firebug 中工作,但从脚本运行时,它在 Firefox 中抛出未
我发现了这个关于 iBeacon 的教程 (http://www.appcoda.com/ios7-programming-ibeacons-tutorial/),我觉得很有趣。我已经下载了他们的源代
我在我的开发箱上本地运行 WCF 服务,我的测试检查该服务一切正常。 通常,一切都很好,但有时(5% 的时间),我会收到错误 The requested service, 'net.tcp://csm
我正在使用 django 和 jQuery 构建一个网络应用程序,并且在其中一个页面上 $(document).com('click'... 事件间歇性地触发。我在结帐队列中有一个项目列表,以及删除每
Excepcion:com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure ...
我正在尝试对 MySQL 5.5 数据库执行一系列检查/插入操作,但我经常遇到间歇性的 SIGSEGV 错误问题。在执行许多查询的过程中,SELECT 语句运行得很好。然而,在经过一些可变的时间或执行
我每天至少发生一次崩溃,我似乎真的无法理解。它似乎在随机时刻发生在我身上,我无法追踪堆栈来理解它发生的原因。如果有人能为我指出正确的方向,甚至向我展示一些关于如何正确追踪值的在线文档和教程,那将是完美
我尝试用一些更简单的函数重现它,但没有成功。所以下面的代码显示了我们的生产服务器抛出的 KeyError 的相关方法,很多。 class PokerGame: ... def serial
我们有一个托管在 Windows 服务中的 WCF 服务和一个访问该服务的非线程客户端。该服务正在执行对 SQL Server 2008 数据库的数据访问。间歇性地在客户端发生以下异常: System
我们有一个 SSL 问题,我 99% 认为这不是您通常使用的证书信任存储旋转木马。 我们有一个 Weblogic 服务器试图通过 LDAPS 与 Active Directory 建立 SSL 连接,
我有一个复杂的经典 ASP 系统,多年来运行良好,但最近开始出现奇怪的间歇性问题。 在某些表单上,人们会报告说他们点击了“提交”,但表单只是自行重置(或者,浏览器可能只是重新加载了表单——我的用户可能
我在编译顶点着色器时遇到间歇性错误,为新创建的 OpenGL 上下文的首次渲染做准备。它是通常在相同硬件上运行的相同顶点着色器。失败后,glGetShaderInfoLog 返回的信息日志通常显示如下
我有一个目前看来无法解决的 EXC_BAD_ACCESS 问题。我试过启用 NSZombie,这似乎是许多帖子中的建议,但我处理的是 c 指针而不是 obj c 对象,所以我没有获得任何有用的调试信息
在 iOS 上出现间歇性 SSL 错误。我已经关闭了 ATS,我们知道这也会发生在操作系统版本 < iOS9 上 Error Domain=NSURLErrorDomain Code=1011 "An
我有一个使用 RequireJS 的相当大的 Backbone.js 项目。随着项目规模的增长(这里的“规模”指的是独立模块文件的数量),间歇性错误开始出现。大多数情况下,这是一个对象错误: 未捕获的
我是一名优秀的程序员,十分优秀!