gpt4 book ai didi

scala - 将回调方法实现转换为 akka 流 Source

转载 作者:行者123 更新时间:2023-12-04 18:39:44 24 4
gpt4 key购买 nike

我正在与我无法控制的 java 库中的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(库是 java,但为了简洁起见,我将在 scala 中描述):

type DataType = ???

trait DataConsumer {
def onData(data : DataType) : Unit
}

该库的用户需要编写一个实现 onData 的类。方法并将其传递给 DataProducer ,库代码看起来像:
class DataProducer(consumer : DataConsumer) {...}
DataProducer有自己无法控制的内部线程和伴随的数据缓冲区,即调用 onData每当有另一个 DataType消费对象。

所以,我的问题是:我如何编写一个层来将原始库模式转换/翻译成 akka 流 Source目的?

先感谢您。

最佳答案

回调 --> 来源

详细说明 Endre Varga 的答案,下面是创建 DataConsumer 的代码。将消息发送到 akka 流的回调函数 Source .

警告:创建一个功能性的 ActorPublish 比我在下面指出的要多得多。特别是,需要进行缓冲以处理 DataProducer 的情况。正在调用onDataSink 更快是信号需求(见 example)。下面的代码只是设置“接线”。

import akka.actor.ActorRef
import akka.actor.Actor.noSender

import akka.stream.Actor.ActorPublisher

/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}

//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)

//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)

回调 --> 整个流

最初的问题专门要求对 Source 进行回调,但如果整个流已经可用(而不仅仅是 Source),则处理回调更容易处理。那是因为流可以被具体化为 ActorRef使用 Source#actorRef功能。举个例子:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val bufferSize = 100

val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()

val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}

val dataProducer = DataProducer(streamConsumer)

关于scala - 将回调方法实现转换为 akka 流 Source,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29436301/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com