gpt4 book ai didi

scala - 如何动态添加元素到Source?

转载 作者:行者123 更新时间:2023-12-03 01:53:13 25 4
gpt4 key购买 nike

我有示例代码来生成未绑定(bind)的源并使用它:

对象主要{

 def main(args : Array[String]): Unit = {

implicit val system = ActorSystem("Sys")
import system.dispatcher

implicit val materializer = ActorFlowMaterializer()

val source: Source[String] = Source(() => {
Iterator.continually({ "message:" + ThreadLocalRandom.current().nextInt(10000)})
})

source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}

}

我想创建实现以下功能的类:

trait MySources {
def addToSource(item: String)
def getSource() : Source[String]
}

我需要将它与多个线程一起使用,例如:

class MyThread(mySources: MySources) extends Thread {
override def run(): Unit = {
for(i <- 1 to 1000000) { // here will be infinite loop
mySources.addToSource(i.toString)
}
}
}

预期的完整代码:

object Main {
def main(args : Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
import system.dispatcher

implicit val materializer = ActorFlowMaterializer()

val sources = new MySourcesImplementation()

for(i <- 1 to 100) {
(new MyThread(sources)).start()
}

val source = sources.getSource()

source.runForeach((item:String) => { println(item) })
.onComplete{ _ => system.shutdown() }
}
}

如何实现MySources

最佳答案

获得非有限源的一种方法是使用一种特殊类型的 Actor 作为源,它混合了 ActorPublisher 特征。如果您创建其中一种类型的 Actor,然后通过调用 ActorPublisher.apply 进行包装,您最终会得到一个 Reactive Streams Publisher 实例,这样您就可以使用来自 Sourceapply 来从中生成 Source。之后,您只需确保您的 ActorPublisher 类正确处理用于向下游发送元素的响应式流协议(protocol),您就可以开始了。一个非常简单的例子如下:

import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App{

implicit val system = ActorSystem("test")
implicit val materializer = ActorFlowMaterializer()

val actorRef = system.actorOf(Props[ActorBasedSource])
val pub = ActorPublisher[Int](actorRef)

Source(pub).
map(_ * 2).
runWith(Sink.foreach(println))

for(i <- 1 until 20){
actorRef ! i.toString
Thread.sleep(1000)
}

}

class ActorBasedSource extends Actor with ActorPublisher[Int]{
import ActorPublisherMessage._
var items:List[Int] = List.empty

def receive = {
case s:String =>
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)

case Request(demand) =>
if (demand > items.size){
items foreach (onNext)
items = List.empty
}
else{
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}


case other =>
println(s"got other $other")
}


}

关于scala - 如何动态添加元素到Source?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29072963/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com