gpt4 book ai didi

scala - 使用相同的 Actor 创建​​ Actor 发布者和 Actor 订阅者

转载 作者:行者123 更新时间:2023-12-01 13:51:33 28 4
gpt4 key购买 nike

我是 akka 流的新手。我将 kafka 用作源(使用 ReactiveKafka 库)并通过流对数据进行一些处理并使用订阅者 (EsHandler) 作为接收器。

现在我需要处理错误并通过错误处理程序将其推送到不同的 kafka 队列。我正在尝试将 EsHandler 用作发布者和订阅者。我不确定如何将 EsHandler 作为中间人而不是接收器。

这是我的代码:

val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())

val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }

val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))

Source(publisher).via(flow).to(sink).run()


class EsHandler extends ActorSubscriber with ActorPublisher[Model] {

val requestStrategy = WatermarkRequestStrategy(100)

def receive = {
case OnNext(msg: Model) =>
context.actorOf(Props(classOf[EsStorage], self)) ! msg

case OnError(err: Exception) =>
context.stop(self)

case OnComplete =>
context.stop(self)

case Response(msg) =>
if (msg.isError()) onNext(msg.getContent())
}
}

class ErrorHandler extends ActorSubscriber {

val requestStrategy = WatermarkRequestStrategy(100)

def receive = {
case OnNext(msg: Model) =>
println(msg)
}
}

最佳答案

我们强烈建议反对实现您自己的处理器(这是 reactive streams 规范赋予“订阅者 && 发布者”的名称)。很难做到正确,这就是为什么没有Publisher 直接暴露为 helper trait。

相反,大多数时候您会希望使用Sources/Sinks(或Publishers/Subscribers) 提供给您并在这些步骤之间运行您的操作,如映射/过滤器等步骤。

事实上,您可以使用 Kafka Sources 和 Sinks 的现有实现,它称为 reactive-kafka并由 Reactive Streams TCK 验证,因此您可以相信它是有效的实现。

关于scala - 使用相同的 Actor 创建​​ Actor 发布者和 Actor 订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31272267/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com