gpt4 book ai didi

java - 间歇性出现 KafkaProducerException : Failed to send org. apache.kafka.common.errors.TimeoutException

转载 作者:行者123 更新时间:2023-12-01 17:09:21 28 4
gpt4 key购买 nike

我收到以下错误 - 原因为:org.apache.kafka.common.errors.TimeoutException:过期 1 记录如下,

 Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
record(s) for pipeline-demo-0: 60125 ms has passed since last append
2020-04-26 16:11:14.927 ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and
payload='KafkaMessage(message={grx_projectCode=Value(v=demo,
dataType=STRING), grx_gid=Value(v=5e5207a8-881d-...' to topic
pipeline-demo and partition 0:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for pipeline-demo-0: 60125 ms has passed since last append
2020-04-26 16:11:14.927 ERROR i.t.g.c.c.s.i.DumpToKafkaServiceImpl - Dump to kafka exception
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for pipeline-demo-0: 60125 ms has passed since last append

已经尝试了更大的超时和更小的批量大小的多种组合,并且 0 linger ms 仍然出现此错误。

消费者配置:

event.topic=events
consumer.threads=1
max.poll.records=1000
max.poll.interval.ms=120000
max.partition.fetch.bytes=1048576
fetch.max.bytes=524288000
fetch.min.bytes=1
fetch.max.wait.ms=500

生产者配置:

retries=2
batch.size=100
linger.ms=0
buffer.memory=17179869184
acks=all

生产者代码

@Override
public void send(String topic, KafkaMessage kafkaMessage, String partitionBy, String correlationId) {
Integer partition = null;
if (!StringUtils.isEmpty(partitionBy)) {
try {
int numPartitions = template.partitionsFor(topic).size();
partition = Utils.abs(Utils.murmur2(partitionBy.getBytes())) % numPartitions;
} catch (Exception e) {
log.error("Unable to get partitions for topic", e);
}
}

ProducerRecord<Integer, KafkaMessage> record = new ProducerRecord<Integer, KafkaMessage>(topic, partition, null,
kafkaMessage, null);
ListenableFuture<SendResult<Integer, KafkaMessage>> future = template.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<Integer,KafkaMessage>>() {

@Override
public void onSuccess(SendResult<Integer, KafkaMessage> result) {
MeterFactory.getEventsSavedMeter().mark();

}

@Override
public void onFailure(Throwable ex) {
log.error("Dump to kafka exception ", ex);
MeterFactory.getEventsSaveFailedMeter().mark();
}
});
}

配置代码,KafkaProducerConfig.java,

public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Value("${retries}")
private String retries;

@Value("${batch.size}")
private String batchSize;

@Value("${linger.ms}")
private String lingerMilliSeconds;

@Value("${buffer.memory}")
private String bufferMemory;

@Value("${acks}")
private String acks;

public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMilliSeconds);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.ACKS_CONFIG, acks);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}

@Bean
public ProducerFactory<Integer, KafkaMessage> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<Integer, KafkaMessage> kafkaTemplate() {
return new KafkaTemplate<Integer, KafkaMessage>(producerFactory());
}

}

最佳答案

Kafka 不会立即发送记录。它对它们进行批处理,并定期发送配置大小的批处理 (batchSize & lingerMilliSconds) 。

根据只有少数记录过期的消息,您发送的数据太少而刷新生产者。

关于java - 间歇性出现 KafkaProducerException : Failed to send org. apache.kafka.common.errors.TimeoutException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61439665/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com