gpt4 book ai didi

java - Apache Kafka 消息消费确认

转载 作者:行者123 更新时间:2023-11-30 07:46:19 25 4
gpt4 key购买 nike

我已经实现了一个 Java 使用者,它使用来自 Kafka 主题的消息,然后将这些消息与 POST 请求一起发送到 REST API。

    while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
Object message = record.value();
JSONObject jsonObj = new JSONObject(message.toString());
try {
HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
StringEntity params = new StringEntity(message.toString());
request.addHeader("content-type", "application/json");
request.addHeader("Accept", "application/json");
request.setEntity(params);

CloseableHttpResponse response = httpClient.execute(request);
HttpEntity entity = response.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
System.out.println(message.toString());
System.out.println(responseString);
} catch(Exception ex) {
ex.printStackTrace();
} finally {
try {
httpClient.close();
} catch(IOException ex) {
ex.printStackTrace();
}
}
}
}

假设一条消息已被使用,但 Java 类无法访问 REST API。该消息将永远不会被传递,但会被标记为已使用。处理此类情况的最佳方法是什么?当且仅当来自 REST API 的响应成功时,我能否以某种方式确认消息?

最佳答案

在消费者属性中,将 enable.auto.commit 设置为 false。这意味着提交补偿的责任在于消费者。

因此,在上面的示例中,基于 response.statusCode,您可以选择通过调用 consumer.commitAsync() 来提交偏移量。

关于java - Apache Kafka 消息消费确认,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50713020/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com