gpt4 book ai didi

scala - 在运行时动态创建 Akka 流

转载 作者:行者123 更新时间:2023-12-03 14:30:15 26 4
gpt4 key购买 nike

我目前正在尝试在运行时动态创建 Akka Stream 图定义。这个想法是用户将能够以交互方式定义流并将它们附加到现有/正在运行的 BroadcastHubs .这意味着我不知道在编译时将使用哪些流,甚至不知道将使用多少流。

不幸的是,我正在为泛型/类型删除而苦苦挣扎。坦率地说,我什至不确定我正在尝试做的事情是否可以在 JVM 上实现。

我有一个函数可以返回 Akka Streams Flow代表两个连接Flows .它使用 Scala 的 TypeTags绕过类型删除。如果第一个流的输出类型与第二个流的输入类型相同,则可以成功连接。这工作得很好。

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}

import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
Try {
if (typeOf[B] =:= typeOf[C]) {
val c = b.asInstanceOf[Flow[B, D, NotUsed]]

Flow.fromGraph {
GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
(s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
}
}
}
else
throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
}
}

所以如果我有 Flow[A,B]Flow[C,D] ,结果将是 Flow[A,D]假设 B 和 C 是同一类型。

我也有尝试合并/减少 List 的功能的 Flows下降到单个 Flow .让我们假设这个列表是从文件或 Web 请求的流定义列表中派生的。
def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
fcs match {
case Nil => Success(None)
case h :: Nil => Success(Some(h))
case h :: t =>
val n = t.head

connect(h, n) match {
case Success(fc) => merge(fc :: t)
case Failure(e) => Failure(e)
}
}
}

不幸的是,由于 Flows存储在 List 中, 由于标准 Lists 上的类型删除,我丢失了所有类型信息,因此无法连接 Flows在运行时。下面是一个例子:
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)

def flowI2S() = Flow.fromFunction[Int, String](_.toString)

val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()

val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)

val y = merge(fcs)

这导致异常:
Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)

我一直在查看迈尔斯·萨宾的 Shapeless ,并认为我可以使用 HLists保留类型信息。不幸的是,这似乎只有当我在编译时知道列表的各个类型和长度时才有效。如果我上传一个特定的 HList只是 HList ,看来我又丢失了类型信息。
val fcs: HList = a :: b :: c :: d :: HNil

所以我的问题是......这甚至可能吗?有没有办法用 Shapeless 泛型魔法来做到这一点(最好不需要使用特定的非存在类型提取器)?我想找到尽可能通用的解决方案,任何帮助将不胜感激。

谢谢!

最佳答案

正如您已经注意到的,它不起作用的原因是列表删除了您拥有的类型。因此这是不可能的。
如果您知道可以用作中间类型的所有类型,则可以通过添加解析函数来解决该问题。添加这样的功能也将简化您的连接方法。我将添加一个代码片段。我希望它会很清楚。

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)

def main(args: Array[String]): Unit = {
val idInt1 = flowIdentity[Int]()
val idInt2 = flowIdentity[Int]()
val int2String = flowI2S()
val idString = flowIdentity[String]()
val fcs = List(idInt1, idInt2, int2String, idString)

val source = Source(1 to 10)
val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
source.via(mergedGraph).to(Sink.foreach(println)).run()
}

def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] = {
fcs match {
case Nil => None
case h :: Nil => Some(h)
case h :: t =>
val n = t.head

val fc = resolveConnect(h, n)
merge(fc :: t.tail)
}
}

def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = {
if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) {
connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
} else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) {
connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
} else {
throw new UnsupportedOperationException
}
}

def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] = {
a.via(b)
}

def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] = {
a.via(b)
}
附言
还有另一个错误隐藏在那里,无限循环。调用合并递归时,第一个元素应该被删除,因为它已经合并到主流程中。

关于scala - 在运行时动态创建 Akka 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43639831/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com