gpt4 book ai didi

java - 我通过kafka- Producer多线程发送消息,但出现消息丢失

转载 作者:行者123 更新时间:2023-11-30 06:09:57 27 4
gpt4 key购买 nike

我正在使用kafka-生产者并将数据发送到主题“test-topic”,该主题由kafka-cluster(由三个代理组成)中的复制因子3和分区1组成。

我创建了五个线程。每个线程发送 10,000 条消息(每条消息大小为 4000 字节)。

我预计最新偏移量为 50,000,但实际为 44,993。

发生了大约 5,000 条消息丢失。

为什么会出现消息丢失的情况?下面是我的代码...(KAFKA-VERSION 1.1.0)

KafkaMessageSender.class

public class KafkaMessageSender {
private final static Logger logger =
LoggerFactory.getLogger(KafkaMessageSender.class);
private Properties props;
private KafkaProducer<String, String> producer;
private String topic;
private AtomicInteger count;

public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
logger.info("KafkaMessageSender initializing...");
this.topic = topic;
this.count = count;
props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
logger.info("KafkaMessageSender initializing end");

}

public void sendMessages() {
producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
count.getAndIncrement();
logger.info("count : "+count.get());
}
}

KafkaMessageSenderMain.class

public class KafkaMessageSenderMain {

private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);

final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
final static String topic = "test-topic"; //topic name
final static AtomicInteger count = new AtomicInteger(0);
final static int MAX_LOOP = 10000; //message sending count
final static int MAX_THREAD = 5; //created number of threads

public static void main(String[] args) {
long startTime = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
for(int i = 0; i < MAX_THREAD; i++) {
executorService.execute(() ->{
KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
for(int j = 0; j < MAX_LOOP; j++) {
sender.sendMessages(); //send message
}
});
}

executorService.shutdown();
try {
boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long procTime = (endTime - startTime);
logger.info("all Threads is shutdown? : "+flag);
logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
} catch (InterruptedException e) {
logger.error("awaitTermination exception",e);
}
}
}

Result

Result image

最佳答案

您可以如下修改并运行您的代码来看看错误是什么吗?

producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
}
});

关于java - 我通过kafka- Producer多线程发送消息,但出现消息丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50478863/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com