gpt4 book ai didi

scala - 在失败时优雅地重新启动 Reactive-Kafka 消费者流

转载 作者:行者123 更新时间:2023-12-04 01:13:53 25 4
gpt4 key购买 nike

问题
当我重新启动/完成/停止流时,旧的消费者不会死/关机:

[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.

说明
我正在构建一个从 Kafka 主题接收消息并通过 HTTP 请求将消息发送到外部服务的服务。
  • 与外部服务的连接可能会中断,我的服务需要重试请求。
  • 此外,如果流中出现错误,则整个流都需要重新启动。
  • 最后,有时我不需要流及其相应的 Kafka 消费者,我想关闭整个流

  • 所以我有一个流:
    Consumer.committableSource(customizedSettings, subscriptions)
    .flatMapConcat(sourceFunction)
    .toMat(Sink.ignore)
    .run

    Http请求在 sourceFunction中发送

    我按照新文档中的新 Kafka Consumer Restart 说明进行操作
      RestartSource.withBackoff(
    minBackoff = 20.seconds,
    maxBackoff = 5.minutes,
    randomFactor = 0.2 ) { () =>
    Consumer.committableSource(customizedSettings, subscriptions)
    .watchTermination() {
    case (consumerControl, streamComplete) =>
    logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
    consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
    streamComplete
    .flatMap { _ =>
    consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
    }
    .recoverWith {
    case _ =>
    consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
    }
    }
    .flatMapConcat(sourceFunction)
    }
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.ignore)(Keep.left)
    .run

    有一个 issue在一个复杂的 Akka 流中讨论了这个非终止消费者,但目前还没有解决方案。

    是否有强制 Kafka Consumer 终止的解决方法

    最佳答案

    如何将消费者包装在一个 Actor 中并注册一个 KillSwitch,请参阅:https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

    然后在 Actor postStop 方法中,您可以终止流。
    通过将 Actor 包装在 BackoffSupervisor 中,您可以获得指数退避。

    示例 Actor :https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27

    关于scala - 在失败时优雅地重新启动 Reactive-Kafka 消费者流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50465969/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com