gpt4 book ai didi

java - 如何确保消息到达Kafka Broker?

转载 作者:行者123 更新时间:2023-11-30 05:47:30 27 4
gpt4 key购买 nike

我的本​​地计算机上有一个消息生成器,在远程主机 (aws) 上有一个代理。

生产者发送消息后,我等待并调用远程主机上的控制台消费者并查看过多的日志。没有来自生产者的值(value)。

生产者在调用send方法后刷新数据。一切都配置正确。

如何检查代理是否收到了来自生产者的消息以及生产者是否收到了答案?

最佳答案

Send方法异步将消息发送到主题,并且返回 FutureRecordMetadata .

java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously sends a record to a topic

flush之后称呼,通过调用 isDone 检查 Future 是否已完成方法。(例如 Future.isDone() == true )

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.

RecordMetadata包含offsetpartition

公共(public) int 分区()

The partition the record was sent to

公共(public)长偏移()

the offset of the record, or -1 if {hasOffset()} returns false.

或者您也可以使用Callback确保消息是否发送到主题的函数

Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

这里是文档中的清晰示例

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});

关于java - 如何确保消息到达Kafka Broker?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54595952/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com