gpt4 book ai didi

scala - 如何将 Source 动态添加到现有 Graph?

转载 作者:行者123 更新时间:2023-12-04 18:32:43 25 4
gpt4 key购买 nike

什么可以替代动态更改运行图?这是我的情况。我有将文章摄取到数据库中的图表。文章来自 3 个不同格式的插件。因此我有几个流程

val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]

// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]

val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))

val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()

问题是我每 24 小时从内容提供商那里获取一次数据,每 2 小时从新闻中获取一次数据,最后一个来源可能随时出现,因为它来自人类。

我意识到图形是不可变的,但是我如何定期附加 Source 的新实例到我的图表,以便我对摄取过程进行单点节流?

更新:你可以说我的数据是 Source 的流-s,就我而言,三个来源。但我无法改变它,因为我得到了 Source 的实例。来自外部类(所谓的插件)。这些插件独立于我的摄取类工作。我不能把它们组合成一个巨大的类来拥有一个 Source .

最佳答案

好的,通常正确的方法是将源流加入单个源,即从 Source[Source[T, _], Whatever] 开始至 Source[T, Whatever] .这可以通过 flatMapConcat 来完成或与 flatMapMerge .因此,如果你能得到一个 Source[Source[Article, NotUsed], NotUsed] ,您可以使用 flatMap* 之一变体并获得最终 Source[Article, NotUsed] .为你的每个来源做这件事(没有双关语),然后你原来的方法应该有效。

关于scala - 如何将 Source 动态添加到现有 Graph?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37944150/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com