gpt4 book ai didi

java - Reactor Flux 上的条件逻辑

转载 作者:行者123 更新时间:2023-11-30 05:43:26 26 4
gpt4 key购买 nike

我是 Reactor 新手。我正在尝试开发以下应用程序逻辑:

  1. 从 Kafka 主题读取消息。
  2. 改变按摩方式。
  3. 将转换后的消息的子集写入新的 Kafka 主题目标
  4. 明确确认对最初从主题读取的所有消息的读取操作。

我找到的唯一解决方案是重写上述业务逻辑,如下所示。

  1. 从 Kafka 主题读取消息。
  2. 改变按摩方式。
  3. 立即确认消息不会写入主题目标
  4. 过滤上述所有消息。
  5. 将其余转换后的消息写入新的 Kafka 主题目标
  6. 明确确认这些消息的读取操作

实现第二个逻辑的代码如下:

receiver.receive()
.flatMap(this::processMessage)
.map(this::acknowledgeMessagesNotToWriteInKafka)
.filter(this::isMessageToWriteInKafka)
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());

显然,receiver 类型是 KafkaReceiver,并且方法 sendToKafka 使用 KafkaSender。我不喜欢的一件事是我使用 map 来确认一些消息。

有没有更好的方案来实现原来的逻辑?

最佳答案

这并不完全是您的四个业务逻辑步骤,但我认为它更接近您想要的。

您可以确认“丢弃”的消息不会写入 .doOnDiscard.filter之后...

receiver.receive()
.flatMap(this::processMessage)
.filter(this::isMessageToWriteInKafka)
.doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());

注意:您需要使用被丢弃的正确对象类型。我不知道从 processMessage 返回的 Publisher 发出什么类型的对象,但我假设您可以从中获取 ReceiverRecordReceiverOffset命令承认它。

或者,您可以将 filter/doOnDiscard 合并到单个 .handle 中。运算符...

receiver.receive()
.flatMap(this::processMessage)
.handle((m, sink) -> {
if (isMessageToWriteInKafka(m)) {
sink.next(m);
} else {
m.getReceiverRecord().getReceiverOffset().acknowledge();
}
})
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());

关于java - Reactor Flux 上的条件逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55264060/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com