gpt4 book ai didi

java - Akka Streams onFailuresWithBackoff 未重新启动流程

转载 作者:行者123 更新时间:2023-12-02 10:22:24 26 4
gpt4 key购买 nike

如果在阶段期间发生任何故障,我尝试在 Akka Streams javadsl 中使用 RestartFlow 来重新启动我的流程阶段之一,但它似乎并没有重新启动流程,而只是删除消息。

我已经看过这个:RestartFlow in Akka Streams not working as expected ,但我使用的是 2.5.19 版本,所以应该修复它?

我尝试了 RestartFlow.onFailuresWithBackoffRestartFlow.withBackoff 但都不起作用。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截异常,以便它不会从流程中抛出,并且 plus 似乎没有提供我想要的退避和最大重试策略。

流:

public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}

然后是流程:

private Flow<ConsumerMessage.CommittableMessage<String, String>,
ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
.mapAsyncUnordered(
config.getInt(ConfigKeys.PARALLELISM),
msg ->
streamProcessor.process(msg.record().value())
.whenComplete((done, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else {
if (done.status().isSuccess()){
streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
done.toString());
} else {
throw new RuntimeException("HTTP Error!");
}
}
})
.thenApply(done -> msg.committableOffset()));
}

我看到了一次异常,akka 指出它将因失败而重新启动图表,但此后就没有其他了。根据我的理解,我应该再看10次。消费者继续收听新消息,因此看起来消息刚刚被丢弃。

java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
... 6 more

如果有人可以帮助我指出正确的方向,我将不胜感激。

最佳答案

它的工作方式有点不同。长话短说 - 如果发生错误,消息将被删除,但源/流将重新启动,而不会杀死整个流。 RestartFlow.onFailuresWithBackoff documentation 中对此进行了描述。 :

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.

关于java - Akka Streams onFailuresWithBackoff 未重新启动流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54245791/

26 4 0