gpt4 book ai didi

java - 如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?

转载 作者:行者123 更新时间:2023-12-04 03:26:02 26 4
gpt4 key购买 nike

来自 Quarkus我需要将墓碑消息发布到压缩的 Apache Kafka 主题的应用程序。由于我的用例是必要的,所以我使用 Emitter用于向主题 ( as suggested in the quarkus blog ) 发送消息。 非逻辑删除 消息(带有负载)的代码是:

@Dependent
public class Publisher {

@Inject
@Channel("theChannelName")
Emitter<MyDataStructure> emitter;

public CompletionStage<Void> publish(final MyDataStructure myData) {
OutgoingKafkaRecordMetadata<String> metadata =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey(myData.getTopicKey())
.build();
return CompletableFuture.runAsync(
() -> emitter.send(Message.of(myData).addMetadata(metadata)));
}
}

Emitter还实现了 <M extends Message<? extends T>> void send(M msg)我希望这能让我制作一个 Message负载为 null作为墓碑消息。不幸的是 Message.of(..) 的所有实现允许提供元数据(提供消息 key 所需的元数据)的工厂方法,指定有效负载不得为{@code null}

使用 Emitter 将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus/SmallRye Reactive Messaging 概念) ?

最佳答案

我建议使用 Record类(参见 documentation )。Record是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null ,但在您的情况下,只有值部分应该是 null : Record.of(key, null); .

因此,您需要将发射器的类型更改为 Record<Key, Value> ,例如:

@Dependent
public class Publisher {

@Inject
@Channel("theChannelName")
Emitter<Record<Key, MyDataStructure>> emitter;

public CompletionStage<Void> publish(final MyDataStructure myData) {
return emitter.send(Record.of(myData.getTopicKey(), null);
}
}

同时 runAsync很方便,发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。

我的代码返回了 send 的结果方法是 CompletionStage .当记录写入 Kafka(并被代理确认)时,该阶段将完成。

关于java - 如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67601699/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com