gpt4 book ai didi

java - Quarkus + Kafka + Smallrye 异常处理

转载 作者:行者123 更新时间:2023-12-03 19:18:23 30 4
gpt4 key购买 nike

如何使用 quarkus + kafka + smallrye 处理流处理异常?

我的代码与 quarkus 指南( https://quarkus.io/guides/kafka#imperative-usage )上的命令式生产者示例非常相似

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

@Inject @Channel("price-create") Emitter<Double> priceEmitter;

@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}

我想要一个类似于 vanilla Kafka 库的东西,它提供了处理请求发送的每条记录的回调的选项。
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.info(record.toString());

if (exception != null) {
logger.error("Producer exception", exception);
}
}
});

塔克斯

最佳答案

有一个section of the docs on Acknowlegement

@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Message<String>> manualAck(Message<String> input) {
return CompletableFuture.supplyAsync(input::getPayload)
.thenApply(Message::of)
.thenCompose(m -> input.ack().thenApply(x -> m));
}

关于java - Quarkus + Kafka + Smallrye 异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60005697/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com